当前位置: 首页 > news >正文

厚街找人做网站微信视频号推广方法

厚街找人做网站,微信视频号推广方法,wordpress正版插件,wordpress音乐插件h5一、Kafka消费者提交Offset的策略 Kafka消费者提交Offset的策略有 自动提交Offset#xff1a; 消费者将消息拉取下来以后未被消费者消费前#xff0c;直接自动提交offset。自动提交可能丢失数据#xff0c;比如消息在被消费者消费前已经提交了offset#xff0c;有可能消息…一、Kafka消费者提交Offset的策略 Kafka消费者提交Offset的策略有 自动提交Offset 消费者将消息拉取下来以后未被消费者消费前直接自动提交offset。自动提交可能丢失数据比如消息在被消费者消费前已经提交了offset有可能消息拉取下来以后消费者挂了手动提交Offset 消费者在消费消息时/后再提交offset在消费者中实现手动提交Offset分为手动同步提交、手动异步提交什么是Offset 参考文章Linux【Kafka三】组件介绍 二、自动提交策略 Kafka消费者默认是自动提交Offset的策略 可设置自动提交的时间间隔 package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration; import java.util.Arrays; import java.util.Properties;/*** Description: kafka消费者消费消息,自动提交offset* Author: lvxiaobu* Date: 2023-10-24 16:26**/ public class MyConsumerAutoSubmitOffset {private final static String CONSUMER_GROUP_NAME GROUP1;private final static String TOPIC_NAME topic0921;public static void main(String[] args) {Properties props new Properties();// 一、设置参数// 配置kafka地址 // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, // 192.168.151.28:9092); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 自动提交默认配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 自动提交offset的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 二、创建消费者KafkaConsumerString,String consumer new KafkaConsumerString,String(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息开始消费while (true){// 从kafka集群中拉取消息ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));// 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时// 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理)for (ConsumerRecordString, String record : records) {System.out.println(接收到的消息: 分区: record.partition() , offset: record.offset() , key值: record.key() , value值: record.value());}}} }上述代码中的如下代码是自动提交策略的相关设置  // 设置消费者offset的提交方式// 自动提交默认配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 自动提交offset的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); 三、手动提交策略 3.1、手动同步提交策略 手动同步提交会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后才开始执行消费者中后续的代码。 因为使用异步提交容易丢失消息固一般使用同步提交在同步提交后不要再做其他逻辑处理。 package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration; import java.util.Arrays; import java.util.Properties;/*** Description: kafka消费者消费消息,手动同步提交offset* Author: lvxiaobu* Date: 2023-10-24 16:26**/ public class MyConsumerMauSubmitOffset {private final static String CONSUMER_GROUP_NAME GROUP1;private final static String TOPIC_NAME topic0921;public static void main(String[] args) {Properties props new Properties();// 一、设置参数// 配置kafka地址 // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, // 192.168.151.28:9092); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 手动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 自动提交offset的时间间隔:此时不再需要设置该值 // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 二、创建消费者KafkaConsumerString,String consumer new KafkaConsumerString,String(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息开始消费while (true){// 从kafka集群中拉取消息ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));// 业务逻辑处理for (ConsumerRecordString, String record : records) {System.out.println(接收到的消息: 分区: record.partition() , offset: record.offset() , key值: record.key() , value值: record.value());}// 当for循环业务逻辑处理结束以后,再手动提交offset// 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。// 一般使用同步提交在同步提交后不再做其他逻辑处理consumer.commitAsync();// do anything}} }3.2、手动异步提交策略 异步提交不会在提交offset代码处阻塞即消费者提交了offset后不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法供kafka集群回调来告诉消费者提交offset的结果。 package com.demo.lxb.kafka;import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties;/*** Description: kafka消费者消费消息,手动异步提交offset* Author: lvxiaobu* Date: 2023-10-24 16:26**/ public class MyConsumerMauSubmitOffset2 {private final static String CONSUMER_GROUP_NAME GROUP1;private final static String TOPIC_NAME topic0921;public static void main(String[] args) {Properties props new Properties();// 一、设置参数// 配置kafka地址 // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, // 192.168.151.28:9092); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 手动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 自动提交offset的时间间隔:此时不再需要设置该值 // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 二、创建消费者KafkaConsumerString,String consumer new KafkaConsumerString,String(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息开始消费while (true){// 从kafka集群中拉取消息ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(接收到的消息: 分区: record.partition() , offset: record.offset() , key值: record.key() , value值: record.value());}// 异步提交,不影响后续的内容。// new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata map, Exception e) {if(e ! null){// 可将提交失败的消息记录到日志System.out.println(记录提交offset失败的消息到日志);System.out.println(消费者提交offset抛出异常: Arrays.toString(e.getStackTrace()));System.out.println(消费者提交offset异常的消息信息: JSONObject.toJSONString(map));}}});// 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。//do anything}} }
http://www.zqtcl.cn/news/472576/

相关文章:

  • 网站系统维护一般多久电商关键字优化
  • 孝感市建设局网站宁波seo网络推广价格
  • 百度商桥网站网络编程技术试题
  • 设计素材网站排名网站建设网站软件有哪些内容
  • 互联网兼职做网站维护wordpress评论微信通知
  • 合肥瑶海区网站建设方案长沙网站 建设推广世云网络
  • wordpress 挂码seo推广公司哪家好
  • 高端 网站设计公司wordpress添加投稿功能
  • 长沙 网站设计 公司价格江苏专业网站建设费用
  • 做的好的手机网站有哪些内容手机怎么做app详细步骤
  • net网站开发参考文献c++能不能作为网页开发语言
  • 我公司让别人做网站了怎么办厦门logo设计公司
  • 闸北专业做网站怎么判断网站优化过度
  • 搭建网站seowordpress重新安装如何做
  • 网站设计优化重庆教育建设有限公司网站
  • 域名注册网站查询手工制作视频教程简单又漂亮
  • 书画院网站源码网站百度指数
  • 网页设计与网站开发第三版课后答案网络运营商是干嘛的
  • wordpress分类目录网站主题自己做营销型网站
  • 简述网站推广的五要素seo排名软件怎么做
  • 做网站能做职业吗织梦如何做几种语言的网站
  • 手机网站定制咨询如何修改网站
  • 长沙大型网站建设公司建站工作室源码
  • 找设计方案的网站专注南昌网站建设
  • UE做的比较好的网站汕头网站关键词优化教程
  • 做羞羞的事情网站广州番禺招聘网最新招聘信息
  • 网站基础开发成本网站建设策划包括哪些内容
  • 商务网站建设哪家好绍兴网站建设做网站
  • 网站域名管理东莞网页设计和网页制作
  • 网站建设与制作报价网站app制作