郑州网站建设服务,在线设计logo免费网站,网站建设售后支持,wordpress 5.2设置中文目录 Kafka分区机制生产者分区写入策略轮询策略随机策略#xff08;不用#xff09;按key分配策略乱序问题自定义分区策略 消费者组Rebalance机制消费者分区分配策略Range范围分配策略RoundRobin轮询策略Stricky粘性分配策略 Kafka副本机制producer的ACKs参数acks配置为0acks… 目录 Kafka分区机制生产者分区写入策略轮询策略随机策略不用按key分配策略乱序问题自定义分区策略 消费者组Rebalance机制消费者分区分配策略Range范围分配策略RoundRobin轮询策略Stricky粘性分配策略 Kafka副本机制producer的ACKs参数acks配置为0acks配置为1acks配置为-1或者all 高级High LevelAPI与低级Low LevelAPI高级High LevelAPI低级Low LevelAPI 监控工具Kafka-eagle安装Kafka-Eagle Kafka原理分区的leader与followerAR、ISR、OSRControllerController的选举 leader负载均衡Preferred Replica Kafka 生产、消费数据工作流程Kafka数据写入流程Kafka数据消费流程两种消费模式 Kafka的数据存储形式存储日志读取消息删除消息 消息不丢失机制broker数据不丢失生产者数据不丢失消费者数据不丢失 数据积压Lag标签解决数据积压问题 Kafka中数据清理Log Deletion定时日志删除任务基于时间的保留策略基于日志大小的保留策略基于日志起始偏移量保留策略 日志压缩Log Compaction Kafka配额限速机制Quotas限制producer端速率限制consumer端速率取消Kafka的Quota配置 Kafka分区机制
生产者分区写入策略
生产者写入消息到 topicKafka 将依据不同的策略将数据分配到不同的分区中。
轮询分区策略随机分区策略按key分区分配策略自定义分区策略
轮询策略 默认的策略也是使用最多的策略可以最大限度保证所有消息平均分配到一个分区。如果在生产消息时key为null则使用轮询算法均衡地分配分区。
随机策略不用 随机策略每次都随机地将消息分配到每个分区。在较早的版本默认的分区策略就是随机策略也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳所以基本上很少会使用随机策略。
按key分配策略 按key分配策略有可能会出现【数据倾斜】例如某个key包含了大量的数据因为key值一样所有所有的数据将都分配到一个分区中造成该分区的消息数量远大于其他的分区。
乱序问题 轮询策略、随机策略都会导致一个问题生产到Kafka中的数据是乱序存储的。 而按key分区可以一定程度上实现数据有序存储——也就是局部有序但这又可能会导致数据倾斜所以在实际生产环境中要结合实际情况来做取舍。
自定义分区策略 创建自定义分区器
public class KeyWithRandomPartitioner implements Partitioner {private Random r;Overridepublic void configure(MapString, ? configs) {r new Random();}Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//cluster.partitionCountForTopic 表示获取指定topic的分区数量//r.nextInt(1000) 表示从随机数生成器 r 中随机生成一个小于1000的整数其中参数1000指定了生成的随机数的范围即生成的随机数是0到999之间的整数。//在这段代码中生成的随机数将被用于计算消息所在的分区编号。由于模运算 % cluster.partitionCountForTopic(topic) 的结果必须小于分区数量因此这里对1000取模的目的是将随机数的范围缩小到分区数量内以确保不会选择到超出范围的分区编号。return r.nextInt(1000) % cluster.partitionCountForTopic(topic);}Overridepublic void close() {}
}在Kafka生产者配置中自定使用自定义分区器的类名
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyWithRandomPartitioner.class.getName());消费者组Rebalance机制 Kafka 中的 Rebalance 称之为再均衡是 Kafka 中确保 Consumer group 下所有的consumer 如何达成一致分配订阅的 topic 的每个分区的机制。
Rebalance触发的时机有 消费者组中consumer的个数发生变化。例如有新的consumer加入到消费者组或者是某个consumer停止了。 订阅的topic个数发生变化消费者可以订阅多个主题假设当前的消费者组订阅了三个主题但有一个主题突然被删除了此时也需要发生再均衡。 订阅的topic分区数发生变化 Rebalance的不良影响 发生 Rebalance 时consumer group 下的所有 consumer 都会协调在一起共同参与Kafka 使用分配策略尽可能达到最公平的分配。 Rebalance 过程会对 consumer group 产生非常严重的影响Rebalance 的过程中所有的消费者都将停止工作直到 Rebalance 完成。
消费者分区分配策略
Range范围分配策略RoundRobin轮询策略Stricky粘性分配策略
Range范围分配策略
Range范围分配策略是Kafka默认的分配策略它可以确保每个消费者消费的分区数量是均衡的。 注Range 范围分配策略是针对 每个Topic的。 配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor
props.put(partition.assignment.strategy, org.apache.kafka.clients.consumer.RangeAssignor);算法公式 n 分区数量 / 消费者数量 m 分区数量 % 消费者数量 前m个消费者消费n1个 剩余消费者消费n个 RoundRobin轮询策略 RoundRobinAssignor 轮询策略是将消费组内所有消费者以及消费者所订阅的 所有topic的partition 按照字典序排序topic和分区的hashcode进行排序然后通过轮询方式逐个将分区以此分配给每个消费者。
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor
props.put(partition.assignment.strategy, org.apache.kafka.clients.consumer.RoundRobinAssignor);Stricky粘性分配策略 从Kafka 0.11.x开始引入此类分配策略。主要目的分区分配尽可能均匀。 在发生 rebalance 的时候分区的分配尽可能与上一次分配保持相同。没有发生rebalance 时Striky粘性分配策略和 RoundRobin 分配策略类似。 上面如果 consumer2 崩溃了此时需要进行rebalance。如果是Range分配和轮询分配都会重新进行分配例如 可以发现consumer0 和 consumer1 原来消费的分区大多发生了改变。接下来我们再来看下粘性分配策略。 Striky粘性分配策略保留rebalance之前的分配结果。这样只是将原先consumer2负责的两个分区再均匀分配给consumer0、consumer1。这样可以明显减少系统资源的浪费。 例如之前consumer0、consumer1之前正在消费某几个分区但由于rebalance发生导致consumer0、consumer1需要重新消费之前正在处理的分区导致不必要的系统开销。例如某个事务正在进行就必须要取消了
Kafka副本机制 副本的目的就是冗余备份当某个Broker上的分区数据丢失时依然可以保障数据可用。因为在其他的Broker上的副本是可用的。
producer的ACKs参数 对副本关系较大的就是producer配置的acks参数了acks参数表示当生产者生产消息的时候写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。
// 配置ACKs参数
props.put(acks, all);acks配置为0
acks 0生产者只管写入不管是否写入成功可能会数据丢失。性能是最好的。 ACK为0基准测试
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers10.211.55.8:9092 acks0acks配置为1
生产者会等到leader分区写入成功后返回成功接着发送下一条性能中等。
acks配置为-1或者all
确保消息写入到leader分区、还确保消息写入到对应副本都成功后接着发送下一条性能是最差的。 基准测试结果
指标单分区单副本ack0单分区单副本(ack1)单分区单副本(ack-1/all)吞吐量47359.248314 records/sec 每秒4.7W条记录40763.417279 records/sec 每秒4W条记录540.5 /s 每秒7.3W调记录吞吐速率45.17 MB/sec 每秒约45MB数据38.88 MB/sec 每秒约89MB数据0.52 MB/sec平均延迟时间686.49 ms avg latency799.67 ms avg latency120281.8 ms最大延迟时间1444.00 ms max latency1961.00 ms max latency1884.00 ms 根据业务情况来选择ack机制是要求性能最高一部分数据丢失影响不大可以选择0/1。如果要求数据一定不能丢失就得配置为-1/all。 分区中是有leader和follower的概念为了确保消费者消费的数据是一致的只能从分区leader去读写消息follower做的事情就是同步数据Backup。
高级High LevelAPI与低级Low LevelAPI
高级High LevelAPI
消费Kafka的消息很容易实现写起来比较简单。不需要执行去管理offset直接通过ZK管理也不需要管理分区、副本由Kafka统一管理。消费者会自动根据上一次在ZK中保存的offset去接着获取数据。在ZK中不同的消费者组group同一个topic记录不同的offset这样不同程序读取同一个topic不会受offset的影响。高级API的缺点不能控制offset例如想从指定的位置读取不能细化控制分区、副本、ZK等。
// 消费者程序从test主题中消费数据
public class ConsumerTest {public static void main(String[] args) {// 1. 创建Kafka消费者配置Properties props new Properties();props.setProperty(bootstrap.servers, 192.168.88.100:9092);props.setProperty(group.id, test);props.setProperty(enable.auto.commit, true);props.setProperty(auto.commit.interval.ms, 1000);props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 2. 创建Kafka消费者KafkaConsumerString, String consumer new KafkaConsumer(props);// 3. 订阅要消费的主题consumer.subscribe(Arrays.asList(test));// 4. 使用一个while循环不断从Kafka的topic中拉取消息while (true) {// 定义100毫秒超时ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records)System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());}}
}低级Low LevelAPI 通过使用低级API我们可以自己来控制offset想从哪儿读就可以从哪儿读。 而且可以自己控制连接分区对分区自定义负载均衡。而且之前 offset 是自动保存在ZK中使用低级API可以将offset不一定要使用 ZK 存储可以自己来存储offset。 例如存储在文件、MySQL、或者内存中。但是低级API比较复杂需要执行控制offset连接到哪个分区并找到分区的leader。
示例手动消费分区数据 之前高级High LevelAPI我们让Kafka根据消费组中的消费者动态地为topic分配要消费的分区。但在某些时候需要指定要消费的分区例如如果某个程序将某个指定分区的数据保存到外部存储中例如Redis、MySQL那么保存数据的时候只需要消费该指定的分区数据即可。如果某个程序是高可用的在程序出现故障时将自动重启(例如Flink、Spark程序)。这种情况下程序将从指定的分区重新开始消费数据。
不再使用之前的 subscribe 方法订阅主题而使用 assign 方法指定想要消费的消息
String topic test;
TopicPartition partition0 new TopicPartition(topic, 0);
TopicPartition partition1 new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));一旦指定了分区就可以就像前面的示例一样在循环中调用 poll 方法消费消息。 注 当手动管理消费分区时即使 GroupID 是一样的Kafka的组协调器都将不再起作用。 如果消费者失败也将不再自动进行分区重新分配。 监控工具Kafka-eagle 开发工作中当业务前提不复杂时可以使用 Kafka 命令来进行一些集群的管理工作。但如果业务变得复杂例如需要增加 group、topic 分区此时使用命令行就感觉很不方便此时如果使用一个可视化的工具帮助完成日常的管理工作将会大大提高对于 Kafka 集群管理的效率而且使用工具来监控消费者在 Kafka 中消费情况。 Kafka Eagle是一款结合了目前大数据 Kafka 监控工具的特点重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。官网地址https://www.kafka-eagle.org/
安装Kafka-Eagle
开启Kafka JMX端口 JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务实际上用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口来实现一些管理、监控功能。
在启动Kafka的脚本前添加 【Kafka】消息队列Kafka基础——编写Kafka一键启动/关闭脚本 已添加
cd ${KAFKA_HOME}
export JMX_PORT9988
nohup bin/kafka-server-start.sh config/server.properties 安装Kafka-Eagle
需提前准备好mysql数据库并创建ke数据库。安装JDK并配置好JAVA_HOME。
# 将kafka_eagle上传并解压到 /export/server 目录中。
cd cd /export/software/
tar -xvzf kafka-eagle-bin-3.0.1.tar.gz -C ../server/cd /export/server/kafka-eagle-bin-3.0.1/
tar -xvzf efak-web-3.0.1-bin.tar.gz
cd /export/server/kafka-eagle-bin-3.0.1/efak-web-3.0.1# 配置 kafka_eagle 环境变量。
vim /etc/profile
export KE_HOME/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
export PATH$PATH:$KE_HOME/bin
source /etc/profile配置 kafka_eagle。使用 vi 打开 conf 目录下的 system-config.properties
vim conf/system-config.properties
# 修改第4行配置kafka集群别名
kafka.eagle.zk.cluster.aliascluster1
# 修改第5行配置ZK集群地址
cluster1.zk.listnode1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
# 注释第6行
#cluster2.zk.listxdn10:2181,xdn11:2181,xdn12:2181
# 修改第32行打开图标统计
kafka.eagle.metrics.chartstrue
kafka.eagle.metrics.retain30# 注释第69行取消sqlite数据库连接配置
#kafka.eagle.driverorg.sqlite.JDBC
#kafka.eagle.urljdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.usernameroot
#kafka.eagle.passwordwww.kafka-eagle.org# 修改第77行开启mysql
kafka.eagle.drivercom.mysql.jdbc.Driver
kafka.eagle.urljdbc:mysql://10.211.55.8:3306/ke?useUnicodetruecharacterEncodingUTF-8zeroDateTimeBehaviorconvertToNull
kafka.eagle.usernameroot
kafka.eagle.password52809329# 启动脚本ke.sh中配置JAVA_HOME
cd /export/server/kafka-eagle-bin-3.0.1/efak-web-3.0.1/binvim ke.sh
# 在第24行添加JAVA_HOME环境配置
export JAVA_HOME/usr/lib/jvm/java-1.8.0-openjdk-arm64# 修改Kafka eagle可执行权限
cd /export/server/kafka-eagle-bin-3.0.1/efak-web-3.0.1/bin
chmod x ke.sh# 启动 kafka_eagle
./ke.sh start#访问Kafka eagle默认用户为admin密码为123456
http://10.211.55.8:8048/keKafka eagle界面中Kafka度量指标 topic list 点击Topic下的List菜单就可以展示当前Kafka集群中的所有topic。
指标意义Brokers Spreadbroker使用率Brokers Skew分区是否倾斜Brokers Leader Skewleader partition是否存在倾斜
Kafka原理
分区的leader与follower 在Kafka中每个 topic 都可以配置多个分区以及多个副本。 每个分区都有一个leader以及0个或者多个 follower。在创建 topic 时Kafka会将每个分区的leader均匀地分配在每个broker上。 正常使用kafka是感觉不到 leader、follower的存在的。但其实所有的读写操作都是由leader处理而所有的follower都复制leader的日志数据文件如果leader出现故障时follower就会被选举为leader。 总结 Kafka中的leader负责处理读写操作而follower只负责副本数据的同步。如果leader出现故障其他follower会被重新选举为leaderfollower像一个consumer一样拉取leader对应分区的数据并保存到日志数据文件中 查看某个partition的leader在哪个服务器中
示例 名为 test 的3个分区、3个副本的topic。 AR、ISR、OSR 在实际环境中leader 有可能会出现一些故障所以 Kafka 一定会选举出新的leader。 Kafka中把follower可以按照不同状态分为三类——AR、ISR、OSR。
分区的所有副本称为 AR Assigned Replicas已分配的副本所有与leader副本保持一定程度同步的副本包括 leader 副本在内组成 ISRIn-Sync Replicas 在同步中的副本由于 follower 副本同步滞后过多的副本不包括 leader 副本组成 OSR Out-of-Sync RepliasAR ISR OSR正常情况下所有的 follower 副本都应该与 leader 副本保持同步即 AR ISROSR集合为空。 在 Kafka中C是ISR 队列中最小的 LEO。 A. LEO B. ISR C. HW D. AR ALEO代表当前日志文件中下一条指的是每个副本最大的offset BISR代表副本同步队列 CHW指的是消费者能见到的最大的 offsetISR 队列中最小的 LEOC正确。 DAR代表所有副本 Controller Kafka启动时会在所有的 broker 中选择一个Controller。前面 leader 和 follower 是针对partition而 Controller 是针对 broker 的。 创建 topic、或者添加分区、修改副本数量之类的管理任务都是由 Controller 完成的。Kafka 分区leader 的选举也是由 Controller 决定的。
Controller的选举 在 Kafka 集群启动的时候每个 broker 都会尝试去 ZooKeeper 上注册成为ControllerZK临时节点但只有一个竞争成功其他的 broker 会注册该节点的监视器。一但该临时节点状态发生变化就可以进行相应的处理。Controller 也是高可用的一旦某个 broker 崩溃其他的 broker 会重新注册为Controller。
找到当前Kafka集群的 Controller Kafka Tools的 Tools 菜单找到 ZooKeeper Brower查看到哪个 broker 是 Controller Controller 选举 partition leader
所有 Partition 的 leader 选举都由 Controller 决定Controller 会将 leader 的改变直接通过 RPC 的方式通知需为此作出响应的 BrokerController 读取到当前分区的 ISR只要有一个 Replica 还幸存就选择其中一个作为 leader 否则则任意选这个一个 Replica 作为leader如果该 partition 的所有 Replica 都已经宕机则新的 leader 为-1 具体来说当一个分区的 leader 副本失效时follower 副本会发现并向其它 broker 节点发送请求申请成为该分区的新 leader。同时每个 broker 节点会周期性地向 controller 节点发送心跳请求汇报自己当前的状态和可用性信息。controller 节点会根据这些信息选择一个健康的、可用的 broker 节点作为该分区的新 leader。
在选举新 leader 的过程中Controller 节点会参考如下因素
副本状态只有处于 ISRin-sync replicas列表中的 follower 副本才有资格成为新 leader因为它们的数据已经与 leader 同步。副本位置Controller 节点会选择与原 leader 副本相同或更靠前的位置作为新 leader 的位置以确保最小化数据丢失。副本健康状况Controller 节点会优先选择健康的、可用的 broker 节点作为新 leader以确保高可用性和服务质量。 总之Controller 节点会综合考虑多个因素选出一个最适合成为新 leader 的 broker 节点从而保障 Kafka 集群的高可用性和稳定性。
为什么不能通过ZK的方式来选举partition的leader Kafka 集群如果业务很多的情况下会有很多的 partition。假设某个 broker 宕机就会出现很多的 partiton 都需要重新选举 leader。如果使用 Zookeeper 选举 leader会给 Zookeeper 带来巨大的压力。所以Kafka 中 leader 的选举不能使用 ZK 来实现。 当之前下线的分区重新上线时要执行Leader 选举选举策略为A。 A. OfflinePartition Leader 选举 B. ReassignPartition Leader 选举 C. PreferredReplicaPartition Leader 选举 D. ControlledShutdownPartition Leader 选举 分区的 Leader 副本选举对用户是完全透明的它是由 Controller 独立完成的。 AOfflinePartition Leader 选举每当有分区上线时就需要执行 Leader选举。所谓的分区上线可能是创建了新分区也可能是之前的下线分区重新上线选项A正确。 BReassignPartition Leader 选举:当你手动运行 kafka-reassign-partitions命令或是调用Admin的 alterPartitionReassignments 方法执行分区副本重分配时可能触发此类选举。 CPreferredReplicaPartition Leader 选举:当你手动运行 kafka-preferred-replica-election 命令或自动触发了 Preferred Leader 选举时该类策略被激活。 DControlledShutdownPartition Leader 选举:当 Broker 正常关闭时该 Broker上的所有 Leader 副本都会下线因此需要为受影响的分区执行相应的 Leader 选举。 Controller 给Broker 发送的请求不包括D。 A. LeaderAndIsrRequest B. StopReplicaRequest C. UpdateMetadataRequest D. ActiveControllerCount Controller会向Broker发送三类请求分别是LeaderAndIsrRequest、StopReplicaRequest和UpdateMetadataRequest。 ALeaderAndIsrRequest告诉 Broker 相关主题各个分区的Leader副本位于哪台Broker上、ISR中的副本都在哪些Broker上。 BStopReplicaRequest告知指定Broker停止它上面的副本对象该请求甚至还能删除副本底层的日志数据。 CUpdateMetadataRequest该请求会更新Broker上的元数据缓存。 DActiveControllerCount是Broker端监控指标不是发送的请求故D选项不包括。 leader负载均衡
Preferred Replica Kafka 中引入了一个叫做 preferred-replica 的概念意思就是优先的 Replica 在 ISR 列表中第一个 replica 就是preferred-replica第一个分区存放的broker肯定就是 preferred-replica。
执行以下脚本可以将preferred-replica设置为 leader均匀分配每个分区的leader。
./kafka-leader-election.sh --bootstrap-server node1.itcast.cn:9092 --topic 主题 --partition1 --election-type preferred确保leader在broker中负载均衡
杀掉 test 主题的某个broker这样 Kafka 会重新分配leader。等到 Kafka 重新分配 leader 之后再次启动 Kafka 进程。此时观察test主题各个分区leader的分配情况。 此时会造成leader分配是不均匀的所以可以执行以下脚本来重新分配leader
bin/kafka-leader-election.sh --bootstrap-server 10.211.55.8:9092 --topic test --partition1 --election-type preferred
# Successfully completed leader election (PREFERRED) for partitions test-1–partition指定需要重新分配leader的partition编号
Kafka 生产、消费数据工作流程
Kafka数据写入流程 生产者先从 zookeeper 的 “/brokers/topics/主题名/partitions/分区名/state”节点找到该 partition 的leader 生产者在ZK中找到该ID找到对应的broker broker进程上的 leader 将消息写入到本地 log 中。follower从 leader 上拉取消息写入到本地 log并向 leader 发送 ACKleader接收到所有的 ISR 中的 Replica 的 ACK 后并向生产者返回 ACK。
Kafka数据消费流程
两种消费模式 Kafka采用拉取模型由消费者自己记录消费状态每个消费者互相独立地顺序拉取每个分区的消息。消费者可以按照任意的顺序消费消息。比如消费者可以重置到旧的偏移量重新处理之前已经消费过的消息或者直接跳到最近的位置从当前的时刻开始消费。
每个 consumer 都可以根据分配策略默认 RangeAssignor 获得要消费的分区获取到 consumer 对应的offset默认从 ZK 中获取上一次消费的 offset 找到该分区的 leader拉取数据消费者提交 offset
Kafka的数据存储形式 一个 topic 由多个分区组成。一个分区partition由多个segment段组成一个segment段由多个文件组成log、index、timeindex。 存储日志 Kafka中的数据是保存在 /export/server/kafka_2.12-2.4.1/data自己安装在配置文件设定中消息是保存在以主题名-分区ID 的文件夹中的数据文件夹中包含以下内容
文件名说明00000000000000000000.index索引文件根据offset查找数据就是通过该索引文件来操作的00000000000000000000.log日志数据文件00000000000000000000.timeindex时间索引leader-epoch-checkpoint持久化每个partition leader对应的LEO log end offset、日志文件中下一条待写入消息的offset 每个日志文件的文件名为起始偏移量因为每个分区的起始偏移量是0所以分区的日志文件都以0000000000000000000.log 开始 默认的每个日志文件最大为 log.segment.bytes 102410241024 1G 为了简化根据 offset 查找消息Kafka 日志文件名设计为开始的偏移量 测试新创建一个topictest_10m该topic每个日志数据文件最大为10M
bin/kafka-topics.sh --create --zookeeper 10.211.55.8 --topic test_10m --replication-factor 2 --partitions 3 --config segment.bytes10485760使用之前的生产者程序往 test_10m 主题中生产数据 写入消息新的消息总是写入到最后的一个日志文件中该文件如果到达指定的大小默认为1GB时将滚动到一个新的文件中。
读取消息
根据 offset 首先需要找到存储数据的 segment 段注意offset指定分区的全局偏移量 然后根据这个 全局分区offset 找到相对于文件的 segment段offset : 最后再根据 segment段offset 读取消息为了提高查询效率每个文件都会维护对应的范围内存查找的时候就是使用简单的二分查找。
删除消息 在Kafka中消息是会被定期清理的。一次删除一个 segment 段的日志文件Kafka 的日志管理器会根据 Kafka 的配置来决定哪些文件可以被删除。
消息不丢失机制
broker数据不丢失生产者数据不丢失消费者数据不丢失
broker数据不丢失 生产者通过分区的leader写入数据后所有在 ISR 中 follower 都会从 leader 中复制数据这样可以确保即使 leader 崩溃了其他的 follower 的数据仍然是可用的。
生产者数据不丢失 生产者连接leader写入数据时可以通过ACK机制来确保数据已经成功写入。ACK机制有三个可选配置 配置ACK响应要求为 -1 时 —— 表示所有的节点都收到数据(leader和follower都接 收到数据 配置ACK响应要求为 1 时 —— 表示leader收到数据 配置ACK影响要求为 0 时 —— 生产者只负责发送数据不关心数据是否丢失这种情 况可能会产生数据丢失但性能是最好的
生产者可以采用同步和异步两种方式发送数据
同步发送一批数据给kafka后等待kafka返回结果异步发送一批数据给kafka只是提供一个回调函数。
说明如果broker迟迟不给ack而buffer又满了开发者可以设置是否直接清空buffer中的数据。
消费者数据不丢失
在消费者消费数据的时候只要每个消费者记录好 offset 值即可就能保证数据不丢失。
数据积压 Kafka 消费者消费数据的速度是非常快的但如果由于处理 Kafka 消息时由于有一些外部 IO、或者是产生网络拥堵就会造成Kafka中的数据积压或称为数据堆积。如果数据一直积压会导致数据出来的实时性受到较大影响。
Lag标签 解决数据积压问题
当Kafka出现数据积压问题时首先要找到数据积压的原因。以下是在企业中出现数据积压的几个类场景。 数据写入MySQL失败 问题描述 某日运维人员找到开发人员说某个topic的一个分区发生数据积压这个topic非常重要而且开始有用户投诉。运维非常紧张赶紧重启了这台机器。重启之后还是无济于事。 问题分析 消费这个topic的代码比较简单主要就是消费topic数据然后进行判断在进行数据库操作。运维通过kafka-eagle找到积压的topic发现该topic的某个分区积压了几十万条的消息。 最后通过查看日志发现由于数据写入到MySQL中报错导致消费分区的offset一自没有提交所以数据积压严重。 因为网络延迟消费失败 问题描述 基于Kafka开发的系统平稳运行了两个月突然某天发现某个topic中的消息出现数据积压大概有几万条消息没有被消费。 问题分析 通过查看应用程序日志发现有大量的消费超时失败。后查明原因因为当天网络抖动通过查看Kafka的消费者超时配置为50ms随后将消费的时间修改为500ms后问题解决。 Kafka中数据清理Log Deletion Kafka 的消息存储在磁盘中为了控制磁盘占用空间Kafka 需要不断地对过去的一些消息进行清理工作。Kafka 的每个分区都有很多的日志文件这样也是为了方便进行日志的清理。在 Kafka 中提供两种日志清理方式 日志删除Log Deletion按照指定的策略直接删除不符合条件的日志。日志删除是以段segment日志为单位来进行定期清理的。 日志压缩Log Compaction按照消息的key进行整合有相同key的但有不同value值只保留最后一个版本。
在 Kafka 的broker或topic配置中
配置项配置值说明log.cleaner.enabletrue默认开启自动清理日志功能log.cleanup.policydelete默认删除日志log.cleanup.policycompaction压缩日志log.cleanup.policydelete,compact同时支持删除、压缩
定时日志删除任务 Kafka 日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件这个周期可以通过 broker 端参数log.retention.check.interval.ms来配置默认值为300,000即5分钟。 当前日志分段的保留策略有3种 基于时间的保留策略 基于日志大小的保留策略 基于日志起始偏移量的保留策略
基于时间的保留策略 以下三种配置可以指定如果Kafka中的消息超过指定的阈值就会将日志进行自动清理log.retention.hours、log.retention.minutes、log.retention.ms 。 其中优先级为 log.retention.ms log.retention.minutes log.retention.hours。 默认情况在broker中配置如下log.retention.hours168。也就是默认日志的保留时间为168小时相当于保留7天。 删除日志分段时从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段以保证没有线程对这些日志分段进行读取操作。将日志分段文件添加上“.deleted”的后缀也包括日志分段对应的索引文件 Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置默认值为60000即1分钟。
设置topic 5秒删除一次
为了方便观察设置段文件的大小为1M 设置topic的删除策略key: retention.ms value: 5000 尝试往topic中添加一些数据等待一会观察日志的删除情况。可以发现日志会定期被标记为删除然后被删除。
基于日志大小的保留策略 日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过 broker 端参数 log.retention.bytes 来配置默认值为-1表示无穷大。如果超过该大小会自动将超出部分删除。 注: log.retention.bytes 配置的是日志文件的总大小而不是单个的日志分段的大小一个日志文件包含多个日志分段。 基于日志起始偏移量保留策略 每个 segment 日志都有它的起始偏移量如果起始偏移量小于 logStartOffset那么这些日志文件将会标记为删除。
日志压缩Log Compaction Log Compaction是默认的日志删除之外的清理过时数据的方式。它会将相同的 key 对应的数据只保留一个版本。
Log Compaction执行后offset将不再连续但依然可以查询SegmentLog Compaction执行前后日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件Log Compaction是针对key的在使用的时候注意每个消息的key不为空基于Log Compaction可以保留key的最新更新可以基于Log Compaction来恢复消费者的最新状态 关于 Kafka 清理过期数据的说法错误的是B A.清理的策略包括删除和压缩 B. 启用压缩策略后只保留每个 key 指定版本的数据。 C. log.cleanup.policydelete启用删除策略 D. log.cleanup.policycompact启用压缩策略。 B. 启用压缩策略后只保留每个 key 最后一个版本的数据 Kafka配额限速机制Quotas 生产者和消费者以极高的速度生产/消费大量数据或产生请求从而占用 broker 上的全部资源造成网络 IO 饱和。有了配额Quotas就可以避免这些问题。Kafka 支持配额管理从而可以对 Producer 和 Consumer 的 producefetch 操作进行流量限制防止个别业务压爆服务器。
限制producer端速率
为所有client id设置默认值以下为所有producer程序设置其TPS不超过1MB/s即1048576/s命令如下
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config producer_byte_rate1048576 --entity-type clients --entity-default运行基准测试观察生产消息的速率
bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.serversnode1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks1# 结果50000 records sent, 1108.156028 records/sec (1.06 MB/sec)限制consumer端速率 对consumer限速与 producer 类似只不过参数名不一样。为指定的 topic 进行限速以下为所有 consumer 程序设置 topic 速率不超过1MB/s即1048576/s。命令如下
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config consumer_byte_rate1048576 --entity-type clients --entity-default运行基准测试
bin/kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --topic test --fetch-size 1048576 --messages 500000#结果为MB.sec1.0743取消Kafka的Quota配置
使用以下命令删除Kafka的Quota配置
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config producer_byte_rate --entity-type clients --entity-defaultbin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config consumer_byte_rate --entity-type clients --entity-default