桓台县旅游网站建设,购物网站建设技术难点,做网站推广员需要,宜兴网站开发Kafka概述
传统定义#xff1a;一个分布式的基于发布/订阅模式的消息队列#xff0c;主要应用于大数据实时处理领域。 最新定义#xff1a;一个开源的分布式事件流平台#xff0c;被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。最主要的功能是做数据的…Kafka概述
传统定义一个分布式的基于发布/订阅模式的消息队列主要应用于大数据实时处理领域。 最新定义一个开源的分布式事件流平台被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。最主要的功能是做数据的缓冲相较于flume的channel, 能力更强。
应用场景
缓冲/消峰解决生产消息和消费消息的处理速度不一致的情况。解耦只需知道如何连接kafka作用类似于交换机。异步通信可以将事务给kafka自己去处理其他事务。
消息队列的两种模式
点对点模式消费者拉取数据后删除数据。优点是简单速度快缺点是不方便实现多用户需要获取同一数据的情况。发布/订阅模式可以有多个topic主题消费者拉取数据后不删除数据。
Kafka的基础架构
为方便扩展并提高吞吐量一个topic分为多个partition配合分区的设计提出消费者组的概念组内内个消费者并行消费以线程为单位。为提高可用性为每个partiton增加若干副本类似NameNode HA借助zookeeper来实现leader和follower的选举机制leader是原数据follower是副本数据。leader主要用于发送和传输follower主要作为副本保证安全性。
Kafka的安装部署
官网下载地址http://kafka.apache.org/downloads.html
上传安装包解压安装包修改配置文件配置环境变量编写群启群关脚本kf.sh
#! /bin/bashcase $1 in
start){for i in hadoop102 hadoop103 hadoop104doecho --------启动 $i Kafka-------ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.propertiesdone
};;
stop){for i in hadoop102 hadoop103 hadoop104doecho --------停止 $i Kafka-------ssh $i /opt/module/kafka/bin/kafka-server-stop.sh done
};;
esackafka主题相关操作
kafka-topics.sh脚本里面定义了对应相关操作。
增加主题kafka-topics.sh --bootstrap-server hadoop102:9092 -- create --topic second --replication-facotr 1 --partitions 1删除是标记删除预计在1分钟后完全删除。bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first修改只能增加分区数量。bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3查看bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
生产和消费
启动生产者kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first启动消费者kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first如果单独启动生产者发送数据之后再启动消费者默认不发送之前发送的数据。在消费者启动命令后面添加--from-beginning关键字可以修改为从头拿取数据。
发送流程
Kafka Producer生产者 main线程Producer的生产方法interceptors拦截器Serializer序列化器Partitioner分区器按照批次随机分区。每个批次默认是16K默认的等待时间是0ms。 RecordAccumulator里面创建双端队列队列个数等于分区个数sender进程复制双端队列中的数据发送到Kafka集群如果成功接收则返回ack应答否则重新发送最多重试21亿次。
异步发送和同步发送
sender进程发送请求默认是异步执行即向kafka集群发送时不管是否收到回复一直发送由selector来接收ack和关闭对应的请求进程。在Producer的send方法中有一个Callback对象参数该对象需要实现一个onCompletetion方法。可以在里面查看到对应方法参数中的元数据的值里面有主题名称、分区号和偏移量。异步执行时可以发现同一批次分区号是一样的同步时由于需要等待ack同一批次的分区号是不同的。
生产者分区
分区策略 默认分区器 如果指定了分区号到指定分区如果是key-value使用key进行hash分区粘性分区如果上一个有分区跟上一个分区一样直到数据达到分区容量上限或者等待时间上限进行随机更换分区。 UniformStickyPartition分区器如果key值是固定的可以使用该分区器轮询分区器需要维护一个列表效率更低。
生产者如何提高吞吐量
修改从双端队列拿取数据的等待时间从0ms修改为5-100mscompression.type: 压缩snappy修改批次大小默认为16K修改为32KB.
数据可靠性
ack应答级别
0生产者发送过来的数据不需要等数据落盘应答也就是最多一次。1生产者发送过来的数据Leader收到后应答-1all: 生产者发送过来的数据Leader和isr队列里面的所有节点收齐数据并落盘后应答。Leader维护了一个动态的in-sync replica set ISR, 如果有某个节点30s内没有回复则认为该节点死亡。数据有可能重复。
数据的去重
幂等性
指producer不论向Broker发送多少次重复数据Broker都只会持久化一条保证精准一次。重复数据的判断标准根据sqlNumber来判断重发的数据其seqNumber是一样的。 缺点如果生产者中途宕机然后重新建立会话时不能保证不同会话时PID是一样这时候重新发送重复数据时无法保证幂等性。 解决方案在Kafka集群中将生产者的信息保存到集群中的某个主题中如果生产者宕机后重启需要先去读取Kafka集群的状态信息以保证多会话情况下的幂等性。
数据的有序性
因为不能保证多分区之间是有序的只能指定单分区。开启幂等性且元数据request个数小于5个如果发送失败导致顺序异常Kafka会按照SeqNumber重新排序。
Flume和Kafka
为何Kafka全方面碾压flume还会有人使用flume 这是由于flume使用上只需配置一个文件即可使用无需编写代码。并且可以使用flume将数据灌入到kafka中既简单又利用到了kafka的性能flume和kafka结合使用才是日常开发的常用操作。
Kafka Broker总体工作流程
broker在zk中注册controller谁先注册谁说了算由选举出来的Controller监听brokers节点变化Controller决定Leader选举在isr存活为前提轮询选举Controller将节点信息上传到zk其他controller从zk同步相关信息
Broker节点的服役和退役
启动新主机的zookeeper和kafka创建一个要均衡的主题vim topics-to-move.json
{topics: [{topic: first}],version: 1
}生成一个负载均衡的计划bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2,3 --generate保存生成的计划到文件中执行负载均衡计划在kafka/datas目录下查看是否正确。
kafka副本
为了提高数据可靠性副本数量一般设置为两个。
Follower故障处理
LEOlog End Offset:每个副本的最后一个offsetLEO其实就是最新的offset 1 HW (High Watermark): 所有副本中最小的LEO
高效读取
1. 多分区
2. 稀疏索引
3. 顺序写磁盘
4. 页缓存和零拷贝
页缓存其实就是把尽可能多的空闲内存当做磁盘缓存来使用。 零拷贝数据加工处理操作交给生产者和消费者