当前位置: 首页 > news >正文

黄页号码查询快速优化seo

黄页号码查询,快速优化seo,2022网站快速收录技术,深圳建站公司企业kafka3.0 文章目录 kafka3.01. 什么是kafka#xff1f;2. kafka基础架构3. kafka集群搭建4. kafka命令行操作主题命令行【topic】生产者命令行【producer】消费者命令行【consumer】 5. kafka生产者生产者消息发送流程Producer 发送原理普通的异步发送带回调函数的异步发送同步…kafka3.0 文章目录 kafka3.01. 什么是kafka2. kafka基础架构3. kafka集群搭建4. kafka命令行操作主题命令行【topic】生产者命令行【producer】消费者命令行【consumer】 5. kafka生产者生产者消息发送流程Producer 发送原理普通的异步发送带回调函数的异步发送同步发送API 生产者重要参数列表生产者分区生产者发送消息的分区策略案例一指定分区发送消息案例二自定义分区器 生产者如何提高吞吐量数据可靠性【ACK】ACK应答级别ISR队列重复数据分析设置ACK操作案例**数据传递的意义**幂等性kafka事务保证消息的有序性 6. kafka-brokerzookeeper存储的kafka信息Kafka Broker总体工作流程Leader选举流程broker重要参数服役新节点和负载均衡退役旧节点和负载均衡 7. kafka-副本kafka副本基本信息Leader选举流程Follower故障处理流程Leader故障处理流程手动调整分区副本存储增加副本因子kafka文件存储机制kafka日志存储参数配置日志存储配置删除/压缩kafka为啥可以高效读写数据 8. kafak消费者消费者总体工作流程消费者组的初始化流程消费者重要参数消费者API订阅主题订阅分区消费者组案例 分区的分配以及再平衡RangeRoundRobin轮询Sticky粘性 offset偏移量自动提交 offset手动提交 offset指定 Offset 消费指定时间进行消费漏消费和重复消费 消费者事务数据积压消费者如何提高吞吐量 http://kafka.apache.org/downloads.html 1. 什么是kafka Kafka传 统定义Kafka是一个分布式的基于发布/订阅模式的消息队列MessageQueue主要应用于大数据实时处理领域。Kafka最 新定义 Kafka是 一个开源的 分布式事件流平台 Event StreamingPlatform被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。 在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。 1.1 消息队列应用场景 - 缓冲/消峰有助于控制和优化数据流经过系统的速度解决生产消息和消费消息的处理速度不一致的情况。 - 解耦允许你独立的扩展或修改两边的处理过程只要确保它们遵守同样的接口约束 - 异步通信允许用户把一个消息放入队列但并不立即处理它然后在需要的时候再去处理它们。1.2 消息队列的两种模式或者两种模型 点对点模式消费者主动拉取数据消息收到后清除消息 发布/订阅模式 可以有多个topic主题浏览、点赞、收藏、评论等消费者消费数据之后不删除数据每个消费者相互独立都可以消费到数据 ​ 2. kafka基础架构 Producer 消息生产者就是向 Kafka broker 发消息的客户端。Consumer 消息消费者向 Kafka broker 取消息的客户端。Consumer GroupCG 消费者组由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费消费者组之间互不影响。所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。Broker 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topicTopic 可以理解为一个队列生产者和消费者面向的都是一个 topic。一个非常大的 topic 可以分布到多个 broker即服务器上Partition 分区。一个 topic 可以分为多个 partition每个 partition 是一个有序的队列。Segment partition 物理上由多个 segment 组成Offset 每个partition都由一系列有序的、不可变的消息组成这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.Replica 副本。一个 topic 的每个分区都有若干个副本一个 Leader 和若干个Follower。Leader 每个分区多个副本的“主”生产者发送数据的对象以及消费者消费数据的对象都是 Leader。Follower 每个分区多个副本中的“从”实时从 Leader 中同步数据保持和 Leader 数据的同步。Leader 发生故障时某个 Follower 会成为新的 Leader。 3. kafka集群搭建 1安装环境准备 10.1.61.11710.1.61.11810.1.61.119zookeeperzookeeperzookeeperkafkakafkakafkajdk1.8jdk1.8jdk1.8 上传解压kafka安装包以下仅演示一台服务器的操作另外两台也许执行相同步骤 2修改server.properties 进入到kafka目录修改配置文件vim server.properties重点修改如下三个配置 broker.id0broker 的全局唯一编号不能重复只能是数字log.dirsxx运行日志(数据)存放的路径zookeeper.connect10.1.61.121:2181/kafka #broker 的全局唯一编号不能重复只能是数字 broker.id0 #处理网络请求的线程数量 num.network.threads3 #用来处理磁盘 IO 的线程数量 num.io.threads8 #发送套接字的缓冲区大小 socket.send.buffer.bytes102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes102400 #请求套接字的缓冲区大小 socket.request.max.bytes104857600 #kafka 运行日志(数据)存放的路径路径不需要提前创建kafka自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔 log.dirs/opt/module/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir1 # 每个 topic 创建分区时的副本数默认时 1 个副本 offsets.topic.replication.factor1 #segment 文件保留的最长时间超时将被删除 log.retention.hours168 #每个 segment 文件的大小默认最大 1G log.segment.bytes1073741824 # 检查过期数据的时间默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms300000 #配置连接 Zookeeper 集群地址在 zk 根目录下创建/kafka方便管理 zookeeper.connecthadoop102:2181,hadoop103:2181,hadoop104:2181/ka fka注在其他服务器分别执行上述 操作修改server.properties中的broker.id1、broker.id2注broker.id 不得重复整个集群中唯一 3配置环境变量 #在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置 sudo vim /etc/profile.d/my_env.sh-------------- 增加以下内容 ------------------ #KAFKA_HOME export KAFKA_HOME/opt/module/kafka export PATH$PATH:$KAFKA_HOME/bin ---------------------------------------------#刷新一下环境变量 source /etc/profile4zookeeper 集群启停脚本 #!/bin/bashcase $1 in start){for i in hadoop102 hadoop103 hadoop104doecho ------------- zookeeper $i 启动 ------------ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh startdone } ;; stop){for i in hadoop102 hadoop103 hadoop104doecho ------------- zookeeper $i 停止 ------------ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh stopdone } ;; status){for i in hadoop102 hadoop103 hadoop104doecho ------------- zookeeper $i 状态 ------------ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh statusdone } ;; esac5kafka 集群启停脚本 Kafka启动命令 ./kafka-server-start.sh -daemon …/config/server.propertiesKafka关闭命令 ./kafka-server-stop.sh 编写kf启停脚本 linux查看服务器主机名命令hostname。需要提前配置所有服务器的免密登录 #! /bin/bash case $1 in start){for i in node1 node2doecho --------启动 $i Kafka-------ssh -p 19222 $i /home/crbt/local/kafka_2.13-2.7.1/bin/kafka-server-start.sh -daemon /home/crbt/local/kafka_2.13-2.7.1/config/server.propertiesdone };; stop){for i in node1 node2doecho --------停止 $i Kafka-------ssh -p 19222 $i /home/crbt/local/kafka_2.13-2.7.1/bin/kafka-server-stop.sh done };; esac分配权限chmod x kf.sh 注意停止 Kafka 集群时一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息Zookeeper 集群一旦先停止Kafka 集群就没有办法再获取停止进程的信息只能手动杀死 Kafka 进程了。 4. kafka命令行操作 kafka启停命令 Kafka启动命令./kafka-server-start.sh -daemon …/config/server.propertiesKafka关闭命令 ./kafka-server-stop.sh 注意启动时配置文件的路径要能够到 server.properties。先启动 Zookeeper 集群然后启动 Kafka 主题命令行【topic】 参数描述–bootstrap-server String: server toconnect to连接的 Kafka Broker 主机名称和端口号–topic String: topic操作的 topic 名称–create / --delete / --alter / --list创建/删除/修改主题查看所有主题–describe查看主题详细描述–partitions Integer: # of partitions设置分区数–replication-factorInteger: replication factor设置分区副本–config String: namevalue更新系统默认的配置 bin/kafka-topics.sh # 查看指定kafka上的所有主题 ./kafka-topics.sh --bootstrap-server localhost:9092,10.1.61.122:9092 --list# 创建first topic ./kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 2 --replication-factor 2 --topic first# 查看主题详情 ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first Topic: first PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes1073741824 Topic: first Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1./kafka-topics.sh --zookeeper 10.1.61.121:2181 --describe --topic first Topic: first PartitionCount: 2 ReplicationFactor: 2 Configs:Topic: first Partition: 0 Leader: none Replicas: 2,1 Isr: 1Topic: first Partition: 1 Leader: none Replicas: 1,2 Isr: 1# 修改主题分区数注意分区数只能增加不能减少 ./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 2 ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first Topic: first PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes1073741824Topic: first Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1Topic: first Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2# 删除主题 ./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first生产者命令行【producer】 参数描述–bootstrap-server String: server toconnect to连接的 Kafka Broker 主机名称和端口号–topic String: topic操作的 topic 名称 kafka-console-producer.sh # 往topic中写入数据 ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first消费者命令行【consumer】 参数描述–bootstrap-server String: server toconnect to连接的 Kafka Broker 主机名称和端口号–topic String: topic操作的 topic 名称–from-beginning从头开始消费–group String: consumer group id指定消费者组名称 kafka-console-consumer.sh # 消费主题中新生成的消息 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first# 消费主题中存量消息从最早的消息开始消费 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first5. kafka生产者 生产者消息发送流程 Producer 发送原理 ​ 在消息发送的过程中涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulatorSender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 RecordAccumulator 双端队列缓冲区有两个核心参数: batch.size只有数据积累到这个值的大小后sender 线程才会来拉取一波数据默认大小16k发送到 Broker。linger.ms数据未达到 batch.size 大小时等待时间超过 linger.ms 设置的值后也会被发送到 sender 线程中发给 Broker。 Acks: 0生产者发来数据不需要等待消息落盘直接应答。1生产者发来数据leader 收到数据后应答。-1all生产者发来数据后leader 和 ISR 队列里所有节点都收到消息后才应答。 retries发送失败后的重试次数默认时int类型最大值建议设置小一还要设置一个间隔时间。 普通的异步发送 package com.lihw.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties; public class CustomProducer {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.1.61.121:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 10; i) {kafkaProducer.send(new ProducerRecord(first,lihw i));}// 5. 关闭资源kafkaProducer.close();} }消费first主题 crbtnode2:/home/crbt/local/kafka_2.13-2.7.1/bin./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first lihw 0 lihw 1 lihw 2 lihw 3 lihw 4带回调函数的异步发送 ​ 回调函数会在 producer 收到 ack 时调用为异步调用该方法有两个参数分别是元数据信息RecordMetadata和异常信息Exception如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。 package com.lihw.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args){// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.1.61.121:9092,10.1.61.122:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// request.timeout.ms 3000msproperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);try{// 4. 调用 send 方法,发送消息for (int i 0; i 10; i) {kafkaProducer.send(new ProducerRecord(first, lihw i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e null) {// 没有异常,输出信息到控制台System.out.println( 主题 metadata.topic() - 分区 metadata.partition());} else {// 出现异常打印e.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(2);}}catch (Exception e){e.printStackTrace();}finally {// 5. 关闭资源kafkaProducer.close();}} }主题 first - 分区0主题 first - 分区0主题 first - 分区0主题 first - 分区0主题 first - 分区0主题 first - 分区1主题 first - 分区1主题 first - 分区1主题 first - 分区1主题 first - 分区1注意消息发送失败会自动重试不需要我们在回调函数中手动重试。 同步发送API 只需在异步发送的基础上再调用一下 get()方法即可。 kafkaProducer.send(new ProducerRecord(first,kafka i)).get();生产者重要参数列表 参数名称描述bootstrap.servers生产者连接集群所需的 broker 地址清单 。 例如10.1.61.121:9092,10.1.61.122:9092 可以设置 1 个或者多个中间用逗号隔开。注意这里并非需要所有的 broker 地址因为生产者从给定的 broker 里查找到其他 broker 信息key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名。buffer.memoryRecordAccumulator 缓冲区总大小默认 32m。batch.size缓冲区一批数据最大值默认 16k。适当增加该值可以提高吞吐量但是如果该值设置太大会导致数据传输延迟增加。linger.ms如果数据迟迟未达到 batch.sizesender 等待 linger.time之后就会发送数据。单位 ms默认值是 0ms表示没有延迟。生产环境建议该值大小为 5-100ms 之间。acks0生产者发送过来的数据不需要等数据落盘应答。1生产者发送过来的数据Leader 收到数据后应答。-1all生产者发送过来的数据Leader和 isr 队列里面的所有节点收齐数据后应答。默认值是-1-1 和 all是等价的retries当消息发送出现错误的时候系统会重发消息。retries表示重试次数。默认是 int 最大值2147483647。如果设置了重试还想保证消息的有序性需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION1否则在重试此失败消息的时候其他的消息可能发送成功了。retry.backoff.ms两次重试之间的时间间隔默认是 100ms配合retries使用enable.idempotence是否开启幂等性默认 true开启幂等性。compression.type生产者发送的所有数据的压缩方式。默认是 none 也就是不压缩。 支持压缩类型none、gzip、snappy、lz4 和 zstd。max.in.flight.requests.per.connection用于控制在网络连接上允许的未确认请求的最大数量。该参数指定了在发送新请求之前生产者或消费者可以在单个网络连接上等待的未确认请求的最大数量。 保证消息的有序性开启幂等性 max.in.flight.requests.per.connection 5。 生产者分区 便于合理使用存储资源一个Topic可以有多个partition分区Partition 可以分布在多个 Broker 上存储可以把海量的数据按照分区切割成一块一块数据存储在多台 Broker 上。合理控制分区的任务可以实现负载均衡的效果。提高并行度生产者可以以分区为单位发送数据消费者可以以分区为单位进行消费数据。 生产者发送消息的分区策略 指定partition的发送消息时直接发送到指定的分区中未指定partition但有key的对key进行hash然后对partition数进行取余得出该发往哪个分区key相同的会将消息发往同一分区没有key也未指定partition的采用粘性分区器选择一个分区开始存储待该分区的batch已满或者已完成Kafka再随机一个分区进行使用和上一次的分区不同 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, IterableHeader headers) {... }public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value){... }public ProducerRecord(String topic, Integer partition, K key, V value, IterableHeader headers) {... }public ProducerRecord(String topic, Integer partition, K key, V value) {... }/** 没有指明partition值但有key的情况下将key的hash值与topic的partition数进行取余得到partition值* 例如key1的hash值5 key2的hash值6 topic的总partition数2那么key1 的value1写入1分区key2的value2写入0分区。 */ public ProducerRecord(String topic, K key, V value) {... }/** 既没有partition值又没有key值的情况下Kafka采用Sticky Partition黏性分区器会随机选择一个分区并尽可能一直使用该分区待该分区的batch已满或者已完成Kafka再随机一个分区进行使用和上一次的分区不同 例如第一次随机选择0号分区等0号分区当前批次满了默认16k或者linger.ms设置的时间到 Kafka再随机一个分区进行使用如果还是0会继续随机*/ public ProducerRecord(String topic, V value) {... }案例一指定分区发送消息 package com.lihw.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.1.61.121:9092,10.1.61.122:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// request.timeout.ms 3000msproperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);for (int i 0; i 5; i) {// 指定数据发送到 0 号分区key 为空IDEA 中 ctrl p 查看参数// 参数1主题 参数2分区 参数3key 参数4valuekafkaProducer.send(new ProducerRecord(first,0,,NBA i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e null){System.out.println( 主题 metadata.topic() - 分区 metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();} }主题 first - 分区0主题 first - 分区0主题 first - 分区0主题 first - 分区0主题 first - 分区0案例二自定义分区器 自定义分区器实现Partitioner接口重写partition方法方法返回的是分区数 package com.lihw.kafka.producer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map;public class MyPartitioner implements Partitioner {/*** 返回信息对应的分区* param topic 主题* param key 消息的 key* param keyBytes 消息的 key 序列化后的字节数组* param value 消息的 value* param valueBytes 消息的 value 序列化后的字节数组* param cluster 集群元数据可以查看分区信息* return*/Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue value.toString();// 创建 partitionint partition;// 判断消息是否包含 lihwif (msgValue.contains(lihw)){partition 0;}else {partition 1;}// 返回分区号return partition;}Overridepublic void close() {}Overridepublic void configure(MapString, ? configs) {} } 发送消息测试最终消息全部打在0号分区 package com.lihw.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.1.61.121:9092,10.1.61.122:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// request.timeout.ms 3000msproperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);// 3.添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,com.lihw.kafka.producer.MyPartitioner);// 4. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);for (int i 0; i 10; i) {// 指定数据发送到 0 号分区key 为空,根据自定义分区器进行分区kafkaProducer.send(new ProducerRecord(first,lihw i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e null){System.out.println( 主题 metadata.topic() - 分区 metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();} }结果消息全部发到了0号分区121节点 crbtnode1:/home/crbt/local/kafka_2.13-2.7.1/bin./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first lihw 0 lihw 1 lihw 2 lihw 3 lihw 4 lihw 5 lihw 6 lihw 7 lihw 8 lihw 9生产者如何提高吞吐量 batch.size批次大小默认16klinger.ms等待时间修改为5-100mscompression.type压缩snappyRecordAccumulator缓冲区大小修改为64m package com.lihw.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerParameters {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.1.61.121:9092,10.1.61.122:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 提高吞吐量的参数// batch.size批次大小默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms等待时间默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator缓冲区大小默认 32Mbuffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type压缩默认 none可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, snappy);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(first,1,,lihewei777 i));}// 5. 关闭资源kafkaProducer.close();} } 数据可靠性【ACK】 ACK应答级别 0生产者发送过来的数据不需要等数据落盘应答。生产者发送过来数据就不管了可靠性差效率高1生产者发送过来的数据Leader收到数据后应答。可靠性中等效率中等-1all生产者发送过来的数据Leader 和 ISR 队列里面的所有节点收齐数据后应答。可靠性高效率低 ISR队列 ​ Leader 维护了一个动态的 in-sync replica setISR意为和 Leader 保持同步的 Follower Leader 集合(leader0isr:0,1,2)。如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定默认30s。例如2号副本超时ISR队列则 (leader:0, isr:0,1)。 重复数据分析 acks -1all生产者发送过来的数据Leader和ISR队列里面的所有节点收齐数据后应答。leader 收到消息并落盘后返回ack之前挂掉此时会重新选选举出新的 leaderproducer 重新发送相同的消息导致数据重复。 设置ACK操作案例 // 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, all);// 重试次数 retries默认是 int 最大值2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);package com.lihw.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerAck {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.1.61.121:9092,10.1.61.122:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 3. 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, all);// 4. 重试次数 retries默认是 int 最大值2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 4. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);for (int i 0; i 10; i) {// 指定数据发送到 0 号分区key 为空,根据自定义分区器进行分区kafkaProducer.send(new ProducerRecord(first,lili i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e null){System.out.println( 主题 metadata.topic() - 分区 metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();} } 数据传递的意义 至少一次At Least Once ACK级别设置为-1 分区副本大于等于2 ISR里应答的最小副本数量大于等于2不能保证数据不重复最多一次At Most Once ACK级别设置为0不能保证数据不丢失精确一次Exactly Once对于一些非常重要的信息比如和钱相关的数据要求数据既不能重复也不丢失。Kafka 0.11版本以后引入了一项重大特性幂等性和事务。 Kafka 0.11版本以后引入了一项重大特性幂等性和事务。 幂等性 幂等性就是指 Producer 不论向 Broker 发送多少次重复数据Broker 端都只会持久化一条保证了不重复。 如何保证消息可以发送时精确一次Exactly Once 幂等性开启 至少一次 ack-1 分区副本数2 ISR最小副本数量2 重复数据的判断标准具有 PID, Partition, SeqNumber 相同主键的消息提交时Broker 只会持久化一条。其中 PID 是 Kafka 每次重启都会分配一个新的Partition 表示分区号Sequence Number 是单调自增的 所以幂等性只能保证的是在单分区单会话内不重复。幂等性开启参数 enable.idempotence 默认为 truefalse 关闭。 kafka事务 开启事务必须开启幂等性Producer 在使用事务功能前必须先自定义一个唯一的 transactional.id。有了 transactional.id即使客户端挂掉了它重启后也能继续处理未完成的事务。 默认有50个分区每个分区负责一部分事务。事务划分是根据transactional.id的hashcode值%50计算出该事务属于哪个分区。该分区Leader副本所在的broker节点即为这个transactional.id对应的Transaction Coordinator节点。 Kafka 的事务一共有如下 5 个 API // 1 初始化事务 void initTransactions();// 2 开启事务 void beginTransaction() throws ProducerFencedException;// 3 在事务内提交已经消费的偏移量主要用于消费者 void sendOffsetsToTransaction(MapTopicPartition, OffsetAndMetadata offsets, String consumerGroupId) throws ProducerFencedException;// 4 提交事务 void commitTransaction() throws ProducerFencedException;// 5 放弃事务类似于回滚事务的操作 void abortTransaction() throws ProducerFencedException;单个 Producer使用事务保证消息的仅一次发送 package com.lihw.kafka.producer;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerTransactions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.1.61.121:9092,10.1.61.122:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 设置事务 id必须事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction_id_0);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息for (int i 0; i 10; i) {// 发送消息kafkaProducer.send(new ProducerRecord(first,transaction i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e null){System.out.println( 主题 metadata.topic() - 分区 metadata.partition());}else {e.printStackTrace();}}});}int i 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}} }如果发生异常则终止事务 org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted保证消息的有序性 1kafka在1.x版本之前保证数据单分区有序条件如下max.in.flight.requests.per.connection1不需要考虑是否开启幂等性。 2kafka在1.x及以后版本保证数据单分区有序条件如下 开启幂等性max.in.flight.requests.per.connection需要设置小于等于5未开启幂等性max.in.flight.requests.per.connection需要设置为1。 原因说明因为在kafka1.x以后启用幂等后kafka服务端会缓存producer发来的最近5个request的元数据故无论如何都可以保证最近5个request的数据都是有序的 6. kafka-broker zookeeper存储的kafka信息 brokers/topics/主题/partitions/…brokers/ids当前存活的主机controller负责选举副本的Leader Kafka Broker总体工作流程 ARkafka分区中的所有副本统称AR ISR ASRISR记录存活的leader和follower的set集合ASR表示 Follower 与 Leader 副本同步时延迟过多的副本。 Leader选举流程 broker启动后在zk中注册/brokers/ids/ [0,1,2]controller在zk中注册谁先注册谁说了算老大负责监听brokers节点变化决定broker中leader的选举将节点信息上传到zk的ISR中[“lerder:0”,“isr”:“0,1,2”]其他controller从zk同步相关信息假设broker1中leader挂了controller中的老大监听到节点变化获取ISR选举新的Leader在ISR中存活为前提按照AR顺序排在前面的优先更新Leader和ISR队列 broker重要参数 参数名称描述replica.lag.time.max.msISR 中如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值默认30s。auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。log.segment.bytesKafka 中 log 日志是分成一块块存储的此配置是指 log 日志划分成块的大小默认值 1G。log.index.interval.bytes默认 4kbkafka 里面每当写入了 4kb 大小的日志.log然后就往 index 文件里面记录一个索引。log.retention.hoursKafka 中数据保存的时间默认 7 天。log.retention.minutesKafka 中数据保存的时间分钟级别默认关闭。log.retention.msKafka 中数据保存的时间毫秒级别默认关闭。log.retention.check.interval.ms检查数据是否保存超时的间隔默认是 5 分钟。log.retention.bytes默认等于-1表示无穷大也表示关闭。超过设置的所有日志总大小删除最早的 segment。log.cleanup.policy默认是 delete表示所有数据启用删除策略如果设置值为 compact表示所有数据启用压缩策num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。num.replica.fetchers默认是 1。副本拉取线程数这个参数占总核数的 50%的 1/3num.network.threads默认是 3。数据传输线程数这个参数占总核数的50%的 2/3 。log.flush.interval.messages强制页缓存刷写到磁盘的条数默认是 long 的最大值9223372036854775807。一般不建议修改 交给系统自己管理。log.flush.interval.ms每隔多久刷数据到磁盘默认是 null。一般不建议修改交给系统自己管理。replication.factor创建Topic时每个分区的副本数默认是1min.insync.replicas 1用于指定在执行写入操作时所需同步的最小副本数与acks参数冲突时以acks的为主unclean.leader.election.enable当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader 这样可降低了消息丢失。 服役新节点和负载均衡 kafka/bin/kafka-reassign-partitions.sh 参数描述–broker-list选择哪些broker “0,1,2,3”–generate创建 副本存储计划–execute执行 副本存储计划–verify验证 副本存储计划–topics-to-move-json-file指定 包含主题信息的json文件–reassignment-json-file指定 包含副本存储计划的json文件 节点服役 准备新的kafka节点node0假设已有node1node2修改server.properties配置文件中的broker.id、log.dirs、zookeeper.connect重启kafka集群然后启动新节点node3负载均衡 服役新节点执行负载均衡操作 创建一个要均衡的主题。 vim topics-to-move.json{topics: [{topic: first}],version: 1 }生成一个负载均衡的计划。kafka-reassign-partitions.sh crbteb61119:/home/crbt/local/kafka_2.13-2.7.1/bin./kafka-reassign-partitions.sh --bootstrap-server localhost:9093 --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2, --generateCurrent partition replica assignment当前分区副本分配情况只使用12没有0节点 {version:1,partitions:[{topic:first,partition:0,replicas:[1,2],log_dirs:[any,any]},{topic:first,partition:1,replicas:[2,1],log_dirs:[any,any]}]}Proposed partition reassignment configuration加入了新的节点broker.id0,建议的分区副本分配情况 {version:1,partitions:[{topic:first,partition:0,replicas:[2,0],log_dirs:[any,any]},{topic:first,partition:1,replicas:[0,1],log_dirs:[any,any]}]}创建副本存储计划将上面生产的负载均衡计划粘贴到此json文件中所有副本存储在 broker0、broker1、broker2中 vim increase-replication-factor.json{version:1,partitions:[{topic:first,partition:0,replicas:[2,0],log_dirs:[any,any]},{topic:first,partition:1,replicas:[0,1],log_dirs:[any,any]}]}执行副本存储计划 kafka-reassign-partitions.sh ./kafka-reassign-partitions.sh --bootstrap-server localhost:9093 --reassignment-json-file increase-replication-factor.json --execute验证副本存储计划 ./kafka-reassign-partitions.sh --bootstrap-server localhost:9093 --reassignment-json-file increase-replication-factor.json --verifyor./kafka-topics.sh --bootstrap-server localhost:9093 --describe --topic first退役旧节点和负载均衡 先按照退役一台节点生成执行计划然后按照服役时操作流程执行负载均衡。 创建一个要均衡的主题。 vim topics-to-move.json{topics: [{topic: first}],version: 1 }创建执行计划 ./kafka-reassign-partitions.sh --bootstrap-server 10.1.61.119:9093 --topics-to-move-json-file topics-to-move.json --broker-list 0,1 --generateCurrent partition replica assignment当前分区副本分配情况目前数据分布在012节点上 {version:1,partitions:[{topic:first,partition:0,replicas:[2,1],log_dirs:[any,any]},{topic:first,partition:1,replicas:[1,0],log_dirs:[any,any]},{topic:first,partition:2,replicas:[1,2],log_dirs:[any,any]}]}Proposed partition reassignment configuration建议的分区副本分配情况数据分布在01节点上 {version:1,partitions:[{topic:first,partition:0,replicas:[1,0],log_dirs:[any,any]},{topic:first,partition:1,replicas:[0,1],log_dirs:[any,any]},{topic:first,partition:2,replicas:[1,0],log_dirs:[any,any]}]} 创建副本存储计划将上方生产的负载均衡策略粘贴到此json文件中所有副本存储在 broker0、broker1中 vim increase-replication-factor.json建议的分区副本分配情况 {version:1,partitions:[{topic:first,partition:0,replicas:[1,0],log_dirs:[any,any]},{topic:first,partition:1,replicas:[0,1],log_dirs:[any,any]},{topic:first,partition:2,replicas:[1,0],log_dirs:[any,any]}]}执行副本存储计划 bin/kafka-reassign-partitions.sh --bootstrap-server 10.1.61.119:9093 --reassignment-json-file increase-replication-factor.json --execute验证副本存储计划 bin/kafka-reassign-partitions.sh --bootstrap-server 10.1.61.119:9093 --reassignment-json-file increase-replication-factor.json --verifyor./kafka-topics.sh --bootstrap-server 10.1.61.119:9093 --describe --topic first7. kafka-副本 kafka副本基本信息 Kafka 副本作用提高数据可靠性。Kafka 默认副本 1 个生产环境一般配置为 2 个保证数据可靠性太多副本会增加磁盘存储空间增加网络上数据传输降低效率Kafka 中副本分为Leader 和 Follower。Kafka 生产者只会把数据发往 Leader然后 Follower 找 Leader 进行同步数据Kafka 分区中的所有副本统称为 ARAssigned Repllicas AR ISR OSR AR是指为每个分区分配的副本集合包括leader副本和follower副本 ISR表示和 Leader 副本保持同步的 Follower 副本的集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定默认 30s。Leader 发生故障之后就会从 ISR 中选举新的 Leader。已同步的Follower副本OSR表示 Follower 与 Leader 副本同步时延迟过多的副本。掉队的Follower副本 Leader选举流程 ​ Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader负责管理集群 broker 的上下线所有 topic 的分区副本分配和 Leader 选举等工作。Controller 的信息同步工作是依赖于 Zookeeper 的。 测试创建一个新的 topic3 个分区3 个副本 # 错误示范由于我的kafka是三台所以无法创建多余3的副本数kafka的副本数量不能大于broker节点数量 ./kafka-topics.sh --bootstrap-server 10.1.61.121:9092 --create --topic lihw --partitions 4 --replication-factor 4 Error while executing topic command : Replication factor: 4 larger than available brokers: 3.[2023-08-24 11:18:29,468] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.# 创建一个新的 topic3 个分区3 个副本 ./kafka-topics.sh --bootstrap-server 10.1.61.121:9092 --create --topic lihw --partitions 3 --replication-factor 3 Created topic lihw.查看主题分区部署情况 ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic lihwTopic: lihw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes1073741824 Topic: lihw Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: lihw Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: lihw Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1停掉broker0的节点查看分区部署情况 ./kafka-server-stop.sh ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic lihw Topic: lihw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes1073741824 Topic: lihw Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2 Topic: lihw Partition: 1 Leader: 1 Replicas: 0,1,2 Isr: 1,2 Topic: lihw Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,1# 启动broker0的kafka后重新查看主题分区部署情况 Topic: lihw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes1073741824 Topic: lihw Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: lihw Partition: 1 Leader: 1 Replicas: 0,1,2 Isr: 1,2,0 Topic: lihw Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0Follower故障处理流程 offset副本中最后一个节点LEO每个副本的最后一个offsetLEO其实就是最新的offset 1。HW所有副本中最小LEO Follower故障 1Follower故障时会被临时提出ISR队列 2这个期间其他Leader和Follower会继续接受数据 3待该Follower恢复后Follower会读取本地磁盘记录上次的HW将log文件高于HW的部分截取掉从HW像Leader进行同步。 4直到达到高水位线时HW即Follower追上Leader之后就可以重新加入ISR了。 Leader故障处理流程 1leader故障时首先踢出ISR队列 2重新选举新的leaderFollower会截取掉比HW高的数据重新从Leader中拉取 注意只能保证数据一致性不能保证数据不丢失或者不重复 手动调整分区副本存储 在生产环境中每台服务器的配置和性能不一致但是Kafka只会根据自己的代码规则创建对应的分区副本就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。 需求创建一个新的topic4个分区两个副本名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上。 创建一个新的 topic名称为 three。 ./kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 2 --topic three查看分区副本存储情况。 ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic threeTopic: three PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes1073741824 Topic: three Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: three Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: three Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2创建副本存储计划将所有副本都指定存储在 broker0中【与服役新节点退役旧节点类似手动分配分区和副本的存储情况】 vim increase-replication-factor.json{version:1,partitions:[{topic:three,partition:0,replicas:[0]},{topic:three,partition:1,replicas:[0]},{topic:three,partition:2,replicas:[0]}] }执行副本存储计划。 ./kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --executeCurrent partition replica assignment {version:1,partitions:[{topic:three,partition:0,replicas:[0],log_dirs:[any,any]},{topic:three,partition:1,replicas:[0],log_dirs:[any,any]},{topic:three,partition:2,replicas:[0],log_dirs:[any,any]} ]}Save this to use as the --reassignment-json-file option during rollback Successfully started partition reassignments for three-0,three-1,three-2验证副本存储计划 ./kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic threeTopic: three PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes1073741824 Topic: three Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: three Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: three Partition: 2 Leader: 0 Replicas: 0 Isr: 0增加副本因子 在生产环境当中由于某个主题的重要等级需要提升我们考虑增加副本。副本数的增加需要先制定计划然后根据计划执行。 注意kafka中不能通过命令行的方式增加副本可以通过命令行的方式增加分区数只能增加不能减少 #创建副本存储计划所有副本都指定存储在 broker0、broker1、broker2 中。 vim increase-replication-factor.json{version:1,partitions:[{topic:four,partition:0,replicas:[0,1,2]},{topic:four,partition:1,replicas:[0,1,2]},{topic:four,partition:2,replicas:[0,1,2]} ]}#执行副本存储计划 ./kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --executekafka文件存储机制 ​ Topic是逻辑上的概念而partition是物理上的概念每个partition对应于一个log文件该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端为防止log文件过大导致数据定位效率低下不需要先查出来数据直接往最后追加也是kafka可以高效读写的原因之一Kafka采取了分片和索引机制将每个partition分为多个segment。segment默认大小为1GB每个segment包括“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下该文件夹的命名规则为topic名称分区序号例如first-0。 Topic 数据存储在server.properties配置文件中设置的logs的位置 #kafka 运行日志(数据)存放的路径路径不需要提前创建kafka自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔 log.dirs/opt/module/kafka/datassegment 中 index 文件和 log 文件详解 思考四个问题 1.topic中partition存储分布 在Kafka文件存储中同一个topic下有多个不同partition每个partition为一个目录partiton命名规则为topic名称有序序号第一个partiton序号从0开始序号最大值为partitions数量减1。例如first-0、first-1、first-2每个partition下面有多个segment。 2.partiton中文件存储方式 每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等这种特性方便old segment file快速被删除。每个partition只需要支持顺序读写就行了segment文件生命周期由服务端配置参数决定。 3.partiton中segment文件存储结构 segment file由 segment索引文件、数据文件两部分组成这两个文件一一对应后缀是”.index”和“.log”分别表示为segment索引文件、数据文件。segment文件命名规则partition全局的第一个segment从0开始后续每个segment文件名为上一个segment文件最后一条消息的offset值 1。数值最大为64位long大小19位数字字符长度没有数字用0填充。 4.在partition中如何通过offset查找message: segment的索引文件命令规则起始偏移量(offset)为0.后续每个segment文件名为上一个segment文件最后一条消息的offset值所以第二个文件00000000000000000522.index的文件名是上一个log中最大偏移量15211522其他后续文件依次类推只要根据offset 二分查找 文件列表就可以快速定位到具体文件。 当offset600时定位到00000000000000000522.index|log用index文件名上的数字相对offset计算log文件中数据存在的位置52265587522117639587 600 639所以Offset600的数据在position6410的位置往下顺扫。segment index file采取稀疏索引存储方式不会为每条数据创建索引大大的减少索了引文件大小。 在partition中如何通过offset查找message 总结 根据目标Offset定位Segment文件找到小于等于目标Offset的最大Offset对应的索引项根据索引定位到log文件log文件中从当前索引位置往后顺扫找到对应的记录 kafka日志存储参数配置 参数描述log.segment.bytesKafka 中 log 日志是分成一块块存储的此配置是指 log 日志划分成块的大小默认值 1G。log.index.interval.bytes稀疏索引间存储数据的大小默认 4kbkafka 里面每当写入了 4kb 大小的日志.log然后就往 index 文件里面记录一个索引。log.retention.hoursKafka 中日志保留时间单位是天优先级最低默认为 7 天log.retention.minutesKafka 中日志保留时间单位是小时log.retention.msKafka 中日志保留时间最高优先级毫秒log.retention.check.interval.ms检查日志是否过期的周期时间 默认 5 分钟log.cleanup.policy deleteKafka 中提供的日志清理策略有 delete 和 compact 两种log.retention.bytes超过设置的所有日志总大小删除最早的 segment。默认等于-1表示无穷大也表示关闭 日志存储配置删除/压缩 **删除**如果一个 segment 中有一部分数据过期一部分没有过期未消费完的segment会等最大offset消费完成后在删除 **压缩**compact 日志压缩compact日志压缩对于相同key的不同value值只保留最后一个版本。 注意这种策略只适合特殊场景比如消息的key是用户IDvalue是用户的资料通过这种压缩策略整个消息集里就保存了所有用户最新的资料。 kafka为啥可以高效读写数据 为啥kafka快啊 1Kafka 本身是分布式集群采用分区技术并行度高还可以添加Broker来扩展集群的处理能力。 2读数据采用稀疏索引可以快速定位要消费的数据。 3批量处理Kafka生产者和消费者都支持批处理操作减少了网络通信的开销。 4顺序写磁盘producer 生产数据写入到 log 文件中写的过程是一直追加到文件末端为顺序写免了随机写入的开销。另外还支持数据分区与数据压缩等技术进一步提高了磁盘存储的效率。 5页缓存 零拷贝技术Kafka的Broker结合了页缓存和零拷贝技术将要读取或写入的数据缓存到PageCache中避免了频繁的磁盘IO操作同时采用零拷贝技术避免了数据在用户空间和内核空间之间的多次内存拷贝。 PageCache页缓存Kafka重度依赖底层操作系统提供的PageCache功能。当上层有读写操作时操作系统会优先从PageCache中进行读写如果找不到再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。零拷贝Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据所以就不用走应用层传输效率高。传统上数据从应用程序缓冲区传输到网络或磁盘之前需要进行多次内存拷贝。采用零拷贝后Kafka可以直接从页缓存中读取或写入数据避免了数据在用户空间和内核空间之间的IO操作减少了CPU和内存的开销。 8. kafak消费者 消费者总体工作流程 kafka消费方式 consumer采用从broker中主动拉取的方式去消费数据pull。为什么不是broker主动推送呢push是因为由broker决定发送速率很难适应所有消费者。pull模式的缺点是kafka没有数据时消费者可能会陷入循环中一直返回空数据。 消费者组 Consumer GroupCG消费者组由多个consumer组成所有消费者的groupid相同。消费者组内每个消费者消费不同的分区一个分区只能由一个组内的一个消费者消费。消费者组之间互不影响。 offset 每个消费者的offset由消费者提交到系统主题(Topic)保存__consumer_offsets该主题默认有50个分区 消费者组的初始化流程 coordinator 辅助实现消费者组的初始化和分区的分配。 coordinator节点的选择 groupid的hashcode值 % 50 consumer_offsets的分区数量的值就是系统主题consumer_offsets主题所在的分区号。选择该broker上的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。 消费者组初始化流程 groupid的hashcode值 % 50 分区号选择该分区所在的broker上的coordinator作为消费者的老大。每个consumer都发送JoinGroup请求到coordinatorcoordinator选出一个consumer作为leader把要消费的topic情况发送给leader消费者。leader负责制定消费方案把消费方案发送给coordinatorcoordinator把消费方案下发给各个consumer。消费者和coordinator的心跳检测默认3s一旦超过session.timeout.ms45s则该消费者会被移除。或者消费者处理消息的时间过长于max.poll.interval.ms5min都会触发再平衡。 消费者重要参数 参数描述bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。key.deserializer 和 value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。group.id标记消费者所属的消费者组。enable.auto.commit自动提交offset开关默认值为 true消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms消费者偏移量向Kafka提交的频率默认5s。如果设置自动提交offset时才生效auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在如数据被删除了该如何处理 earliest自动重置偏移量到最早的偏移量。latest默认自动重置偏移量为最新的偏移量。none如果消费组原来的previous偏移量offsets.topic.num.partitions__consumer_offsets 的分区数默认是 50 个分区。heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间默认 3s。 该条目的值必须小于 session.timeout.ms 也不应该高于session.timeout.ms 的 1/3。☘️session.timeout.msKafka consumer 和 coordinator 之间连接超时时间默认 45s。 超过该值该消费者被移除消费者组执行再平衡。☘️max.poll.interval.ms消费者处理消息的最大时长默认是 5 分钟。超过该值该消费者被移除消费者组执行再平衡。fetch.min.bytes消费者获取服务器端一批消息最小的字节数。 默认 1 个字节fetch.max.wait.ms如果没有从服务器端获取到一批数据的最小字节数。该时间到仍然会返回数据。默认 500ms。fetch.max.bytes默认 Default: 5242880050 m。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值 50m仍然可以拉取回来这批数据因此这不是一个绝对最大值。一批次的大小受 message.max.bytes broker configor max.message.bytes topic config影响。max.poll.records一次 poll 拉取数据返回消息的最大条数默认是 500 条。 消费者API 订阅主题 创建一个独立消费者消费 first 主题中数据。 package com.lihw.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.1.61.121:9092,10.1.61.122:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组组名任意起名 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, test2);// 创建消费者对象KafkaConsumerString, String kafkaConsumer new KafkaConsumerString, String(properties);// 注册要消费的主题可以消费多个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 拉取数据打印while (true) {// 设置 1s 中消费一批数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));// 打印消费到的数据for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}} }订阅分区 创建一个独立消费者消费 first 主题 0 号分区的数据。 package com.lihw.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 10.1.61.121:9092,10.1.61.122:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组组名任意起名 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, test2);// 创建消费者对象KafkaConsumerString, String kafkaConsumer new KafkaConsumerString, String(properties);// 消费某个主题的某个分区数据ArrayListTopicPartition topicPartitions new ArrayList();topicPartitions.add(new TopicPartition(first, 0));kafkaConsumer.assign(topicPartitions);// 拉取数据打印while (true) {// 设置 1s 消费一批数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));// 打印消费到的数据for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}} }消费者组案例 测试同一个主题的分区数据生产者发送到三个分区的消费情况 如果只有一个消费者会消费三个分区的数据如果有两个消费者其中一个consumer会消费两个分区三个消费者一人一个分区进行消费 分区的分配以及再平衡 一个consumer group中有多个consumer组成一个 topic有多个partition组成现在的问题是到底由哪个consumer来消费哪个partition的数据。Kafka有四种主流的分区分配策略 Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy修改分区的分配策略。默认策略是Range CooperativeSticky。Kafka可以同时使用多个分区分配策略。 参数描述heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间默认 3s。该条目的值必须小于 session.timeout.ms也不应该高于session.timeout.ms 的 1/3。session.timeout.msKafka 消费者和 coordinator 之间连接超时时间默认 45s。超过该值消费者被移除消费者组执行再平衡。max.poll.interval.ms消费者处理消息的最大时长默认是 5 分钟。超过该值该消费者被移除消费者组执行再平衡。partition.assignment.strategy消 费 者 分 区 分 配 策 略 默 认 策 略 是 Range CooperativeSticky。Kafka 可以同时使用多个分区分配策略。 可 以 选 择 的 策 略 包 括 Range 、 RoundRobin 、 Sticky 、 Range 分区数 / 消费者数进行平均分多余的分给第一个consumer。 **Range分区分配策略**分区数 / 消费者数 每个消费者应该消费几个分区如果除不尽前面的消费者会多消费一个分区。 例如7 / 3 2 余 1那么每个消费者消费两个分区多余的一个给第一个消费者。8 / 3 2 余 2每个小给这消费2个分区多余的两个分给前两个消费者 **Range再平衡案例**消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把1任务分配给其他 broker 执行。挂掉的broker全部打到某一个消费者上面 0消费者0 、1、2 1消费者3、4 2消费者5、6 2再次重新发送消息观看结果45s 以后。 1 号消费者消费到 0、1、2、3 号分区数据。 2 号消费者消费到 4、5、6 号分区数据。 注意该分区策略在消费的主题多的情况下容易造成数据倾斜 RoundRobin轮询 RoundRobin分区策略 RoundRobin 针对集群中所有Topic而言。RoundRobin 轮询分区策略是把所有的 partition 和所有的consumer 都列出来然后按照 hashcode 进行排序最后通过轮询算法来分配 partition 给到各个消费者。 RoundRobin再平衡 45s后宕掉的消费者的任务会按照 ’轮询‘ 的方式把数据轮询分给其他消费者进行消费。 修改分区策略 // 修改分区分配策略 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,org.apache.kafka.clients.consumer.RoundRobinAssignor);Sticky粘性 Sticky分区策略 可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前考虑上一次分配的结果尽量少的调整分配的变动可以节省大量的开销。粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略首先会尽量均衡的放置分区到消费者上面在出现同一消费者组内消费者出现问题的时候会尽量保持原有分配的分区不变化。 Sticky再平衡 0 号消费者的任务会按照粘性规则尽可能均衡的随机分配交由其他消费者消费。45s以后会重新分配 修改分区策略 // 修改分区分配策略 ArrayListString startegys new ArrayList(); startegys.add(org.apache.kafka.clients.consumer.StickyAssignor); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);offset偏移量 __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.idtopic分区号value 就是当前 offset 的值。每隔一段时间kafka 内部会对这个 topic 进行compact也就是每个 group.idtopic分区号就保留最新数据。 自动提交 offset 为了使我们能够专注于自己的业务逻辑Kafka提供了自动提交offset的功能。 自动提交offset的相关参数 enable.auto.commit是否开启自动提交offset功能默认是trueauto.commit.interval.ms自动提交offset的时间间隔默认是5s // 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 提交 offset 的时间周期 1000ms默认 5s properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);手动提交 offset 虽然自动提交offset十分简单便利但由于其是基于时间提交的开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。 手动提交offset的方法有两种 commitSync同步提交必须等待offset提交完毕再去消费下一批数据。commitAsync异步提交 发送完提交offset请求后就开始消费下一批数据了。 两者的相同点是都会将本次提交的一批数据最高的偏移量提交不同点是同步提交阻塞当前线程一直到提交成功并且会自动失败重试由不可控因素导致也会出现提交失败而异步提交则没有失败重试机制故有可能提交失败。 同步提交offset 由于同步提交 offset 有失败重试机制故更加可靠但是由于一直等待提交结果提交的效率比较低 public class CustomConsumerByHandSync {public static void main(String[] args) {// 1. 创建 kafka 消费者配置类Properties properties new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);// 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//3. 创建 kafka 消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);//4. 设置消费主题 形参是列表consumer.subscribe(Arrays.asList(first));//5. 消费数据while (true){// 读取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}// 同步提交 offsetconsumer.commitSync();}} } 异步提交offset 虽然同步提交 offset 更可靠一些但是由于其会阻塞当前线程直到提交成功。因此吞吐量会受到很大的影响 public class CustomConsumerByHandSync {public static void main(String[] args) {// 1. 创建 kafka 消费者配置类Properties properties new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);// 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//3. 创建 kafka 消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);//4. 设置消费主题 形参是列表consumer.subscribe(Arrays.asList(first));//5. 消费数据while (true){// 读取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}// 异步提交 offsetconsumer.commitAsync();}} } 指定 Offset 消费 auto.offset.reset earliest | latest | none 默认是 latest。 当 Kafka 中没有初始偏移量消费者组第一次消费或服务器上不再存在当前偏移量时例如该数据已被删除该怎么办 1earliest自动将偏移量重置为最早的偏移量–from-beginning。 2latest默认值自动将偏移量重置为最新偏移量。 3none如果未找到消费者组的先前偏移量则向消费者抛出异常。 4任意指定 offset 位移开始消费 任意指定 offset 位移开始消费 public class CustomConsumerByHandSync {public static void main(String[] args) {// 1. 创建 kafka 消费者配置类Properties properties new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);// 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//3. 创建 kafka 消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);// 2 订阅一个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 获取消费者分区信息并指定offset进行消费SetTopicPartition assignment new HashSet();while (assignment.size() 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息有了分区分配信息才能开始消费assignment kafkaConsumer.assignment();}// 遍历所有分区并指定 offset 从 1700 的位置开始消费for (TopicPartition tp: assignment) {kafkaConsumer.seek(tp, 1700);}// 消费数据while (true){// 读取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}}} } 指定时间进行消费 在生产环境中会遇到最近消费的几个小时数据异常想重新按照时间消费。例如要求按照时间消费前一天的数据怎么处理 public class CustomConsumerByHandSync {public static void main(String[] args) {// 1. 创建 kafka 消费者配置类Properties properties new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);// 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//3. 创建 kafka 消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);// 2 订阅一个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 获取消费者分区信息并指定offset进行消费SetTopicPartition assignment new HashSet();while (assignment.size() 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息有了分区分配信息才能开始消费assignment kafkaConsumer.assignment();}HashMapTopicPartition, Long timestampToSearch new HashMap(); // 封装集合存储每个分区对应一天前的数据for (TopicPartition topicPartition : assignment) {timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);}// 获取从 1 天前开始消费的每个分区的 offsetMapTopicPartition, OffsetAndTimestamp offsets kafkaConsumer.offsetsForTimes(timestampToSearch);// 遍历每个分区对每个分区设置消费时间。for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp offsets.get(topicPartition);// 根据时间指定开始消费的位置if (offsetAndTimestamp ! null){kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());}}// 消费数据while (true){// 读取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}}} } 漏消费和重复消费 重复消费已经消费了数据但是 offset 没提交。 场景1重复消费。自动提交offset引起。默认5s一次 漏消费先提交 offset 后消费有可能会造成数据的漏消费。 场景2消费者还没落盘导致消费者漏消费 消费者事务 如果想完成Consumer端的精准一次性消费那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质比 如MySQL 数据积压消费者如何提高吞吐量 1如果是Kafka消费能力不足则可以考虑增加Topic的分区数并且同时提升消费组的消费者数量消费者数 分区数 2如果是下游的数据处理不及时提高每批次拉取的数量。批次拉取数据过少拉取数据/处理时间 生产速度使处理的数据小于生产的数据也会造成数据积压。 fetch.max.bytes消费者一次 poll 抓取一批消息的最大的字节数。默认 Default: 5242880050 mmax.poll.records消费者一次 poll 抓取数据的最大条数默认 500 条
http://www.zqtcl.cn/news/513091/

