当前位置: 首页 > news >正文

博客网站开发源代码中国建设人才服务信息网是什么网站

博客网站开发源代码,中国建设人才服务信息网是什么网站,wordpress file not found,wordpress头像缓存到本地1. 自动提交最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true#xff0c;那么每过 5s#xff0c;消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制#xff0c;默认值是5s。消费者每次…1. 自动提交最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true那么每过 5s消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制默认值是5s。消费者每次获取新数据时都会先把上一次poll()方法返回的最大偏移量提交上去。可能造成的问题数据重复读假设我们仍然使用默认的 5s 提交时间间隔在最近一次提交之后的 3s 发生了再均衡再均衡之后消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s所以在这 3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量减小可能出现重复消息的时间窗不过这种情况是无法完全避免的。2. 手动提交(1) 同步提交// 把auto.commit.offset设为false让应用程序决定何时提交偏移量props.put(auto.commit.offset, false);try{while(true) {ConsumerRecords records consumer.poll(1000);for(ConsumerRecord record : records) {// 假设把记录内容打印出来就算处理完毕System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());}try{// 只要没有发生不可恢复的错误commitSync() 方法会一直尝试直至提交成功// 如果提交失败我们也只能把异常记录到错误日志里consumer.commitSync();}catch(CommitFailedException e) {System.err.println(commit failed! e.getMessage());}}}finally {consumer.close();}(2) 异步提交手动提交有一个不足之处在 broker 对提交请求作出回应之前应用程序会一直阻塞这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量但如果发生了再均衡会增加重复消息的数量。这个时候可以使用异步提交只管发送提交请求无需等待 broker 的响应。// 把auto.commit.offset设为false让应用程序决定何时提交偏移量props.put(auto.commit.offset, false);try{while(true) {ConsumerRecords records consumer.poll(1000);for(ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());}// 提交最后一个偏移量然后继续做其他事情。consumer.commitAsync();}}finally {consumer.close();}在成功提交或碰到无法恢复的错误之前commitSync()会一直重试但是commitAsync()不会这也是commitAsync()不好的一个地方。它之所以不进行重试是因为在它收到服务器响应的时候可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量2000这个时候发生了短暂的通信问题服务器收不到请求自然也不会作出任何响应。与此同时我们处理了另外一批消息并成功提交了偏移量3000。如果commitAsync()重新尝试提交偏移量2000它有可能在偏移量3000之后提交成功。这个时候如果发生再均衡就会出现重复消息。commitAsync()也支持回调在broker作出响应时会执行回调// 把auto.commit.offset设为false让应用程序决定何时提交偏移量props.put(auto.commit.offset, false);try {while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());}consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(Map offsets, Exception exception) {if(offsets ! null) {System.out.println(commit offset successful!);}if(exception ! null) {System.out.println(commit offset fail! exception.getMessage());}}});}} finally {consumer.close();}可以在回调中重试失败的提交以下为思路使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前先检查回调的序列号和即将提交的偏移量是否相等如果相等说明没有新的提交那么可以安全地进行重试。如果序列号比较大说明有一个新的提交已经发送出去了应该停止重试。(3) 同步和异步组合提交一般情况下针对偶尔出现的提交失败不进行重试不会有太大问题因为如果提交失败是因为临时问题导致的那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交就要确保能够提交成功。try {while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());}// 如果一切正常我们使用 commitAsync() 方法来提交// 这样速度更快而且即使这次提交失败下一次提交很可能会成功consumer.commitAsync();}}catch (Exception e) {e.printStackTrace();}finally {try {// 使用 commitSync() 方法会一直重试直到提交成功或发生无法恢复的错误// 确保关闭消费者之前成功提交了偏移量consumer.commitSync();}finally {consumer.close();}}(4) 提交特定的偏移量不管是自动提交还是使用commitAsync()或者commitSync()来提交偏移量提交的都是 poll() 方法返回的那批数据的最大偏移量想要自定义在什么时候提交偏移量可以这么做Map currentOffsets new HashMap();int count 0;......try {while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1, no metadata));if (count % 1000 0) {// 这里调用的是 commitAsync()不过调用 commitSync() 也是完全可以的// 当然在提交特定偏移量时仍然要处理可能发生的错误consumer.commitAsync(currentOffsets, null);}count;}}}finally {consumer.close();}3. 分区再均衡监听器消费者在退出和进行分区再均衡之前应该做一些正确的事情提交最后一个已处理记录的偏移量(必须做)根据之前处理数据的业务不同你可能还需要关闭数据库连接池、清空缓存等程序如何能得知集群要进行分区再均衡了消费者 API 提供了再均衡监听器以下程序可以做到 kafka 消费数据的 Exactly Once 语义package com.bonc.rdpe.kafka110.consumer;import java.util.Collection;import java.util.Collections;import java.util.HashMap;import java.util.Map;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;/*** Title RebalanceListenerConsumer.java* Description 再均衡监听器* Author YangYunhe* Date 2018-06-27 17:35:05*/public class RebalanceListenerConsumer {public static void main(String[] args) {Map currentOffsets new HashMap();Properties props new Properties();props.put(bootstrap.servers, 192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094);// 把auto.commit.offset设为false让应用程序决定何时提交偏移量props.put(auto.commit.offset, false);props.put(group.id, dev3-yangyunhe-group001);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumer consumer new KafkaConsumer(props);consumer.subscribe(Collections.singletonList(dev3-yangyunhe-topic001), new ConsumerRebalanceListener() {/** 再均衡开始之前和消费者停止读取消息之后被调用* 如果在这里提交偏移量下一个接管分区的消费者就知道该从哪里开始读取了*/Overridepublic void onPartitionsRevoked(Collection partitions) {// 如果发生再均衡我们要在即将失去分区所有权时提交偏移量// 要注意提交的是最近处理过的偏移量而不是批次中还在处理的最后一个偏移量System.out.println(Lost partitions in rebalance. Committing current offsets: currentOffsets);consumer.commitSync(currentOffsets);}/** 在重新分配分区之后和新的消费者开始读取消息之前被调用*/Overridepublic void onPartitionsAssigned(Collection partitions) {long committedOffset -1;for(TopicPartition topicPartition : partitions) {// 获取该分区已经消费的偏移量committedOffset consumer.committed(topicPartition).offset();// 重置偏移量到上一次提交的偏移量的下一个位置处开始消费consumer.seek(topicPartition, committedOffset 1);}}});try {while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1, no metadata));}consumer.commitAsync(currentOffsets, null);}} catch (Exception e) {e.printStackTrace();} finally {try{consumer.commitSync(currentOffsets);} catch (Exception e) {e.printStackTrace();} finally {consumer.close();System.out.println(Closed consumer successfully!);}}}}当然你也可以选择再均衡后从头开始消费consumer.subscribe(Collections.singletonList(dev3-yangyunhe-topic001), new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(Collection partitions) {System.out.println(starting partitions rebalance...);}Overridepublic void onPartitionsAssigned(Collection partitions) {consumer.seekToBeginning(partitions);}});以上代码与 props.put(auto.offset.reset, earliest);是等效的。设置从最新消息开始消费consumer.subscribe(Collections.singletonList(dev3-yangyunhe-topic001), new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(Collection partitions) {System.out.println(starting partitions rebalance...);}Overridepublic void onPartitionsAssigned(Collection partitions) {consumer.seekToEnd(partitions);}});以上代码与props.put(auto.offset.reset, latest);等效。4. 涉及到数据库的 Exactly Once 语义的实现思路当处理 Kafka 中的数据涉及到数据库时那么即使每处理一条数据提交一次偏移量也可以造成数据重复处理或者丢失数据看以下为伪代码Map currentOffsets new HashMap();......while (true) {ConsumerRecords records consumer.poll(100);for (ConsumerRecord record : records) {currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1);// 处理数据processRecord(record);// 把数据存储到数据库中storeRecordInDB(record);// 提交偏移量consumer.commitAsync(currentOffsets);}}假设把数据存储到数据库后没有来得及提交偏移量程序就因某种原因挂掉了那么程序再次启动后就会重复处理数据数据库中会有重复的数据。如果把存储到数据库和提交偏移量在一个原子操作里完成就可以避免这样的问题但数据存到数据库偏移量保存到kafka是无法实现原子操作的而如果把数据存储到数据库中偏移量也存储到数据库中这样就可以利用数据库的事务来把这两个操作设为一个原子操作同时结合再均衡监听器就可以实现 Exactly Once 语义以下为伪代码consumer.subscribe(Collections topics, new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(Collection partitions) {// 发生分区再均衡之前提交事务commitDBTransaction();}Overridepublic void onPartitionsAssigned(Collection partitions) {// 再均衡之后从数据库获得消费偏移量for(TopicPartition topicPartition : partitions) {consumer.seek(topicPartition, getOffsetFromDB(topicPartition));}}});/*** 消费之前调用一次 poll()让消费者加入到消费组中并获取分配的分区* 然后马上调用 seek() 方法定位分区的偏移量* seek() 设置消费偏移量设置的偏移量是从数据库读出来的说明本次设置的偏移量已经被处理过* 下一次调用 poll() 就会在本次设置的偏移量上加1开始处理没有处理过的数据* 如果seek()发生错误比如偏移量不存在则会抛出异常*/consumer.poll(0);for(TopicPartition topicPartition : consumer.assignment()) {consumer.seek(topicPartition, getOffsetFromDB(topicPartition));}while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {// 处理数据processRecord(record);// 把数据存储到数据库中storeRecordInDB(record);// 把偏移量存储到数据库中storeOffsetInDB(record.topic(), record.partition(), record.offset());}// 以上3步为一个事务提交事务这里在每个批次末尾提交一次事务是为了提高性能commitDBTransaction();}把偏移量和记录保存到用一个外部系统来实现 Exactly Once 有很多方法但核心思想都是结合 ConsumerRebalanceListener 和 seek() 方法来确保能够及时保存偏移量并保证消费者总是能够从正确的位置开始读取消息。
http://www.zqtcl.cn/news/75278/

