wordpress站点美化,石家庄最新新闻,美容培训东莞网站建设,建设网站前的市场分析包括哪些内容文章目录 [toc]创建一个演示 topic生产一些数据使用消费者组消费数据增加分区无新数据产生#xff0c;有旧数据未消费有新数据产生#xff0c;有旧数据未消费 增加副本创建 json 文件使用指定的 json 文件增加 topic 的副本数使用指定的 json 文件查看 topic 的副本数增加的进… 文章目录 [toc]创建一个演示 topic生产一些数据使用消费者组消费数据增加分区无新数据产生有旧数据未消费有新数据产生有旧数据未消费 增加副本创建 json 文件使用指定的 json 文件增加 topic 的副本数使用指定的 json 文件查看 topic 的副本数增加的进度查看 topic 情况
文档内出现的 ${KAFKA_BROKERS} 表示 kafka 的连接地址${ZOOKEEPER_CONNECT} 表示 zk 的连接地址需要替换成自己的实际 ip 地址
创建一个演示 topic
kafka-topics.sh --create --zookeeper ${ZOOKEEPER_CONNECT} --replication-factor 1 --partitions 3 --topic test-topic-update查看 topic 详情 kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic test-topic-update总共是六个 kafka 节点三分区一副本分散在三个不同的 kafka 节点 Topic:test-topic-update PartitionCount:3 ReplicationFactor:1 Configs:segment.bytes1073741824Topic: test-topic-update Partition: 0 Leader: 5 Replicas: 5 Isr: 5Topic: test-topic-update Partition: 1 Leader: 1 Replicas: 1 Isr: 1Topic: test-topic-update Partition: 2 Leader: 0 Replicas: 0 Isr: 0关于输出内容的概念 分区Partition 主题Topic在 Kafka 中的数据被分成一个或多个分区。每个分区是一个有序且持久化的消息日志。分区允许 Kafka 集群进行水平扩展使多个消费者能够并行地处理主题的消息。消费者组中的每个消费者负责处理一个或多个分区的消息。 领导者Leader 每个分区都有一个领导者领导者负责处理该分区的所有读写请求。生产者向领导者发送消息消费者从领导者读取消息。领导者也负责维护分区的复制和同步。 副本Replicas 为了提高数据的冗余和可用性每个分区可以有多个副本包括一个领导者副本和零个或多个追随者副本。领导者副本处理写请求追随者副本用于数据冗余和读请求。 同步副本集In-Sync ReplicasISR 同步副本集是指在分区的所有副本中与领导者副本保持同步的副本。领导者和同步副本集中的副本是可用于读取的其他追随者副本可能会有一些延迟。
生产一些数据
手动生产 300 条数据
kafka-verifiable-producer.sh --broker-list ${KAFKA_BROKERS} --topic test-topic-update --max-messages 300使用消费者组消费数据
消费者组不存在的情况下没有返回被消费的数据过两三秒之后可以中断这个命令然后使用下面的 --describe 来验证
kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS} --topic test-topic-update --group test-topic-update-group查看消费组内的 topic 消费情况 kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group目前三百条都被消费了使用上面的生产数据的命令再生产300条模拟 topic 有数据的场景 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 2 100 100 0 - - -
test-topic-update-group test-topic-update 0 100 100 0 - - -
test-topic-update-group test-topic-update 1 100 100 0 - - -生产完数据后再次查看返回结果如下 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 2 100 200 100 - - -
test-topic-update-group test-topic-update 0 100 200 100 - - -
test-topic-update-group test-topic-update 1 100 200 100 - - -增加分区
在增加分区的场景下比较方便直接使用 --alter 就能实现这里将原来的 3 分区改成 12 分区
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --alter --topic test-topic-update --partitions 12无新数据产生有旧数据未消费 查看 topic 情况 kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic test-topic-update可以看到分区已经更新成 12 个了也可以看出kafka 在动态增加分区的时候是均分的都会按照类似下面的 5-1-0-3-2-4 这样的顺序去均分当然前提是分区数和节点数是倍数关系 Topic:test-topic-update PartitionCount:12 ReplicationFactor:1 Configs:segment.bytes1073741824Topic: test-topic-update Partition: 0 Leader: 5 Replicas: 5 Isr: 5Topic: test-topic-update Partition: 1 Leader: 1 Replicas: 1 Isr: 1Topic: test-topic-update Partition: 2 Leader: 0 Replicas: 0 Isr: 0Topic: test-topic-update Partition: 3 Leader: 3 Replicas: 3 Isr: 3Topic: test-topic-update Partition: 4 Leader: 2 Replicas: 2 Isr: 2Topic: test-topic-update Partition: 5 Leader: 4 Replicas: 4 Isr: 4Topic: test-topic-update Partition: 6 Leader: 5 Replicas: 5 Isr: 5Topic: test-topic-update Partition: 7 Leader: 1 Replicas: 1 Isr: 1Topic: test-topic-update Partition: 8 Leader: 0 Replicas: 0 Isr: 0Topic: test-topic-update Partition: 9 Leader: 3 Replicas: 3 Isr: 3Topic: test-topic-update Partition: 10 Leader: 2 Replicas: 2 Isr: 2Topic: test-topic-update Partition: 11 Leader: 4 Replicas: 4 Isr: 4查看消费组内的分区情况 kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group因为没有新数据进入也没有消费旧数据此时还是显示的原先的信息 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 2 100 200 100 - - -
test-topic-update-group test-topic-update 0 100 200 100 - - -
test-topic-update-group test-topic-update 1 100 200 100 - - -将未消费的 300 条数据进行消费 kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS} --topic test-topic-update --group test-topic-update-group --max-messages 300消费完成后再次查看消费组的情况 kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group此时就变成正常的 12 分区了 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 0 100 100 0 - - -
test-topic-update-group test-topic-update 7 0 0 0 - - -
test-topic-update-group test-topic-update 5 0 0 0 - - -
test-topic-update-group test-topic-update 1 100 100 0 - - -
test-topic-update-group test-topic-update 6 0 0 0 - - -
test-topic-update-group test-topic-update 2 100 100 0 - - -
test-topic-update-group test-topic-update 3 0 0 0 - - -
test-topic-update-group test-topic-update 10 0 0 0 - - -
test-topic-update-group test-topic-update 9 0 0 0 - - -
test-topic-update-group test-topic-update 8 0 0 0 - - -
test-topic-update-group test-topic-update 11 0 0 0 - - -
test-topic-update-group test-topic-update 4 0 0 0 - - -这里为了方便验证我把 topic 删了后重建了下面这个删除 topic 的命令大家别随意执行会删除数据的 kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --delete --topic test-topic-update有新数据产生有旧数据未消费
同样先扩容分区
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --alter --topic test-topic-update --partitions 12未生产新数据的时候查看消费者组的信息同样是没有更新分区信息 kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group此时手动使用命令模拟新数据进来 kafka-verifiable-producer.sh --broker-list ${KAFKA_BROKERS} --topic test-topic-update --max-messages 100通过命令查看消费者组的情况 kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group此时显示的是老分区而且只显示了 88925 条数据 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 2 100 108 8 - - -
test-topic-update-group test-topic-update 0 100 108 8 - - -
test-topic-update-group test-topic-update 1 100 109 9 - - -手动消费一下数据试试 kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS} --topic test-topic-update --group test-topic-update-group --max-messages 100发现返回的信息里面只显示25条数据 kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group test-topic-update-group但是观察消费者组的情况显示的是都消费了看起来应该是和 topic 加入新消费者组的情况一样不展示但实际消费数据了这块是个人的理解具体的原理需要有兴趣的大佬深究一下希望能赐教带我飞 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 0 108 108 0 - - -
test-topic-update-group test-topic-update 7 9 9 0 - - -
test-topic-update-group test-topic-update 5 8 8 0 - - -
test-topic-update-group test-topic-update 1 109 109 0 - - -
test-topic-update-group test-topic-update 6 9 9 0 - - -
test-topic-update-group test-topic-update 2 108 108 0 - - -
test-topic-update-group test-topic-update 3 8 8 0 - - -
test-topic-update-group test-topic-update 10 8 8 0 - - -
test-topic-update-group test-topic-update 9 8 8 0 - - -
test-topic-update-group test-topic-update 8 8 8 0 - - -
test-topic-update-group test-topic-update 11 8 8 0 - - -
test-topic-update-group test-topic-update 4 9 9 0 - - -增加副本
创建 json 文件
kafka-reassign-partitions.sh 是 Kafka 提供的命令行工具用于重新分配主题分区的副本。这个工具允许你重新定义主题分区副本的分布以实现负载均衡、故障恢复或集群扩展等目的 之前的 topic 是 1 副本12 分区按照之前的 5-1-0-3-2-4 的顺序来分配第一个副本然后按照 4-3-2-0-1-5 的顺序来分配第二个副本我这里的 json 文件就命名为add_rep_test_topic_update.json大家可以以自己实际来命名 {version:1, partitions:[
{topic:test-topic-update,partition:0,replicas:[5,4]},
{topic:test-topic-update,partition:1,replicas:[1,3]},
{topic:test-topic-update,partition:2,replicas:[0,2]},
{topic:test-topic-update,partition:3,replicas:[3,0]},
{topic:test-topic-update,partition:4,replicas:[2,1]},
{topic:test-topic-update,partition:5,replicas:[4,5]},
{topic:test-topic-update,partition:6,replicas:[5,4]},
{topic:test-topic-update,partition:7,replicas:[1,3]},
{topic:test-topic-update,partition:8,replicas:[0,2]},
{topic:test-topic-update,partition:9,replicas:[3,0]},
{topic:test-topic-update,partition:10,replicas:[2,1]},
{topic:test-topic-update,partition:11,replicas:[4,5]}]
}使用指定的 json 文件增加 topic 的副本数
kafka-reassign-partitions.sh --zookeeper ${ZOOKEEPER_CONNECT} --execute --reassignment-json-file add_rep_test_topic_update.json使用指定的 json 文件查看 topic 的副本数增加的进度
kafka-reassign-partitions.sh --zookeeper ${ZOOKEEPER_CONNECT} --verify --reassignment-json-file add_rep_test_topic_update.json通过命令返回的内容可以看出都成功了 Reassignment of partition test-topic-update-0 completed successfully
Reassignment of partition test-topic-update-7 completed successfully
Reassignment of partition test-topic-update-5 completed successfully
Reassignment of partition test-topic-update-1 completed successfully
Reassignment of partition test-topic-update-6 completed successfully
Reassignment of partition test-topic-update-2 completed successfully
Reassignment of partition test-topic-update-3 completed successfully
Reassignment of partition test-topic-update-10 completed successfully
Reassignment of partition test-topic-update-9 completed successfully
Reassignment of partition test-topic-update-8 completed successfully
Reassignment of partition test-topic-update-11 completed successfully
Reassignment of partition test-topic-update-4 completed successfully查看 topic 情况
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic test-topic-update现在的 topic 变成了 12 分区2 副本的状态了 Topic:test-topic-update PartitionCount:12 ReplicationFactor:2 Configs:segment.bytes1073741824Topic: test-topic-update Partition: 0 Leader: 5 Replicas: 5,4 Isr: 5,4Topic: test-topic-update Partition: 1 Leader: 1 Replicas: 1,3 Isr: 3,1Topic: test-topic-update Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2Topic: test-topic-update Partition: 3 Leader: 3 Replicas: 3,0 Isr: 3,0Topic: test-topic-update Partition: 4 Leader: 1 Replicas: 2,1 Isr: 1,2Topic: test-topic-update Partition: 5 Leader: 4 Replicas: 4,5 Isr: 5,4Topic: test-topic-update Partition: 6 Leader: 5 Replicas: 5,4 Isr: 4,5Topic: test-topic-update Partition: 7 Leader: 1 Replicas: 1,3 Isr: 1,3Topic: test-topic-update Partition: 8 Leader: 0 Replicas: 0,2 Isr: 0,2Topic: test-topic-update Partition: 9 Leader: 3 Replicas: 3,0 Isr: 0,3Topic: test-topic-update Partition: 10 Leader: 1 Replicas: 2,1 Isr: 1,2Topic: test-topic-update Partition: 11 Leader: 4 Replicas: 4,5 Isr: 4,5