当前位置: 首页 > news >正文

青岛公司网站建设价格去中企动力上班怎么样

青岛公司网站建设价格,去中企动力上班怎么样,源码商城网站源码,软件开发培训哪有默认情况下#xff0c;一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。 一、自定义PartitionAssignor. package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.internals.Abstrac…默认情况下一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。 一、自定义PartitionAssignor. package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; import org.apache.kafka.common.TopicPartition;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;public class BroadcastAssignor extends AbstractPartitionAssignor {Overridepublic String name() {return broadcast;}private MapString, ListString consumersPerTopic(MapString, Subscription consumerMetadata) {MapString, ListString res new HashMap();for (Map.EntryString, Subscription subscriptionEntry : consumerMetadata.entrySet()) {String consumerId subscriptionEntry.getKey();for (String topic : subscriptionEntry.getValue().topics())put(res, topic, consumerId);}return res;}Overridepublic MapString, ListTopicPartition assign(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions) {MapString, ListString consumersPerTopic consumersPerTopic(subscriptions);MapString, ListTopicPartition assignment new HashMap();subscriptions.keySet().forEach(memberId -assignment.put(memberId, new ArrayList()));consumersPerTopic.entrySet().forEach(topicEntry-{String topic topicEntry.getKey();ListString members topicEntry.getValue();Integer numPartitionsForTopic partitionsPerTopic.get(topic);if (numPartitionsForTopic null || members.isEmpty())return;ListTopicPartition partitions AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);if (!partitions.isEmpty()) {members.forEach(memberId -assignment.get(memberId).addAll(partitions));}});return assignment;} }二、定义两个消费者给其配置上述PartitionAssignor. package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration; import java.time.temporal.TemporalUnit; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.TimeUnit;public class KafkaTest19 {private static Properties getProperties(){Properties propertiesnew Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,xx.xx.xx.xx:9092);properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,testGroup2023);properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,BroadcastAssignor.class.getName());return properties;}public static void main(String[] args) {KafkaConsumerString,String myConsumernew KafkaConsumerString, String(getProperties());String topicstudy2023;myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecordsString,String consumerRecordsmyConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println(record offset is: record.offset());}}} } package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration; import java.time.temporal.TemporalUnit; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.TimeUnit;public class KafkaTest20 {private static Properties getProperties(){Properties propertiesnew Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,xx.xx.xx.xx:9092);properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,testGroup2023);properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,BroadcastAssignor.class.getName());return properties;}public static void main(String[] args) {KafkaConsumerString,String myConsumernew KafkaConsumerString, String(getProperties());String topicstudy2023;myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecordsString,String consumerRecordsmyConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println(record offset is: record.offset());}}} } 在kafka创建只有一个分区的topic study2023 创建一个生产者往study2023这个 topic发送消息 package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Date; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;public class KafkaTest01 {public static void main(String[] args) {Properties properties new Properties();properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,xx.xx.xx.xx:9092);KafkaProducerString,String kafkaProducernew KafkaProducerString, String(properties);ProducerRecordString,String producerRecordnew ProducerRecord(study2023,0,fff,hello sister,now is: new Date());FutureRecordMetadata future kafkaProducer.send(producerRecord);long offset 0;try {offset future.get().offset();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(offset);kafkaProducer.close();} } 分别运行生产者和消费者可以看到相同消费者组里两个消费者可以消费study2023这个topic的同一个分区的数据
http://www.zqtcl.cn/news/610903/

相关文章:

  • 廊坊网站建设技术托管seo怎么优化关键词排名培训
  • 抛丸机网站怎么做手机网站打不开的解决方法
  • 上海做网站的公司多少钱冷水江网站
  • 百度网站流量查询宣传片制作公司费用
  • 安徽炒股配资网站开发搭建平台载体
  • 中华建设杂志网站记者黑龙江省建设集团有限公司网站首页
  • 成都络迈品牌网站建设网站建设的行业资讯、
  • 英语网站大全免费赤峰市建设厅官方网站
  • 宁波网站建设熊掌号成都网络关键词排名
  • 织梦网站改版需要怎么做平台设计软件
  • 企业展示型网站网站建设设计
  • 增城网站建设服务网站建设制作设计公司佛山
  • 微网站套餐自媒体网站源码模板dede
  • 企业网站改版升级成都便宜网站建设公司
  • 广州公共资源建设工程交易中心网站新塘做网站
  • 数码港 太原网站开发公司iis 建立子网站
  • 做一个自己的网站需要什么商标设计网站猪八戒
  • 傻瓜式网站建设软件保险预约
  • 网站 备案规定自己做简单网站
  • 网站上怎么做支付接口南乐网站建设
  • 咸阳网站建设公司电话做个公司网站大概多少钱
  • 网站如何做关键词排名点子网创意网
  • 浙江建设培训考试网站河源东莞网站建设
  • 网站移动端做pc端的301跳转哪些网站是增值网
  • wordpress新闻站浙江耀华建设集团网站
  • 网站开发代理企业网站推广技巧和方法
  • 俄语网站开发用模板做的网站多少钱
  • 丽水网站建设公司广州网络公司
  • 做基金的网站哪个好针对大学生推广引流
  • 国外对旅游网站的建设互联网推广和互联网营销