相关文章:

  • 精品课程网站怎么做建筑图纸符号大全解释
  • 高权重网站 内页做跳转给新网站许昌做网站公司哪家专业
  • 咸阳网站建设工作室网站建设经
  • 网站怎么做短信接口新浪wordpress
  • 方维o2o 2.9蓝色团购网站程序源码模板做一电影网站怎么赚钱
  • 口碑好网站建设资源新昌网站建设
  • 苏州做网站的公司排名泉州网络推广专员
  • 无为县做互联网网站备案的时候网站建设方案书要吗
  • 修改网站的备案主体dede网站地图不显示文章列表
  • 建立个人网站的成本织梦html5手机网站模板
  • 怎么自己建一个网站吗php网页设计培训
  • 深圳大型论坛网站建设wordpress国内加速
  • 仿站怎么做广告装饰公司名字
  • 黄冈网站推广收费标准wordpress导航页面设置密码
  • 做网站会犯法吗贵州省建设厅城乡建设网站
  • 做网站和做公众号资金盘网站怎么建设
  • 全国最好的网站建设案例推广方法视频
  • 嘉兴网站建设策划方案在海口注册公司需要什么条件
  • 旅游网站国际业务怎样做建设企业官方网站企业登录
  • 北京市昌平网站建设小米网络营销案例分析
  • 怎么利用360域名做网站微信商城怎么弄
  • 中山h5网站建设天津网站建设技术托管
  • 建网站买的是什么商城网站建设合同
  • 购物网站制作样例有没有专门学做婴儿衣服的网站
  • 济南网站建设 找小七买友情链接有用吗
  • 南阳网站建设域名公司泉州关键词排名seo
  • 网站建设在线推广宁夏快速自助制作网站
  • 专业网站建设好不好wordpress编辑文章更新失败
  • 河南郑州网站建设哪家公司好html5 网站正在建设中
  • 免费ppt模板下载医学类江门seo网站推广