宁波网站制作联系方式,注册网站不需要手机验证的,建设工程合同管理考试试题及答案,网站规划的公司【README】
1#xff0c; 本文总结了 kafka消费者开发方式#xff1b;2#xff0c; 本文使用的是最新的kafka版本 3.0.0#xff1b;【1】 kafka消费则
【1.1】消费者与消费者组
1#xff09;消费者#xff1a; 应用程序需要创建消费者对象#xff0c;订阅主题并开始接…【README】
1 本文总结了 kafka消费者开发方式2 本文使用的是最新的kafka版本 3.0.0【1】 kafka消费则
【1.1】消费者与消费者组
1消费者 应用程序需要创建消费者对象订阅主题并开始接收消息
2消费者群组
kafka消费者从属于消费者群组
【1.2】消费者接收分区消息模型
11个消费者接收4个分区 模型1
注kafka分区编号从0开始所以本文也从0开始 22个消费者接收4个分区模型2 34个消费者接收4个分区模型3 45个消费者接收4个分区模型4 52个消费者组接收4个分区模型5 【小结】消费者与消费者组接收分区模型
一个群组里的消费者订阅的是同一个主题每个消费者接收一部分分区消息如模型1,2,3不同群组各自接收同一个主题的全部消息相互不影响不竞争如模型5
往群组里添加消费者是横向伸缩消费能力的主要方式
不要让 消费者数量超过分区数量多余消费者会被重置如模型4
只要保证每个应用程序的消费者组id 是唯一的就可以让它们各自都获取全部数据注意是全部而不是部分 因为消费组间不竞争消息但组内竞争如模型5 【1.2】消费者群组和分区再均衡
1分区再均衡 分区所有权从一个消费者转到另一个消费者
分区再均衡期间 消费者无法读取消息整个群组一小段时间不可用
2消费者维持从属关系消费者向被指派为 群组协调器的broker 不同群组有不同的协调器发送心跳来维持 群组从属关系及它们对分区所有权
消费者会在轮询消息或提交偏移量时发送心跳如果停止发送心跳的时间足够长会话就会过期群组协调器认为它已经死亡 就会触发一次再均衡补充 kafka 0.10.1 版本中引入了 独立的心跳线程即可以在轮询消息的空档发送心跳属性配置 max.poll.interval.ms 设置轮询间隔时长session.timeout.ms 会话超时时间
3分区分配过程
当消费者要加入群组时向群组协调器发送一个 JoinGroup请求 step1第一个加入群组的消费者称为群主。群主从协调器哪里获取群组成员列表列表包含所有最近发送心跳的消费者是活跃的并负责给每一个消费者分配分区。 群主使用了一个实现 PartitionAssignor 接口默认为Range但可以通过partition.assignment.strategy 来配置来决定分区所有权应该分给哪一个消费者 step2分配完成后群主把分配情况发给协调器协调器再把这些信息发给所有消费者。每个消费者只能看到自己的分区而群主可以看到所有 这个过程在分区再均衡时重复发生【2】创建kafka消费者
1创建kafka消费者必须配置的3个属性
bootstrap.server kafka集群broker 列表key.deserializer键的反序列化器的全限定类名value.deserializer值的反序列化器的全限定类名若仅设置3个属性kafka-client3.0会报错说是必须配置 group.id
2创建消费者代码示例
// 创建消费者配置信息
Properties props new Properties();
// 属性配置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, G1); 3订阅主题
/* 订阅主题 */
consumer.subscribe(Arrays.asList(hello10)); 【3】轮询
1一旦消费者订阅了主题轮询会处理所有细节 包括群组协调分区再均衡 发送心跳和获取数据
2轮询代码死循环
/* 循环拉取 */
try {int i 0;while(!Thread.interrupted()) {// 模拟延时try {System.out.println(DateUtils.getNowTimestamp() 等待消费消息);TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}// 消费消息ConsumerRecordsString, String consumerRds consumer.poll(100);for(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者Simple-分区【 rd.partition() 】offset【 rd.offset() 】键 rd.key() 值 rd.value());}// 手动同步提交consumer.commitSync();}
} finally {// 记得关闭消费者consumer.close();
}
整体代码如下
/*** 简单消费者仅4个属性*/
public class MyConsumerSimple {public static void main(String[] args) {// 创建消费者配置信息Properties props new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, G1);// 关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 创建消费者KafkaConsumerString, String consumer new KafkaConsumer(props);/* 订阅主题 */consumer.subscribe(Arrays.asList(hello10));/* 循环拉取 */try {int i 0;while(!Thread.interrupted()) {// 模拟延时try {System.out.println(DateUtils.getNowTimestamp() 等待消费消息);TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}// 消费消息ConsumerRecordsString, String consumerRds consumer.poll(100);for(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者Simple-分区【 rd.partition() 】offset【 rd.offset() 】键 rd.key() 值 rd.value());}// 手动同步提交consumer.commitSync();}} finally {// 记得关闭消费者consumer.close();}}
} 【4】消费者配置
1fetch.min.bytes 消费者从服务器获取记录的最小字节数
2fetch.max.wait.ms设置当没有足够数据fetch.min.bytes设置时需要等待的最大时间
默认500ms 属性1fetch.min.bytes和属性2 任意一个属性达到kafka返回消息
3max.partition.fetch.bytes指定服务器从每个分区返回给消费者的最大字节数 默认值1M
即 kafkaConsumer.poll() 从每个分区返回的消息大小不超过该设定值
注意1该值必须要比broker能够接收的最大消息的字节数max.message.size 配置要大否则消费者无法读取这些消息一直挂起注意2该值的设置并不是越大越好因为需要考虑消费者处理数据的时间因为消费者需要频繁调用poll方法来避免会话过期和分区再均衡问题如果单次返回数据太多可能无法及时轮询发送心跳
4session.timeous.ms 指定了消费者被认为死亡之前与服务器断开连接的时间
1默认 3s 消费者被认定为死亡协调器触发分区再均衡把分区分配给其他消费者2该属性与 heartbeat.interval.ms 紧密相关 heartbeat.interval.ms 指定poll()方法向协调器发送心跳频率session.timeout.ms 指定了消费者可以多久不发送心跳3一般需要同时修改这两个属性 显然 heartbeat.interval.ms 必须比 session.timeout.ms 小一般 前者是后者的 三分之一
5auto.offset.reset 指定消费者读取一个没有偏移量的分区或偏移量无效的情况下该如何读取
消费者第一次读取分区分区没有保留它的偏移量信息这叫没有偏移量消费者断开然后上线分区删除了它的偏移量信息所以无效取值默认值latest 读取最新消息earliest 从起始位置读取分区
6enable.auto.commit 是否自动提交偏移量默认值true
6.1自动提交配置 auto.commit.interval.ms 控制自动提交频率6.2手动提交 同步提交consumer.commitSync() 异步提交comsumer.commitAsync()
7partition.assignment.strategy 分区分配策略分区应该分配给同一个消费者组的哪些消费者PartitionAssignor 根据消费者和主题决定哪些分区应该分配给消费者有2个策略
7.1Range默认值 该策略把主题的若干个连续分区分配给消费者 只要使用了range且分区数量无法被消费者数量整除就会出现消费者分配的分区数不均等的情况不整除无法均分7.2RoundRobin 该策略把主题所有分区逐个分配给消费者注意是主题所有分区 一般来说 RoundRobin 会给消费者分配相同数量的分区均分
// 消费者设置分区策略 (默认值-RangeAssignor)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
补充消费者接收分区的分配策略
Range分配在分区数不整除消费者数时是不会均分的 RoundRobin分配会尽量均分即使不整除 8client.id 可以是任意字符串用来标识消息来源
通常用在日志度量指标和配额里
9max.poll.records 指定单次调用 poll 能够返回的记录数量
10receive.buffer.bytes 和 send.buffer.bytes
设置socket 在读写数据时用到的tcp 缓冲区大小若设置为-1使用操作系统默认值【5】提交和偏移量
1提交 更新分区当前位置的操作
2消费者如何提交偏移量
消费者往 _consumer_offset 这个特殊主题发送消息 消息里面包含每个分区的偏移量
如果消费者一致处于运行状态那偏移量就没有作用但若有消费者上线和下线就会触发再均衡完成再均衡后消费者为了继续之前的工作需要读取每个分区最后一次提交的偏移量然后从偏移量指定位置继续处理
33种提交偏移量方式
自动提交手动同步提交手动异步提交
【5.1】自动提交
1最简单的提交方式是自动提交
enable.auto.commit 设置为true 则每过5s消费者会自动把从 poll() 方法接收到的最大偏移量提交上去提交间隔 通过 auto.commit.interval.ms 设置默认5s自动提交在轮询里进行 在每次轮询时会检查是否需要提交偏移量若是则提交从上一次轮询返回的偏移量
2自动提交的问题 可能造成数据重复消费这需要消费者支持幂等性来解决
若提交间隔时间为5s 在最近一次提交后的3s发生了分区再均衡如分区1的消费者从A换成了BB 从最后一次A提交的偏移量开始读取消息。所以 消费者A和B会重复处理相同数据因为消费者A处理了消息但没有提交偏移量这就会造成A在最后3s处理的消息会被B重复处理 【5.2】提交当前偏移量 手动同步提交
提交处理的这批消息的最后一个偏移量
1如何实现手动同步提交
把 enable.auto.commit 设置为false让消费者调用 commitSync( 手动提交这个方法会提交由 poll() 方法返回的最新偏移量提交成功马上返回提交失败抛出异常
2如果发生分区再均衡从最近一次提交到发生再均衡之间的所有消息都将被重复处理
3代码例子
/*** Description 同步提交消费者* author xiao tang* version 1.0.0* createTime 2021年12月11日*/
public class MyConsumerSyncCommit {public static void main(String[] args) {// 创建消费者配置信息Properties props new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, G1);// 关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 设置消费消息的位置消费最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);// 设置分区策略 (默认值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 创建消费者KafkaConsumerString, String consumer new KafkaConsumer(props);// 订阅主题consumer.subscribe(Arrays.asList(hello11));// 循环拉取try {while(!Thread.interrupted()) {// 模拟延时try {System.out.println(DateUtils.getNowTimestamp() 消费者-SyncCommit-等待消费消息);TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}// 消费消息ConsumerRecordsString, String consumerRds consumer.poll(100);for(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者-SyncCommit-分区【 rd.partition() 】offset【 rd.offset() 】 值 rd.value());}// 手动同步提交consumer.commitSync();}} finally {// 记得关闭消费者consumer.close();}}
}
补充 手动同步提交的问题 在broker对提交请求做出回应之前 应用程序会一直阻塞这样会限制应用程序的吞吐量 【5.3】手动异步提交
提交处理的这批消息的最后一个偏移量
1为解决同步提交问题引入了异步提交
// 手动异步提交
consumer.commitAsync();
异步提交的问题 在成功提交或碰到无法恢复的错误前commitSync() 会一直重试但 commitAsync() 不会这就是问题所在
2异步提交的回调若有异常可以处理
一般被用于记录提交错误或生成度量指标不过如果要重试提交一定要注意顺序
// 循环拉取
try {while(!Thread.interrupted()) {// 模拟延时try {System.out.println(DateUtils.getNowTimestamp() 消费者-SyncCommit-等待消费消息);TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}// 消费消息ConsumerRecordsString, String consumerRds consumer.poll(100);for(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者-SyncCommit-分区【 rd.partition() 】offset【 rd.offset() 】 值 rd.value());}// 手动异步提交带异步提交完成后的回调consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata offsets, Exception exception) {if (exception !null) {System.out.println(异步提交异常);// 把错误信息写入db 或日志}}});}
} finally {// 记得关闭消费者consumer.close();
}
但是异步回调也是有问题的 如消费者1发送一个请求1提交偏移量2000但因为网络问题kafka服务器没有收到或超时如消费者1发送一个请求2提交偏移量3000且成功写入kafka 这时如果请求1再重试则kafka偏移量由 3000 设置为2000则 偏移量2000~3000的消息会被重复消费这就是问题所在了 而且如果 把2000,3000 替换为 1 或 100w的偏移量那会影响整个系统性能即便支持幂等因为消费重复消息需要白费系统资源 解决方法 把异常信息提交失败的偏移量存入数据库然后重试重试时一定要注意不要让偏移量后退 【5.4】同步异步组合提交
1轮询中使用异步提交而轮询外在关闭数据库连接前使用同步提交
2代码示例
// 循环拉取
try {while(!Thread.interrupted()) {// 模拟延时try {System.out.println(DateUtils.getNowTimestamp() 消费者-SyncCommit-等待消费消息);TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}// 消费消息ConsumerRecordsString, String consumerRds consumer.poll(100);for(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者-SyncCommit-分区【 rd.partition() 】offset【 rd.offset() 】 值 rd.value());}// 【异步提交】consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata offsets, Exception exception) {if (exception !null) {System.out.println(异步提交异常);// 把错误信息写入db 或日志}}});}
} finally {try {// 【同步提交】 因为错误时同步提交会一直重试直到提交成功或发生无法恢复的错误 consumer.commitSync();} finally {// 记得关闭消费者consumer.close();}
}
2小结
异步提交commitAsync() 提交速度更快即使这次提交失败下一次提交很可能会成功同步提交commitSync() 提交会因为错误时同步提交会一直重试直到提交成功或发生无法恢复的错误【5.5】提交特定偏移量
1场景 轮询获得了一批数据偏移量从1k ~ 10w我想每处理1k条消息我就提交一次偏移量 这是 commitSync 和 commitAsync() 办不到的因为它们提交的最后一个偏移量而不是中间某个消息的偏移量 2代码示例
// 消费消息【提交特定偏移量】
ConsumerRecordsString, String consumerRds consumer.poll(100);
int counter 0;
for(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者-SyncCommit-分区【 rd.partition() 】offset【 rd.offset() 】 值 rd.value());if (counter % 100 0) { // 每消费100条消息就提交一次偏移量// 注意这里提交的偏移量要加1curOffsets.put(new TopicPartition(rd.topic(), rd.partition()), new OffsetAndMetadata(rd.offset()1, no metadata));// 异步或同步提交特定偏移量consumer.commitAsync(curOffsets, new OffsetCommitCallbackImpl());}
}
// 轮序完之后若无法整除还要提交剩余消息的偏移量
if (counter % 100 ! 0) {consumer.commitAsync(new OffsetCommitCallbackImpl());
}
注意 提交特定偏移量时需要把最后一个消息的偏移量加1 【5.6】再均衡监听器
1应用场景 在消费者退出或进行分区再均衡前会做一些清理工作如提交偏移量或关闭数据库连接这些工作可以通过监听器来实现 2具体实现和测试参见 kafka再均衡监听器测试_PacosonSWJTU的博客-CSDN博客 【5.7】 从特定偏移量处开始处理记录
1一些场景希望从指定偏移量接收消息
2从分区起始位置末尾位置读取消息的方法分别为
seekToBeginning(CollectionTopicPartition tp);seekToEnd(CollectionTopicPartition tp);
3为了避免处理重复消息有一种解决方案采用原子操作 把消息内容和偏移量放在同一个原子操作完成消息和偏移量要么都提交成功要么都失败 如果消息存数据库偏移量提交到kafka就无法实现原子但把消息和偏移量都存入数据库就可以实现原子操作 然后当消费者下一次启动时就从数据库读取上一次保存偏移量并从该偏移量表示的位置接收消息 3.1以上方案需要依赖 ConsumerRebalanceListener分区再均衡监听器 和 seek(offset) 来实现
向seek() 方法传入偏移量 下文的poll轮询操作就可以从该偏移量接收消息在 监听器的 onPartitionsRevoked() 方法中 把消息和偏移量保存到数据库并提交事务以便下一次消费前从数据库读取偏移量传入seek方法
补充 分区再均衡监听器参见
kafka再均衡监听器测试_PacosonSWJTU的博客-CSDN博客 【5.8】消费者如何退出关闭
1告诉消费者如何优雅退出
另一个线程调用 consumer.wakeup() 方法就可以让消费者退出poll方法并抛出 WakeupException 异常该方法是唯一一个可以从其他线程安全调用的方法
并不需要处理 WakeupException 异常因为它只是用于跳出循环的一种方式记得在退出线程前需要调用 consumer.close() 方法关闭消费者会提交任何还没有提交的东西并向群组协调器发送消息告知自己要离开群组接下来会触发再均衡而不需要等待会话超时
2在消费者主线程添加 jvm关闭时的钩子 jvm中增加一个关闭的钩子当jvm关闭的时候会执行系统中已经设置的所有 通过方法addShutdownHook添加的钩子当系统执行完这些钩子后jvm才会关闭。所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等操作。 【5.8.1】带有jvm关闭钩子的消费者
/*** Description 带有jvm关闭钩子的消费者优雅关闭消费者* author xiao tang* version 1.0.0* createTime 2021年12月12日*/
public class MyConsumerShutdownHook {public static void main(String[] args) {// 创建消费者配置信息Properties props new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME G1);// 关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 设置消费消息的位置消费最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);// 设置分区策略 (默认值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 创建消费者KafkaConsumerString, String consumer new KafkaConsumer(props);// 订阅主题 【没有分区再均衡监听器】consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));// 添加消费者jvm关闭钩子在消费者主程序退出前执行addShutdownHook(consumer);// 循环拉取try {while(!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() 带有jvm关闭钩子的消费者等待消费消息);TimeUtils.sleep(1000);// 消费消息ConsumerRecordsString, String consumerRds consumer.poll(100);for(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者-ShutdownHook-分区【 rd.partition() 】offset【 rd.offset() 】 值 rd.value());}// 【异步提交】consumer.commitAsync(new OffsetCommitCallbackImpl());if (!consumerRds.isEmpty()) throw new RuntimeException(测试-抛出运行时异常);}} finally {try {// 【同步提交】 因为错误时同步提交会一直重试直到提交成功或发生无法恢复的错误consumer.commitSync();} finally {// 记得关闭消费者consumer.close();System.out.println(消费者关闭);}}}/*** description 添加消费者jvm关闭钩子* param consumer 消费者* author xiao tang* date 2021/12/12*/private static void addShutdownHook(Consumer consumer) {Thread mainThread Thread.currentThread();Runtime.getRuntime().addShutdownHook(new Thread() {public void run() {// 关闭钩子运行在独立线程以便我们可以安全退出poll() 抛出WakeupException() 异常System.out.println(进入消费者jvm关闭钩子);consumer.wakeup();try {// 当前线程一直挂起直到主线程运行完成mainThread.join();System.out.println(消费者主程序执行完成关闭钩子done);} catch (InterruptedException e) {e.printStackTrace();System.out.println(消费者jvm关闭钩子执行异常);}}});}
}
运行日志 ........ 消费者-ShutdownHook-分区【2】offset【1294】值[585] ABCDE 消费者-ShutdownHook-分区【2】offset【1295】值[588] ABCDE 消费者关闭 进入消费者jvm关闭钩子 消费者主程序执行完成关闭钩子done Exception in thread main java.lang.RuntimeException: 测试-抛出运行时异常 at kafka.consumer.shutdownhook.MyConsumerShutdownHook.main(MyConsumerShutdownHook.java:57) 注意 在消费者主线程退出之前确保彻底关闭了消费者 【5.9】独立消费者如何接收消息
1独立消费者 即一个群组只有一个消费者接收所有分区消息 这样就不存在分区再均衡问题
2对于独立消费者不需要订阅主题而是为自己分配分区
一个消费者可以订阅主题并加入群组或者为自己分配分区但不能同时做这两件事情
3独立消费者为自己分配分区并接收消息
/*** Description 独立消费者* author xiao tang* version 1.0.0* createTime 2021年12月12日*/
public class MyIndependentConsumer {public static void main(String[] args) {// 创建消费者配置信息Properties props new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME G1);// 关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 设置消费消息的位置消费最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);// 设置分区策略 (默认值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 创建消费者KafkaConsumerString, String consumer new KafkaConsumer(props);// 为自己分配分区这里我们并没有订阅主题*assignPartition(consumer);// 循环拉取try {while(!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() 独立消费者等待消费消息);TimeUtils.sleep(1000);// 消费消息ConsumerRecordsString, String consumerRds consumer.poll(100);for(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者-独立-分区【 rd.partition() 】offset【 rd.offset() 】 值 rd.value());}// 【异步提交】consumer.commitAsync(new OffsetCommitCallbackImpl());}} finally {try {// 【同步提交】 因为错误时同步提交会一直重试直到提交成功或发生无法恢复的错误consumer.commitSync();} finally {// 记得关闭消费者consumer.close();System.out.println(消费者关闭);}}}/*** description 给自己分配分区* param consumer 消费者* author xiao tang* date 2021/12/12*/ private static void assignPartition(Consumer consumer) {// 获取topic的分区信息列表ListPartitionInfo partitionInfoList consumer.partitionsFor(MyProducer.TOPIC_NAME);if (partitionInfoList null || partitionInfoList.isEmpty()) {throw new RuntimeException(topic 分区查无记录);}// 创建主题分区并添加到列表(所有分区都添加到列表)ListTopicPartition partitions new ArrayList(partitionInfoList.size());partitionInfoList.forEach(x-{partitions.add(new TopicPartition(x.topic(), x.partition()));});// 给自己分配分区consumer.assign(partitions);}
}
为自己分配分区而不是订阅主题核心代码
/*** description 给自己分配分区* param consumer 消费者* author xiao tang* date 2021/12/12*/private static void assignPartition(Consumer consumer) {// 获取topic的分区信息列表ListPartitionInfo partitionInfoList consumer.partitionsFor(MyProducer.TOPIC_NAME);if (partitionInfoList null || partitionInfoList.isEmpty()) {throw new RuntimeException(topic 分区查无记录);}// 创建主题分区并添加到列表(所有分区都添加到列表)ListTopicPartition partitions new ArrayList(partitionInfoList.size());partitionInfoList.forEach(x-{partitions.add(new TopicPartition(x.topic(), x.partition()));});// 给自己分配分区consumer.assign(partitions);}