自学网站建设看什么书,专业做网站建设制作服务,长春做官网的公司,网站换公司吗【Kafka-3.x-教程】专栏#xff1a;
【Kafka-3.x-教程】-【一】Kafka 概述、Kafka 快速入门 【Kafka-3.x-教程】-【二】Kafka-生产者-Producer 【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft 【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer 【Kafka-3.x-教程】-【五…【Kafka-3.x-教程】专栏
【Kafka-3.x-教程】-【一】Kafka 概述、Kafka 快速入门 【Kafka-3.x-教程】-【二】Kafka-生产者-Producer 【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft 【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer 【Kafka-3.x-教程】-【五】Kafka-监控-Eagle 【Kafka-3.x-教程】-【六】Kafka 外部系统集成 【Flume、Flink、SpringBoot、Spark】 【Kafka-3.x-教程】-【七】Kafka 生产调优、Kafka 压力测试 【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer 1Kafka 消费方式2Kafka 消费者工作流程2.1.消费者总体工作流程2.2.消费者组原理2.3.消费者组初始化流程2.4.消费者组详细消费流程2.5.消费者重要参数 3消费者 API3.1.独立消费者案例订阅主题3.2.独立消费者案例订阅分区3.3.消费者组案例 4分区的分配以及再平衡4.1.Range 以及再平衡4.2.RoundRobin 以及再平衡4.3.Sticky 以及再平衡 5offset 位移5.1.offset 的默认维护位置5.2.自动提交 offset5.3.手动提交 offset5.4.指定 Offset 消费5.5.指定时间消费5.6.漏消费和重复消费 6消费者事务7数据积压 1Kafka 消费方式 2Kafka 消费者工作流程
2.1.消费者总体工作流程 1、一个消费者可以消费多个分区的数据。消费者之间是完全独立的。
2、每个分区的数据只能由消费者组中的一个消费者进行消费。
3、Kafka 的 offset偏移量负责记录消费者或消费者组消费数据的位置。
4、新版本的 Kafka 由 __consumer_offsets 这个主题来记录 offset。老版本是存储在 ZK 的 consumer 目录下会产生消费者和 ZK 的大量频繁的交互网络压力较大。
5、Kafka 的 offset 是持久化到硬盘中的所以数据安全性得以保障。
2.2.消费者组原理 1、一个消费者组中包含多个消费者形成一个消费者组的条件是消费者对应的 group.id 一致。
1消费者组内每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者进行消费。
2消费者组之间互不影响可以把消费者组理解成为一个包装后的消费者。
2、如果消费者组中的消费者数量大于分区数量那么消费者组中必定会出现闲置的消费者。
2.3.消费者组初始化流程 消费者是如何形成消费者组的呢
1、coordinator辅助实现消费者组的初始化和分区的分配。
1每个 Broker 中都会存在一个 coordinator。
2coordinator 的选择取决于 group.id 的 hashcode % 50__consumer_offsets 这个主题的分区数假如取得的值为 1那么就找到 1 号分区所在的节点进行连接。
2、每个 Consumer 都发送 join Group 的请求。
3、从众多 Consumer 中随机选择一个 Consumer - Leader。
4、coordinator 把要消费的 topic 详情发送给 Consumer - Leader。
5、Consumer - Leader 制定消费方案。
6、Consumer - Leader 将制定计划发送给 coordinator。
7、coordinator 将对应计划发布到各个 Consumer。
注意每个消费者都会定时和 coordinator 保持心跳默认 3 秒一旦超时session.timeout.ms45scoordinator 会认为消费者挂了该消费者会被移除并触发再平衡消费者处理的时间过长max.poll.interval.ms 5分钟也会将这个消费者下线并触发再平衡。
2.4.消费者组详细消费流程 1、消费者组创建网络连接客户端ConsumerNetworkClient用于和集群进行交互。
2、调用 sendFetches发送消费数据请求
Fetch.min.bytes每批次最小抓取大小默认1字节。fetch.max.wait.ms一批数据最小值未达到的超时时间默认500ms。Fetch.max.bytes每批次最大抓取大小默认50m。
3、ConsumerNetworkClient 发送 send 请求Kafka 集群通过回调方法onSuccess把对应的结果拉取过来。
4、拉取到的数据会放到 completedFetches 的消息队列中queue。
5、消费者从队列中拉取数据默认一次 500 条Max.poll.records。
1反序列化操作.
2拦截器操作用于处理数据。
3处理数据。
2.5.消费者重要参数 3消费者 API
3.1.独立消费者案例订阅主题
1、需求创建一个独立消费者消费 first 主题中数据。 注意在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。
2、编写代码
public class CustomConsumer {public static void main(String[] args) {// 0 配置Properties properties new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092,hadoop103:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test5);// 1 创建一个消费者 , helloKafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题 firstArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}kafkaConsumer.commitAsync();}}
}3、测试
1在 IDEA 中执行消费者程序。
2在 Kafka 集群控制台创建 Kafka 生产者并输入数据。
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic firsthello3在 IDEA 控制台观察接收到的数据。
ConsumerRecord(topic first, partition 1, leaderEpoch 3, offset 0, CreateTime 1629160841112, serialized key size -1, serialized value size 5, headers RecordHeaders(headers [], isReadOnly
false), key null, value hello)3.2.独立消费者案例订阅分区
1、需求创建一个独立消费者消费 first 主题 0 号分区的数据。 2、代码编写
public class CustomConsumerPartition {public static void main(String[] args) {// 0 配置Properties properties new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092,hadoop103:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test);// 1 创建一个消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题对应的分区ArrayListTopicPartition topicPartitions new ArrayList();topicPartitions.add(new TopicPartition(first,0));kafkaConsumer.assign(topicPartitions);// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}3、测试 1在 IDEA 中执行消费者程序。
2在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成几个 0 号分区的数据。
first 0 381
first 0 382
first 2 168
first 1 165
first 1 1663在 IDEA 控制台观察接收到的数据只能消费到 0 号分区数据表示正确。
ConsumerRecord(topic first, partition 0, leaderEpoch 14,
offset 381, CreateTime 1636791331386, serialized key size -
1, serialized value size 9, headers RecordHeaders(headers
[], isReadOnly false), key null, value atguigu 0)
ConsumerRecord(topic first, partition 0, leaderEpoch 14,
offset 382, CreateTime 1636791331397, serialized key size -
1, serialized value size 9, headers RecordHeaders(headers
[], isReadOnly false), key null, value atguigu 1)3.3.消费者组案例
1、需求测试同一个主题的分区数据只能由一个消费者组中的一个消费。 2、代码实现
复制代码为三份同时启动模拟三个消费者都在一个消费者组中。
public class CustomConsumer1 {public static void main(String[] args) {// 0 配置Properties properties new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092,hadoop103:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test5);// 设置分区分配策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,org.apache.kafka.clients.consumer.StickyAssignor);// 1 创建一个消费者 , helloKafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题 firstArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}3、测试
1启动代码中的生产者发送消息在 IDEA 控制台即可看到两个消费者在消费不同分区的数据如果只发生到一个分区可以在发送时增加延迟代码 Thread.sleep(2)。
ConsumerRecord(topic first, partition 0, leaderEpoch 2,
offset 3, CreateTime 1629169606820, serialized key size -1,
serialized value size 8, headers RecordHeaders(headers [],
isReadOnly false), key null, value hello1)
ConsumerRecord(topic first, partition 1, leaderEpoch 3,
offset 2, CreateTime 1629169609524, serialized key size -1,
serialized value size 6, headers RecordHeaders(headers [],
isReadOnly false), key null, value hello2)
ConsumerRecord(topic first, partition 2, leaderEpoch 3,
offset 21, CreateTime 1629169611884, serialized key size -1,
serialized value size 6, headers RecordHeaders(headers [],
isReadOnly false), key null, value hello3)2观察三份代码的控制台每份代码中只会消费一个分区中的数据。
3重新发送到一个全新的主题中由于默认创建的主题分区数为 1可以看到只能有一个消费者消费到数据。 4分区的分配以及再平衡 1、一个 consumer group 中有多个 consumer 组成一个 topic 有多个 partition 组成现在的问题是到底由哪个 consumer 来消费哪个 partition 的数据。
2、Kafka有四种主流的分区分配策略 Range、RoundRobin、Sticky、CooperativeSticky。
可以通过配置参数partition.assignment.strategy修改分区的分配策略。默认策略是 Range CooperativeSticky。Kafka 可以同时使用多个分区分配策略。
4.1.Range 以及再平衡
特点针对一个分区做排序后计算。 Range 分区分配再平衡案例
1、停止掉 0 号消费者快速重新发送消息观看结果45s 以内越快越好。 1 号消费者消费到 3、4 号分区数据。 2 号消费者消费到 5、6 号分区数据。 0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。
2、再次重新发送消息观看结果45s 以后。 1 号消费者消费到 0、1、2、3 号分区数据。 2 号消费者消费到 4、5、6 号分区数据。
说明消费者 0 已经被踢出消费者组所以重新按照 range 方式分配。
4.2.RoundRobin 以及再平衡
特点针对所有分区做排序后轮询。 RoundRobin 分区分配再平衡案例
1、停止掉 0 号消费者快速重新发送消息观看结果45s 以内越快越好。 1 号消费者消费到 2、5 号分区数据 2 号消费者消费到 4、1 号分区数据 0 号消费者的任务会按照 RoundRobin 的方式把数据轮询分成 0 、6 和 3 号分区数据分别由 1 号消费者或者 2 号消费者消费。
说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。
2、再次重新发送消息观看结果45s 以后。 1 号消费者消费到 0、2、4、6 号分区数据 2 号消费者消费到 1、3、5 号分区数据
说明消费者 0 已经被踢出消费者组所以重新按照 RoundRobin 方式分配。
4.3.Sticky 以及再平衡
特点尽量均匀随机的分配。
粘性分区定义可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前考虑上一次分配的结果尽量少的调整分配的变动可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略首先会尽量均衡的放置分区到消费者上面在出现同一消费者组内消费者出现问题的时候会尽量保持原有分配的分区不变化。
Sticky 分区分配再平衡案例
1、停止掉 0 号消费者快速重新发送消息观看结果45s 以内越快越好。 1 号消费者消费到 2、5、3 号分区数据。 2 号消费者消费到 4、6 号分区数据。 0 号消费者的任务会按照粘性规则尽可能均衡的随机分成 0 和 1 号分区数据分别由 1 号消费者或者 2 号消费者消费。
说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。
2、再次重新发送消息观看结果45s 以后。 1 号消费者消费到 2、3、5 号分区数据。 2 号消费者消费到 0、1、4、6 号分区数据。
说明消费者 0 已经被踢出消费者组所以重新按照粘性方式分配。
5offset 位移
5.1.offset 的默认维护位置 1、__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.idtopic分区号value 就是当前 offset 的值。每隔一段时间kafka 内部会对这个 topic 进行 compact也就是每个 group.idtopic分区号就保留最新数据。
2、消费 offset 案例
思想__consumer_offsets 为 Kafka 中的 topic那就可以通过消费者进行消费。
1在配置文件 config/consumer.properties 中添加配置 exclude.internal.topicsfalse默认是 true表示不能消费系统主题。为了查看该系统主题数据所以该参数修改为 false。
2采用命令行方式创建一个新的 topic。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 23启动生产者往 atguigu 生产数据。
bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:90924启动消费者消费 atguigu 数据。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic atguigu --group test注意指定消费者组名称更好观察数据存储位置key 是 group.idtopic分区号。
5查看消费者消费主题__consumer_offsets。
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter --from-beginning[offset,atguigu,1]::OffsetAndMetadata(offset7,
leaderEpochOptional[0], metadata, commitTimestamp1622442520203,
expireTimestampNone)
[offset,atguigu,0]::OffsetAndMetadata(offset8,
leaderEpochOptional[0], metadata, commitTimestamp1622442520203,
expireTimestampNone)5.2.自动提交 offset 1、原理
1消费者主动去拉取数据。
2到达 5s 时 Consumer 自动提交 offset。
2、代码示例
public class CustomConsumerAutoOffset {public static void main(String[] args) {// 0 配置Properties properties new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092,hadoop103:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test);// 自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 提交时间间隔properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 1 创建一个消费者 , helloKafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题 firstArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}5.3.手动提交 offset 1、原理 commitSync同步提交必须等待offset提交完毕再去消费下一批数据。 commitAsync异步提交 发送完提交offset请求后就开始消费下一批数据了。
2、代码示例
1由于同步提交 offset 有失败重试机制故更加可靠但是由于一直等待提交结果提交的效率比较低。以下为同步提交 offset 的示例。
public class CustomConsumerByHandSync {public static void main(String[] args) {// 0 配置Properties properties new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092,hadoop103:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test);// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者 , helloKafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题 firstArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offsetkafkaConsumer.commitSync();}}
}
2虽然同步提交 offset 更可靠一些但是由于其会阻塞当前线程直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下会选用异步提交 offset 的方式。以下为异步提交 offset 的示例
public class CustomConsumerByHandSync {public static void main(String[] args) {// 0 配置Properties properties new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092,hadoop103:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test);// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者 , helloKafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题 firstArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offsetkafkaConsumer.commitAsync();}}
}
5.4.指定 Offset 消费 auto.offset.reset earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量消费者组第一次消费或服务器上不再存在当前偏移量时例如该数据已被删除该怎么办
1、earliest自动将偏移量重置为最早的偏移量–from-beginning。
2、latest默认值自动将偏移量重置为最新偏移量。
3、none如果未找到消费者组的先前偏移量则向消费者抛出异常。
代码示例
public class CustomConsumerSeek {public static void main(String[] args) {// 0 配置信息Properties properties new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092,hadoop103:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test3);// 1 创建消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 指定位置进行消费SetTopicPartition assignment kafkaConsumer.assignment();// 保证分区分配方案已经制定完毕while (assignment.size() 0){kafkaConsumer.poll(Duration.ofSeconds(1));assignment kafkaConsumer.assignment();}// 指定消费的offsetfor (TopicPartition topicPartition : assignment) {kafkaConsumer.seek(topicPartition,600);}// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}5.5.指定时间消费
需求在生产环境中会遇到最近消费的几个小时数据异常想重新按照时间消费。
例如要求按照时间消费前一天的数据怎么处理
public class CustomConsumerSeekTime {public static void main(String[] args) {// 0 配置信息Properties properties new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092,hadoop103:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test3);// 1 创建消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 指定位置进行消费SetTopicPartition assignment kafkaConsumer.assignment();// 保证分区分配方案已经制定完毕while (assignment.size() 0){kafkaConsumer.poll(Duration.ofSeconds(1));assignment kafkaConsumer.assignment();}// 希望把时间转换为对应的offsetHashMapTopicPartition, Long topicPartitionLongHashMap new HashMap();// 封装对应集合for (TopicPartition topicPartition : assignment) {topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);}MapTopicPartition, OffsetAndTimestamp topicPartitionOffsetAndTimestampMap kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);// 指定消费的offsetfor (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp topicPartitionOffsetAndTimestampMap.get(topicPartition);kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());}// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}5.6.漏消费和重复消费
重复消费已经消费了数据但是 offset 没提交。
漏消费先提交 offset 后消费有可能会造成数据的漏消费。 思考怎么能做到既不漏消费也不重复消费呢详看消费者事务。
6消费者事务
注意下游必须支持事务才能做到精准一次消费。 7数据积压
1、增大分区与消费者核数消费者数 分区数
2、增大每次拉取的吞吐量。