Table of Contents: Basic Operations (Create a Topic, Query Topics, Modify a Topic, Delete a Topic) · Producers and Consumers (Create a Producer, Create a Consumer) · Broker Extensions · Producer Extensions · Topic, Partition, and Message Extensions · Storage Strategy · Fault Tolerance

Basic Operations
Create a Topic
Specify two partitions and two replicas. The replication factor cannot be greater than the number of brokers in the cluster.
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --partitions 2 --replication-factor 2 --topic hello
Created topic hello.
Query Topics
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --list --zookeeper hadoop01:2181
hello
# view detailed information
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181
Topic: hello	PartitionCount: 2	ReplicationFactor: 2	Configs:
	Topic: hello	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: hello	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0

Modify a Topic
The number of partitions can only be increased.
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --alter --zookeeper hadoop01:2181 --partitions 5 --topic hello
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181
Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs:
	Topic: hello	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: hello	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0
	Topic: hello	Partition: 2	Leader: 0	Replicas: 0,2	Isr: 0,2
	Topic: hello	Partition: 3	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: hello	Partition: 4	Leader: 2	Replicas: 2,0	Isr: 2,0
Delete a Topic
Deleting a topic is irreversible. Since Kafka 1.0, deletion is enabled by default; earlier versions only mark the topic for deletion, and delete.topic.enable must be set to true before it is actually deleted.
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --delete --zookeeper hadoop01:2181 --topic hello
Topic hello is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
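For those older versions, the switch lives in the broker configuration; a minimal sketch of the relevant line (the config file path is the standard one for this layout, and the broker has to be restarted for it to take effect):

# config/server.properties
delete.topic.enable=true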
Producers and Consumers
Create a Producer
bin/kafka-console-producer.sh

Create a Consumer
bin/kafka-console-consumer.sh

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --partitions 5 --replication-factor 2 --topic hello
Created topic hello.
# producer
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic hello
# consumer (this only consumes new messages)
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic hello
# also consume the earlier messages, from the beginning
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic hello --from-beginning
Broker Extensions

Configuration file: server.properties

# The number of messages to accept before forcing a flush of data to disk
# flush to disk based on the number of accumulated messages
log.flush.interval.messages=10000

# flush based on how long messages have been waiting
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000

# The minimum age of a log file to be eligible for deletion due to age (log retention time)
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies (every 5 minutes, check whether files meet the deletion criteria)
log.retention.check.interval.ms=300000
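These broker-level defaults can also be overridden per topic with kafka-configs.sh. A sketch against this cluster (the one-day retention value is purely illustrative):

# shorten retention for the hello topic only (86400000 ms = 1 day)
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-configs.sh --zookeeper hadoop01:2181 --entity-type topics --entity-name hello --alter --add-config retention.ms=86400000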
Producer Extensions
Partitioner: decides which partition a message is sent to, based on a user-defined algorithm (for example, deriving the target partition from the message key); by default the choice is random.
Data transfer modes: synchronous and asynchronous sending. Synchronous means the producer waits for the receiver's acknowledgement before sending the next message; asynchronous means the producer sends the next message immediately without waiting for a response. The transfer mode is controlled through the acks setting:
acks=1 (the default): the leader node must acknowledge receipt of the message.
acks=all (equivalent to acks=-1): the leader node and all replica nodes must acknowledge receipt.
acks=0: no acknowledgement is required.
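A quick console-producer sketch of both points, assuming the hello topic from above; the key.separator character and the sample keyed messages are illustrative only:

# send keyed messages (so the partitioner can use the key) and require acknowledgement from all replicas
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic hello --producer-property acks=all --property parse.key=true --property key.separator=:
>user1:hello with a key
>user2:another keyed message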
Topic, Partition, and Message Extensions
At the storage level, each partition is an append-only log file: new messages are appended directly to the tail of the log file, and the position of each message within the log file is called its offset.
More partitions can accommodate more consumers, which effectively raises the level of consumption parallelism. If the number of business types grows, add topics; if the data volume grows, add partitions.
Message fields (see the dump example below):
offset: type long; the starting position of this message within a partition. The offset can be thought of as the message's auto-incrementing ID inside the partition.
MessageSize: type int32; the size of this message in bytes.
data: type bytes; the actual content of the message.
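To see these fields on disk you can dump a segment file with Kafka's DumpLogSegments tool; a sketch, assuming the default log directory /tmp/kafka-logs (your log.dirs setting may differ):

# inspect offsets, sizes, and payloads of partition 0 of the hello topic
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/hello-0/00000000000000000000.log --print-data-log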
Storage Strategy

In Kafka, each topic contains one or more partitions, and each partition stores a portion of the messages. Every message carries three attributes, one of which is the offset; the offset acts as the unique ID of a message within its partition. A message is located through segmented indexing: the segments are the segment files that make up each partition, and the index records the offset of the first message in each segment file, so that given an offset the corresponding message can be found in the right segment file.
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# this setting is the size of each segment file; once a segment exceeds this size, a new file is created
log.segment.bytes=1073741824

Kafka's message storage flow: messages produced by the producer are sent to the topic's partitions; when a partition receives a message it appends it to its last segment file, and once that file reaches a certain size a new file is created.
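On disk this shows up as one directory per partition containing the segment and index files; a sketch of what a listing typically looks like, again assuming /tmp/kafka-logs as the log directory:

[root@hadoop01 kafka_2.12-2.4.0]# ls /tmp/kafka-logs/hello-0/
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint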
Fault Tolerance

A single broker going down has little impact on the cluster.
# simulate a node going down
[root@hadoop01 config]# jps
41728 NameNode
53523 Kafka
42246 ResourceManager
59789 Jps
41998 SecondaryNameNode
52655 QuorumPeerMain
[root@hadoop01 config]# kill 53523
[root@hadoop01 config]# jps
41728 NameNode
59809 Jps
42246 ResourceManager
41998 SecondaryNameNode
52655 QuorumPeerMain
# connect to ZooKeeper to look at the Kafka metadata
[root@hadoop01 zookeeper3.8.4]# bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1, 2]
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://hadoop02:9092"],"jmx_port":-1,"host":"hadoop02","timestamp":"1710206078306","port":9092,"version":4}
[zk: localhost:2181(CONNECTED) 5]
# ZooKeeper will re-elect the partition leaders
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic hello
Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs:
	Topic: hello	Partition: 0	Leader: 2	Replicas: 0,2	Isr: 2
	Topic: hello	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1
	Topic: hello	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: hello	Partition: 3	Leader: 1	Replicas: 0,1	Isr: 1
	Topic: hello	Partition: 4	Leader: 1	Replicas: 1,2	Isr: 1,2
When a new broker node is added to the Kafka cluster, ZooKeeper recognizes it automatically and, at an appropriate time, selects that node to serve as a leader.
# restart the broker
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop01 kafka_2.12-2.4.0]# jps
41728 NameNode
60640 Kafka
60707 Jps
42246 ResourceManager
41998 SecondaryNameNode
52655 QuorumPeerMain
# check in ZooKeeper
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://hadoop01:9092"],"jmx_port":-1,"host":"hadoop01","timestamp":"1710221534732","port":9092,"version":4}
# query the Kafka topic information
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic hello
Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs:
	Topic: hello	Partition: 0	Leader: 2	Replicas: 0,2	Isr: 2,0
	Topic: hello	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0
	Topic: hello	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: hello	Partition: 3	Leader: 1	Replicas: 0,1	Isr: 1,0
	Topic: hello	Partition: 4	Leader: 1	Replicas: 1,2	Isr: 1,2
The newly started node is not the leader of any partition, so the leaders should be redistributed evenly. Strictly speaking this is optional: Kafka has a corresponding configuration (auto.leader.rebalance.enable) that handles it.
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-leader-election.sh --bootstrap-server hadoop01:9092 --election-type preferred --all-topic-partitions
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic hello
Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs:
	Topic: hello	Partition: 0	Leader: 0	Replicas: 0,2	Isr: 2,0
	Topic: hello	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0
	Topic: hello	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: hello	Partition: 3	Leader: 0	Replicas: 0,1	Isr: 1,0
	Topic: hello	Partition: 4	Leader: 1	Replicas: 1,2	Isr: 1,2

Brokers in Kafka are stateless with respect to cluster metadata: a broker does not keep this information itself; all of it is stored in ZooKeeper. That is why a broker process going down or coming back up has little impact on the cluster.