国外网站需要备案吗,wordpress问答类主题,无主体网站是什么意思,杭州百度seo优化来源#xff1a;B站 目录 Kafka生产者生产经验——生产者如何提高吞吐量生产经验——数据可靠性生产经验——数据去重数据传递语义幂等性生产者事务 生产经验——数据有序生产经验——数据乱序 Kafka BrokerKafka Broker 工作流程Zookeeper 存储的 Kafka 信息Kafka Broker 总…来源B站 目录 Kafka生产者生产经验——生产者如何提高吞吐量生产经验——数据可靠性生产经验——数据去重数据传递语义幂等性生产者事务 生产经验——数据有序生产经验——数据乱序 Kafka BrokerKafka Broker 工作流程Zookeeper 存储的 Kafka 信息Kafka Broker 总体工作流程Broker 重要参数 Kafka 副本副本基本信息Leader 选举流程Leader 和 Follower 故障处理细节生产经验——Leader Partition 负载平衡生产经验——增加副本因子 文件存储文件存储机制文件清理策略 高效读写数据 Kafka 消费者Kafka 消费方式Kafka 消费者工作流程消费者总体工作流程消费者组原理消费者重要参数 消费者 API独立消费者案例订阅主题 生产经验——分区的分配以及再平衡Range 以及再平衡RoundRobin 以及再平衡Sticky 以及再平衡 offset 位移offset 的默认维护位置自动提交 offset手动提交 offset指定 Offset 消费指定时间消费漏消费和重复消费 生产经验——消费者事务生产经验——数据积压消费者如何提高吞吐量 Kafka生产者
生产经验——生产者如何提高吞吐量
package com.jjm.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// batch.size批次大小默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms等待时间默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator缓冲区大小默认 32Mbuffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// compression.type压缩默认 none可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,snappy);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(first,atguigu i));}// 5. 关闭资源kafkaProducer.close();}
}测试 ①在 hadoop102 上开启 Kafka 消费者。
[jjmhadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first②在 IDEA 中执行代码观察 hadoop102 控制台中是否接收到消息。
[jjmhadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4生产经验——数据可靠性
0回顾发送流程 1ack 应答原理 ACK应答级别 当acks0时 当acks1时 当acks-1时 思考Leader收到数据所有Follower都开始同步数据但有一个Follower因为某种故障迟迟不能与Leader进行同步那这个问题怎么解决呢 Leader维护了一个动态的in-sync replica setISR意为和Leader保持同步的FollowerLeader集合(leader0isr:0,1,2)。 如果Follower长时间未向Leader发送通信请求或同步数据则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定默认30s。例如2超时(leader:0, isr:0,1)。这样就不用等长期联系不上或者已经故障的节点。 数据可靠性分析 如果分区副本设置为1个或 者ISR里应答的最小副本数量 min.insync.replicas 默认为1设置为1和ack1的效果是一样的仍然有丢数的风险leader0isr:0。 数据完全可靠条件 ACK级别设置为-1 分区副本大于等于2 ISR里应答的最小副本数量大于等于2 可靠性总结 acks0生产者发送过来数据就不管了可靠性差效率高 acks1生产者发送过来数据Leader应答可靠性中等效率中等 acks-1生产者发送过来数据Leader和ISR队列里面所有Follwer应答可靠性高效率低 在生产环境中acks0很少使用acks1一般用于传输普通日志允许丢个别数据acks-1一般用于传输和钱相关的数据对可靠性要求比较高的场景。
生产经验——数据去重
数据传递语义
至少一次At Least Once ACK级别设置为-1 分区副本大于等于2 ISR里应答的最小副本数量大于等于2最多一次At Most Once ACK级别设置为0总结 At Least Once可以保证数据不丢失但是不能保证数据不重复 At Most Once可以保证数据不重复但是不能保证数据不丢失。 精确一次Exactly Once对于一些非常重要的信息比如和钱相关的数据要求数据既不能重复也不丢失。Kafka 0.11版本以后引入了一项重大特性幂等性和事务。
幂等性
1幂等性原理 幂等性就是指Producer不论向Broker发送多少次重复数据Broker端都只会持久化一条保证了不重复。 精确一次Exactly Once 幂等性 至少一次 ack-1 分区副本数2 ISR最小副本数量2 。 重复数据的判断标准具有PID, Partition, SeqNumber相同主键的消息提交时Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的Partition 表示分区号Sequence Number是单调自增的。 所以幂等性只能保证的是在单分区单会话内不重复。2如何使用幂等性 开启参数 enable.idempotence 默认为 truefalse 关闭。
生产者事务
1Kafka 事务原理 说明开启事务必须开启幂等性。 2Kafka 的事务一共有如下 5 个 API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量主要用于消费者
void sendOffsetsToTransaction(MapTopicPartition,OffsetAndMetadata offsets, String consumerGroupId) throws ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;3单个 Producer使用事务保证消息的仅一次发送
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// key,value 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置事务 id必须事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction_id_0);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {// 发送消息kafkaProducer.send(new ProducerRecord(first, jjm i));}// int i 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}生产经验——数据有序
单分区内有序有条件的详见下节 多分区分区与分区间无序
生产经验——数据乱序
1kafka在1.x版本之前保证数据单分区有序条件如下 max.in.flight.requests.per.connection1不需要考虑是否开启幂等性。2kafka在1.x及以后版本保证数据单分区有序条件如下 2开启幂等性 max.in.flight.requests.per.connection需要设置小于等于5。 1未开启幂等性 max.in.flight.requests.per.connection需要设置为1。 原因说明因为在kafka1.x以后启用幂等后kafka服务端会缓producer发来的最近5个request的元数据故无论如何都可以保证最近5个request的数据都是有序的。
Kafka Broker
Kafka Broker 工作流程
Zookeeper 存储的 Kafka 信息
1启动 Zookeeper 客户端。
[jjmhadoop102 zookeeper-3.5.7]$ bin/zkCli.sh2通过 ls 命令可以查看 kafka 相关信息。
[zk: localhost:2181(CONNECTED) 2] ls /kafkaZookeeper中存储的Kafka 信息
Kafka Broker 总体工作流程 1模拟 Kafka 上下线Zookeeper 中数据变化 1查看/kafka/brokers/ids 路径上的节点。
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[0, 1, 2]2查看/kafka/controller 路径上的数据。
[zk: localhost:2181(CONNECTED) 15] get /kafka/controller
{version:1,brokerid:0,timestamp:1637292471777}3查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
[zk: localhost:2181(CONNECTED) 16] get /kafka/brokers/topics/first/partitions/0/state
{controller_epoch:24,leader:0,version:1,leader_epoch:18,isr:[0,1,2]}4停止 hadoop104 上的 kafka。
[jjmhadoop104 kafka]$ bin/kafka-server-stop.sh5再次查看/kafka/brokers/ids 路径上的节点。
[zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids
[0, 1]6再次查看/kafka/controller 路径上的数据。
[zk: localhost:2181(CONNECTED) 15] get /kafka/controller
{version:1,brokerid:0,timestamp:1637292471777}7再次查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
[zk: localhost:2181(CONNECTED) 16] get /kafka/brokers/topics/first/partitions/0/state
{controller_epoch:24,leader:0,version:1,leader_epoch:18,isr:[0,1]}8启动 hadoop104 上的 kafka。
[atguiguhadoop104 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties9再次观察1、2、3步骤中的内容。
Broker 重要参数
参数名称描述replica.lag.time.max.msISR 中如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值默认 30s。auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。log.segment.bytesKafka 中 log 日志是分成一块块存储的此配置是指 log 日志划分 成块的大小默认值 1G。log.index.interval.bytes默认 4kbkafka 里面每当写入了 4kb 大小的日志.log然后就往 index 文件里面记录一个索引。log.retention.hoursKafka 中数据保存的时间默认 7 天。log.retention.minutesKafka 中数据保存的时间分钟级别默认关闭。log.retention.msKafka 中数据保存的时间毫秒级别默认关闭。log.retention.check.interval.ms检查数据是否保存超时的间隔默认是 5 分钟。log.retention.bytes默认等于-1表示无穷大。超过设置的所有日志总大小删除最早的 segment。log.cleanup.policy默认是 delete表示所有数据启用删除策略如果设置值为 compact表示所有数据启用压缩策略。num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。num.replica.fetchers副本拉取线程数这个参数占总核数的 50%的 1/3num.network.threads默认是 3。数据传输线程数这个参数占总核数的50%的 2/3 。log.flush.interval.messages强制页缓存刷写到磁盘的条数默认是 long 的最大值9223372036854775807。一般不建议修改交给系统自己管理。log.flush.interval.ms每隔多久刷数据到磁盘默认是 null。一般不建议修改交给系统自己管理。
Kafka 副本
副本基本信息
1Kafka 副本作用提高数据可靠性。 2Kafka 默认副本 1 个生产环境一般配置为 2 个保证数据可靠性太多副本会增加磁盘存储空间增加网络上数据传输降低效率。 3Kafka 中副本分为Leader 和 Follower。Kafka 生产者只会把数据发往 Leader然后 Follower 找 Leader 进行同步数据。 4Kafka 分区中的所有副本统称为 ARAssigned Repllicas。 AR ISR OSR ISR表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定默认 30s。Leader 发生故障之后就会从 ISR 中选举新的 Leader。 OSR表示 Follower 与 Leader 副本同步时延迟过多的副本。
Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader负责管理集群broker 的上下线所有 topic 的分区副本分配和 Leader 选举等工作。 Controller 的信息同步工作是依赖于 Zookeeper 的。 Leader选举流程
Leader 和 Follower 故障处理细节
Follower故障处理细节 LEOLog End Offset每个副本的最后一个offsetLEO其实就是最新的offset 1。 HWHigh Watermark所有副本中最小的LEO 。 1Follower故障 1 Follower发生故障后会被临时踢出ISR 2 这个期间Leader和Follower继续接收数据 3待该Follower恢复后Follower会读取本地磁盘记录的上次的HW并将log文件高于HW的部分截取掉从HW开始向Leader进行同步。 4等该Follower的LEO大于等于该Partition的HW即Follower追上Leader之后就可以重新加入ISR了。Leader故障处理细节 2Leader故障 1 Leader发生故障之后会从ISR中选出一个新的Leader 2为保证多个副本之间的数据一致性其余的Follower会先将各自的log文件高于HW的部分截掉然后从新的Leader同步数据。 注意这只能保证副本之间的数据一致性并不能保证数据不丢失或者不重复。
生产经验——Leader Partition 负载平衡
Leader Partition自动平衡 正常情况下Kafka本身会自动把Leader Partition均匀分散在各个机器上来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机会导致Leader Partition过于集中在其他少部分几台broker上这会导致少数几台broker的读写请求压力过高其他宕机的broker重启之后都是follower partition读写请求很低造成集群负载不均衡。
auto.leader.rebalance.enable默认是true。自动Leader Partition 平衡leader.imbalance.per.broker.percentage默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值控制器会触发leader的平衡。leader.imbalance.check.interval.seconds默认值300秒。检查leader负载是否平衡的间隔时间。 下面拿一个主题举例说明假设集群只有一个主题如下图所示 针对broker0节点分区2的AR优先副本是0节点但是0节点却不是Leader节点所以不平衡数加1AR副本总数是4 所以broker0节点不平衡率为1/410%需要再平衡。 broker2和broker3节点和broker0不平衡率一样需要再平衡。 Broker1的不平衡数为0不需要再平衡
参数名称描述auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。生产环境中leader 重选举的代价比较大可能会带来性能影响建议设置为 false 关闭。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
生产经验——增加副本因子
在生产环境当中由于某个主题的重要等级需要提升我们考虑增加副本。副本数的增加需要先制定计划然后根据计划执行。 1创建 topic
[jjmhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four2手动增加副本存储 1创建副本存储计划所有副本都指定存储在 broker0、broker1、broker2 中。
[jjmhadoop102 kafka]$ vim increase-replication-factor.json输入如下内容
{version:1,partitions:[{topic:four,partition:0,replicas:[0,1,2]},{topic:four,partition:1,replicas:[0,1,2]},{topic:four,partition:2,replicas:[0,1,2]}]}2执行副本存储计划。
[jjmhadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute文件存储
文件存储机制
1Topic 数据的存储机制 Topic是逻辑上的概念而partition是物理上的概念每个partition对应于一个log文件该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端为防止log文件过大导致数据定位效率低下Kafka采取了分片和索引机制将每个partition分为多个segment。每个segment包括“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下该文件夹的命名规则为topic名称分区序号例如first-0。2思考Topic 数据到底存储在什么位置 1启动生产者并发送消息。
[jjmhadoop102 kafka]$ bin/kafka-console-producer.sh --
bootstrap-server hadoop102:9092 --topic first
hello world2查看 hadoop102或者 hadoop103、hadoop104的/opt/module/kafka/datas/first-1first-0、first-2路径上的文件。
[jjmhadoop104 first-1]$ ls
00000000000000000092.index
00000000000000000092.log
00000000000000000092.snapshot
00000000000000000092.timeindex
leader-epoch-checkpoint
partition.metadata3index 文件和 log 文件详解 说明日志存储参数配置
参数描述log.segment.bytesKafka 中 log 日志是分成一块块存储的此配置是指 log 日志划分成块的大小默认值 1G。log.index.interval.bytes默认 4kbkafka 里面每当写入了 4kb 大小的日志.log然后就往 index 文件里面记录一个索引。 稀疏索引。
文件清理策略
Kafka 中默认的日志保存时间为 7 天可以通过调整如下参数修改保存时间。
log.retention.hours最低优先级小时默认 7 天。log.retention.minutes分钟。log.retention.ms最高优先级毫秒。log.retention.check.interval.ms负责设置检查周期默认 5 分钟。 那么日志一旦超过了设置的时间怎么处理呢 Kafka 中提供的日志清理策略有 delete 和 compact 两种 1delete 日志删除将过期数据删除 log.cleanup.policy delete 所有数据启用删除策略 1基于时间默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。 2基于大小默认关闭。超过设置的所有日志总大小删除最早的 segment。 log.retention.bytes默认等于-1表示无穷大。 思考如果一个 segment 中有一部分数据过期一部分没有过期怎么处理 2compact 日志压缩 compact日志压缩对于相同key的不同value值只保留最后一个版本。 log.cleanup.policy compact 所有数据启用压缩策略 压缩后的offset可能是不连续的比如上图中没有6当从这些offset消费消息时将会拿到比这个offset大的offset对应的消息实际上会拿到offset为7的消息并从这个位置开始消费。 这种策略只适合特殊场景比如消息的key是用户IDvalue是用户的资料通过这种压缩策略整个消息集里就保存了所有用户最新的资料。
高效读写数据
1Kafka 本身是分布式集群可以采用分区技术并行度高 2读数据采用稀疏索引可以快速定位要消费的数据 3顺序写磁盘 Kafka 的 producer 生产数据要写入到 log 文件中写的过程是一直追加到文件末端为顺序写。官网有数据表明同样的磁盘顺序写能到 600M/s而随机写只有 100K/s。这与磁盘的机械机构有关顺序写之所以快是因为其省去了大量磁头寻址的时间。 4页缓存 零拷贝技术 零拷贝Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据所以就不用走应用层传输效率高。 PageCache页缓存Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时操作系统只是将数据写入PageCache。当读操作发生时先从PageCache中查找如果找不到再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。
参数描述log.flush.interval.messages强制页缓存刷写到磁盘的条数默认是 long 的最大值9223372036854775807。一般不建议修改交给系统自己管理。log.flush.interval.ms每隔多久刷数据到磁盘默认是 null。一般不建议修改交给系统自己管理。
Kafka 消费者
Kafka 消费方式
pull拉模 式 consumer采用从broker中主动拉取数据。Kafka采用这种方式。push推模式 Kafka没有采用这种方式因为由broker决定消息发送速率很难适应所有消费者的消费速率。例如推送的速度是50m/sConsumer1、Consumer2就来不及处理消息。
pull模式不足之处是如果Kafka没有数据消费者可能会陷入循环中一直返回空数据。
Kafka 消费者工作流程
消费者总体工作流程 消费者组原理
Consumer GroupCG消费者组由多个consumer组成。形成一个消费者组的条件是所有消费者的groupid相同。
消费者组内每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费。消费者组之间互不影响。所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。 消费者组初始化流程 消费者组详细消费流程
消费者重要参数
参数名称描述bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。group.id标记消费者所属的消费者组。enable.auto.commit默认值为 true消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true 则该值定义了消费者偏移量向 Kafka 提交的频率默认 5s。auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在如数据被删除了该如何处理 earliest自动重置偏移量到最早的偏移量。 latest默认自动重置偏移量为最新的偏移量。 none如果消费组原来的previous偏移量不存在则向消费者抛异常。 anything向消费者抛异常。offsets.topic.num.partitions__consumer_offsets 的分区数默认是 50 个分区。heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间默认 3s。该条目的值必须小于 session.timeout.ms 也不应该高于session.timeout.ms 的 1/3。session.timeout.msKafka 消费者和 coordinator 之间连接超时时间默认 45s。超过该值该消费者被移除消费者组执行再平衡。max.poll.interval.ms消费者处理消息的最大时长默认是 5 分钟。超过该值该消费者被移除消费者组执行再平衡。fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到仍然会返回数据。fetch.max.bytes默认 Default: 5242880050 m。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值50m仍然可以拉取回来这批数据因此这不是一个绝对最大值。一批次的大小受 message.max.bytes broker configor max.message.bytes topic config影响。max.poll.records一次 poll 拉取数据返回消息的最大条数默认是 500 条。
消费者 API
独立消费者案例订阅主题
1需求 创建一个独立消费者消费 first 主题中数据。 注意在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。2实现步骤 1创建包名com.jjm.kafka.consumer 2编写代码
package com.jjm.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组组名任意起名 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 创建消费者对象KafkaConsumerString, String kafkaConsumer new KafkaConsumerString, String(properties);// 注册要消费的主题可以消费多个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 拉取数据打印while (true) {// 设置 1s 中消费一批数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));// 打印消费到的数据for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}3测试 1在 IDEA 中执行消费者程序。 2在 Kafka 集群控制台创建 Kafka 生产者并输入数据。
[jjmhadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
hello3在 IDEA 控制台观察接收到的数据。
ConsumerRecord(topic first, partition 1, leaderEpoch 3, offset 0, CreateTime 1629160841112, serialized key size -1, serialized value size 5, headers RecordHeaders(headers [], isReadOnly false), key null, value hello)生产经验——分区的分配以及再平衡
1、一个consumer group中有多个consumer组成一个 topic有多个partition组成现在的问题是到底由哪个consumer来消费哪个partition的数据。 2、Kafka有四种主流的分区分配策略 Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy修改分区的分配策略。默认策略是Range CooperativeSticky。Kafka可以同时使用多个分区分配策略。
Range 以及再平衡
1Range 分区策略原理 Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序并 对消费者按照字母顺序进行排序。 假如现在有 7 个分区3 个消费者排序后的分区将会是0,1,2,3,4,5,6消费者排序完之后将会是C0,C1,C2。 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽那么前面几个消费者将会多消费 1 个分区。 例如7/3 2 余 1 除不尽那么 消费者 C0 便会多消费 1 个分区。 8/32余2除不尽那么C0和C1分别多消费一个。 注意如果只是针对 1 个 topic 而言C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic那么针对每个 topic消费者 C0都将多消费 1 个分区topic越多C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜2Range 分区分配再平衡案例 1停止掉 0 号消费者快速重新发送消息观看结果45s 以内越快越好。 1 号消费者消费到 3、4 号分区数据。 2 号消费者消费到 5、6 号分区数据。 0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。 说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。 2再次重新发送消息观看结果45s 以后。 1 号消费者消费到 0、1、2、3 号分区数据。 2 号消费者消费到 4、5、6 号分区数据。 说明消费者 0 已经被踢出消费者组所以重新按照 range 方式分配。
RoundRobin 以及再平衡
1RoundRobin 分区策略原理 RoundRobin 针对集群中所有Topic而言。 RoundRobin 轮询分区策略是把所有的 partition 和所有的consumer 都列出来然后按照 hashcode 进行排序最后通过轮询算法来分配 partition 给到各个消费者。2RoundRobin 分区分配再平衡案例 1停止掉 0 号消费者快速重新发送消息观看结果45s 以内越快越好。 1 号消费者消费到 2、5 号分区数据 2 号消费者消费到 4、1 号分区数据 0 号消费者的任务会按照 RoundRobin 的方式把数据轮询分成 0 、6 和 3 号分区数据分别由 1 号消费者或者 2 号消费者消费。 说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。 2再次重新发送消息观看结果45s 以后。 1 号消费者消费到 0、2、4、6 号分区数据 2 号消费者消费到 1、3、5 号分区数据 说明消费者 0 已经被踢出消费者组所以重新按照 RoundRobin 方式分配。
Sticky 以及再平衡
粘性分区定义可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前考虑上一次分配的结果尽量少的调整分配的变动可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略首先会尽量均衡的放置分区 到消费者上面在出现同一消费者组内消费者出现问题的时候会尽量保持原有分配的分区不变化。 1Sticky 分区分配再平衡案例 1停止掉 0 号消费者快速重新发送消息观看结果45s 以内越快越好。 1 号消费者消费到 2、5、3 号分区数据。 2 号消费者消费到 4、6 号分区数据。 0 号消费者的任务会按照粘性规则尽可能均衡的随机分成 0 和 1 号分区数据分别 由 1 号消费者或者 2 号消费者消费。 说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。 2再次重新发送消息观看结果45s 以后。 1 号消费者消费到 2、3、5 号分区数据。 2 号消费者消费到 0、1、4、6 号分区数据。 说明消费者 0 已经被踢出消费者组所以重新按照粘性方式分配。
offset 位移
offset 的默认维护位置 __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.idtopic分区号value 就是当前 offset 的值。每隔一段时间kafka 内部会对这个 topic 进行compact也就是每个 group.idtopic分区号就保留最新数据。
自动提交 offset
为了使我们能够专注于自己的业务逻辑Kafka提供了自动提交offset的功能。 自动提交offset的相关参数
enable.auto.commit是否开启自动提交offset功能默认是trueauto.commit.interval.ms自动提交offset的时间间隔默认是5s
手动提交 offset
虽然自动提交offset十分简单便利但由于其是基于时间提交的开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。 手动提交offset的方法有两种分别是commitSync同步提交和commitAsync异步提交。两者的相同点是都会将本次提交的一批数据最高的偏移量提交不同点是同步提交阻塞当前线程一直到提交成功并且会自动失败重试由不可控因素导致也会出现提交失败而异步提交则没有失败重试机制故有可能提交失败。
commitSync同步提交必须等待offset提交完毕再去消费下一批数据。commitAsync异步提交 发送完提交offset请求后就开始消费下一批数据了。
指定 Offset 消费
auto.offset.reset earliest | latest | none 默认是 latest。 当 Kafka 中没有初始偏移量消费者组第一次消费或服务器上不再存在当前偏移量 时例如该数据已被删除该怎么办 1earliest自动将偏移量重置为最早的偏移量–from-beginning。 2latest默认值自动将偏移量重置为最新偏移量。 3none如果未找到消费者组的先前偏移量则向消费者抛出异常。
指定时间消费
需求在生产环境中会遇到最近消费的几个小时数据异常想重新按照时间消费。 例如要求按照时间消费前一天的数据怎么处理 操作步骤
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerForTime {public static void main(String[] args) {// 0 配置信息Properties properties new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key value 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, test2);// 1 创建一个消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅一个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);SetTopicPartition assignment new HashSet();while (assignment.size() 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息有了分区分配信息才能开始消费assignment kafkaConsumer.assignment();}HashMapTopicPartition, Long timestampToSearch new HashMap();// 封装集合存储每个分区对应一天前的数据for (TopicPartition topicPartition : assignment) {timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);}// 获取从 1 天前开始消费的每个分区的 offsetMapTopicPartition, OffsetAndTimestamp offsets kafkaConsumer.offsetsForTimes(timestampToSearch);// 遍历每个分区对每个分区设置消费时间。for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp offsets.get(topicPartition);// 根据时间指定开始消费的位置if (offsetAndTimestamp ! null){kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());}}// 3 消费该主题数据while (true) {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}漏消费和重复消费
重复消费已经消费了数据但是 offset 没提交。 漏消费先提交 offset 后消费有可能会造成数据的漏消费。 1场景1重复消费。自动提交offset引起。 2场景1漏消费。设置offset为手动提交当offset被提交时数据还在内存中未落盘此时刚好消费者线程被kill掉那么offset已经提交但是数据未处理导致这部分内存中的数据丢失。 思考怎么能做到既不漏消费也不重复消费呢详看消费者事务。
生产经验——消费者事务
如果想完成Consumer端的精准一次性消费那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质比如MySQL。
生产经验——数据积压消费者如何提高吞吐量
1如果是Kafka消费能力不足则可以考虑增加Topic的分区数并且同时提升消费组的消费者数量消费者数 分区数。两者缺一不可 2如果是下游的数据处理不及时提高每批次拉取的数量。批次拉取数据过少拉取数据/处理时间 生产速度使处理的数据小于生产的数据也会造成数据积压。
参数名称描述fetch.max.bytes默认 Default: 5242880050 m。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值50m仍然可以拉取回来这批数据因此这不是一个绝对最大值。一批次的大小受 message.max.bytes broker configor max.message.bytes topic config影响。max.poll.records一次 poll 拉取数据返回消息的最大条数默认是 500 条