相关文章:

  • 网站平台搭建织梦企业门户网站
  • 莆田免费建站模板dz网站建设
  • vps可以做多少网站seo公司 杭州
  • 找人做网站网页浏览器cookie
  • 广西建设工程质量监督网站凡科网站建设步骤
  • 福建网站模板分红网站建设
  • 什么是网站标题做全网营销型网站建设
  • 查公司注册信息怎么查昆明网站seo报价
  • 四川建设网官河北seo推广系统
  • 网站建设电话销售话术实例建企业网站教程
  • 建设直播网站需要哪些许可证wordpress权限设置
  • 网站备案接口东莞网站优化排名诊断
  • 高端网络建站上海人力资源招聘官网
  • 文字堆积网站类似wordpress的网站
  • 平面设计主要做什么内容丹东seo排名公司
  • 西安西工大软件园做网站的公司湖南做网站 在线磐石网络
  • 如何用python做网站渭南网站建设远景
  • 网站 建设开发合作协议众包网站开发
  • 效果营销型网站建设陕西百度推广的代理商
  • 西安网站建设最新案例wordpress 动态插件
  • 网站开发培训哪家好wordpress破图
  • 高中网站制作门户网站开发需要多少钱
  • 池州市建设工程质量安全监督局网站备案网站多长时间
  • 四川省住房和城乡建设厅网站域名哪家做企业网站
  • 医学招聘网站开发区沛县网站建设
  • 网络系统设计wordpress 链接优化插件
  • 网站模板下载破解版网件路由器重置
  • 怎么做微信电影网站网站建设 架构
  • 武进网站建设多少钱wordpress更改链接后网站打不开
  • 网站建设费用计入哪个会计科目沈阳快速建站公司有哪些