阜新网站开发,百度搜索官方网站,wordpress缩略图路径错误,wordpress返回上页kafka介绍
Kafka是最初由Linkedin公司开发#xff0c;是一个分布式、支持分区的#xff08;partition#xff09;、多副本的 #xff08;replica#xff09;#xff0c;基于zookeeper协调的分布式消息系统#xff0c;它的最大的特性就是可以实时的处理大量数据以满足各…kafka介绍
Kafka是最初由Linkedin公司开发是一个分布式、支持分区的partition、多副本的 replica基于zookeeper协调的分布式消息系统它的最大的特性就是可以实时的处理大量数据以满足各种需求场景比如基于hadoop的批处理系统、低延迟的实时系统、 Storm/Spark流式处理引擎web/nginx日志、访问日志消息服务等等用scala语言编写Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
1.使用场景 日志收集可以用Kafka收集各种服务的log通过kafka以统⼀接口服务的方式开放给各种consumer例如hadoop、Hbase、Solr等。 消息系统解耦和生产者和消费者、缓存消息等。 用户活动跟踪Kafka经常被用来记录web用户或者app用户的各种活动如浏览网页、 搜索、点击等活动这些活动信息被各个服务器发布到kafka的topic中然后订阅者通过订阅这些topic来做实时的监控分析或者装载到hadoop、数据仓库中做离线分析和挖掘。 运营指标Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据生产各种操作的集中反馈比如报警和报告。
2.kafka基本概念 整个流程应该是producer通过网络发送消息到Kafka集群然后consumer 来进行消费如下图 服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。
kafka基本使用
1.安装关闭
以下所有操作全部基于kafka_2.13-3.0.1.tgz 3.0.1版本 这个版本
配置文件server.properties主要修改以下配置
#broker.id属性在kafka集群中必须要是唯⼀
broker.id0
listenersPLAINTEXT://xx.xx.xx.xx(服务器内网IP地址):9092
advertised.listenersPLAINTEXT://xx.xx.xx.xx(服务器对外IP地址):9092
#kafka的消息存储⽂件
log.dir/usr/local/kafka/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect192.168.65.60:2181启动
./kafka-server-start.sh -daemon ../config/server.properties验证
# 查看端口是否占用
netstat -ntlp 或者
进入到zk内查看是否有kafka的节点/brokers/ids/0
./zkCli.sh关闭kafka
./kafka-server-stop.sh stop ../config/server.properties2.创建topic
执行以下命令创建名为“test”的topic这个topic只有⼀个partition并且备份因子也设置为 1
./kafka-topics.sh --bootstrap-server kafkahost:9092 --create --topic test --partitions 1 --replication-factor 1-- 新版本的kafka已经不需要依赖zookeeper来创建topic新版的kafka创建topic指令如下
./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic test --partitions 1 --replication-factor 13.查看kafka中所有的主题
./kafka-topics.sh --bootstrap-server kafkahost:9092 --list./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --list4.发送消息
kafka自带了⼀个producer命令客户端可以从本地文件中读取内容或者以命令行中直接输入内容并将这些内容以消息的形式发送到kafka集群中。在默认情况下每⼀个行会被当做成⼀个独立的消息。使用kafka的发送消息的客户端指定发送到的kafka服务器地址和topic
把消息发送给broker中的某个topic打开⼀个kafka发送消息的客户端然后开始用客户端向kafka服务器发送消息
./kafka-console-producer.sh --bootstrap-server 124.222.253.33:9092 --topic test5.消费消息
消费消息两种方式
对于consumerkafka同样也携带了⼀个命令行客户端会将获取到内容在命令中进行输出默认是消费最新的消息。 使用kafka的消费者消息的客户端从指定kafka服务器的指定 topic中消费消息 从当前主题中的最后⼀条消息的offset偏移量位置1开始消费 ./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --topic test从当前主题中的第⼀条消息开始消费 ./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --from-beginning --topic test6.消息的细节 生产者将消息发送给brokerbroker会将消息保存在本地的日志文件中
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log消息的保存是有序的通过offset偏移量来描述消息的有序性消费者消费消息时可以通过offset来描述当前要消费的那条消息的位置
7.单播多播消息
单播还是多播消息取决于topic有多少消费组
1单播
如果多个消费者在同⼀个消费组那么只有⼀个消费者可以收到订阅的topic中的消息。同⼀个消费组中只能有⼀个消费者收到订阅topic中的消息。
./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.idtestGroup --topic test2多播
不同的消费组订阅同⼀个topic那么不同的消费组中只有⼀个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同⼀个消息。
./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.idtestGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.idtestGroup2 --topic test3区别 8.查看消费组详细信息
# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 124.222.253.33:9092 --list# 查看消费组中的具体信息⽐如当前偏移量、最后⼀条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 124.222.253.33:9092 --describe --group testGroupCurrennt-offset当前消费组的已消费偏移量最后被消费的消息的偏移量Log-end-offset主题对应分区消息的结束偏移量(HW) 【消息总量最后一条消息偏移量】Lag当前消费组未消费的消息数积压消息量
Kafka中主题和分区的概念
1.主题
主题-topic在kafka中是⼀个逻辑的概念kafka通过topic将消息进⾏分类。不同的topic会被订阅该topic的消费者消费。
但是有⼀个问题如果说这个topic中的消息非常非常多多到需要几T来存因为消息是会被保存到log日志文件中的。为了解决单个文件过大的问题kafka提出了Partition分区的概念。
2.分区
1分区的概念
通过partition将⼀个topic中的消息分区来存储。
这样的好处有多个
分区存储可以解决统⼀存储文件过大的问题提高了读写的吞吐量读和写可以同时在多个分区中进行 2创建多分区的主题
./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic test1 --partitions 2 --replication-factor 13.kafka中消息日志文件中保存的内容 00000.log 这个文件中保存的就是消息 __consumer_offsets-49 kafka内部自己创建了__consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题consumer_offsets。因此kafka为了提升这个主题的并发性默认设置了50个分区(可以通过offsets.topic.num.partitions设置)。 提交到哪个分区通过hash函数hash(consumerGroupId) % __consumer_offsets 主题的分区数提交到该主题中的内容是key是consumerGroupIdtopic分区号value就是当前offset的值 文件中保存的消息默认保存7天。七天到后消息会被删除最后就保留最新的那条数据。
Kafka集群操作
1.搭建kafka集群三个broker
创建三个server.properties文件
# 0 1 2
broker.id2
# 9092 9093 9094
listenersPLAINTEXT://xx.xx.xx.xx(服务器内网IP地址):9094
advertised.listenersPLAINTEXT://xx.xx.xx.xx(服务器对外IP地址):9094
# kafka-logs kafka-logs-1 kafka-logs-2
log.dir/usr/local/data/kafka-logs-2通过命令来启动三台broker
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties校验是否启动成功
进入到zk中查看/brokers/ids中过是否有三个znode012
2.副本的概念
在创建主题时除了指明了主题的分区数以外还指明了副本数 replication-factor参数
如下主题创建了两分区、三副本副本对应集群中broker数量
./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic my-replicated-topic --partitions 2 --replication-factor 3副本是为了为主题中的分区创建多个备份多个副本在kafka集群的多个broker中会有⼀个副本作为leader其他是follower。
查看topic情况
# 查看topic情况
./kafka-topics.sh --describe --bootstrap-server 124.222.253.33:9092 --topic my-replicated-topicleader
kafka的写和读的操作都发生在leader上。leader负责把数据同步给follower。当leader挂了经过主从选举从多个follower中选举产⽣⼀个新的leaderfollower通过poll的方式来同步数据
follower
接收leader的同步的数据leader挂了参与leader选举
replicas
当前副本存在的broker节点
isr
可以同步和已同步的broker节点会被存入到isr集合中。如果isr中的broker节点性能较差会被踢出isr集合。
3.broker、主题、分区、副本
综上broker、主题、分区、副本概念已全部展示
集群中有多个broker创建主题时可以指明主题有多个分区把消息拆分到不同的分区中存储可以为分区创建多个副本不同的副本存放在不同的broker⾥。
4.kafka集群消息的发送
./kafka-console-producer.sh --broker-list 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --topic my-replicated-topic5.kafka集群消息的消费
1普通消费
./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --from-beginning --topic my-replicated-topic2指定消费组消费
./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --from-beginning --consumer-property group.idtestGroup1 --topic my-replicated-topic6.分区分消费组的集群消费中的细节 ⼀个partition只能被⼀个消费组中的⼀个消费者消费目的是为了保证消费的顺序性但是多个partion的多个消费者消费的总的顺序性是得不到保证的那怎么做到消费的总顺序性呢Kafka只在partition的范围内保证消息消费的局部顺序性不能在同⼀个topic中的多个partition中保证总的消费顺序性。 ⼀个消费者可以消费多个partition。 partition的数量决定了消费组中消费者的数量建议同⼀个消费组中消费者的数量不要超过partition的数量否则多的消费者消费不到消息 如果消费者挂了那么会触发rebalance机制会让其他消费者来消费该分区