国外做网站被动收入,静态网站什么样,前端做视频直播网站,合肥网站建设网站模板架构原理
一、高性能读写架构原理——顺序写零拷贝
首先了解两个专业术语#xff0c;研究kafka这个东西#xff0c;你必须得搞清楚这两个概念#xff0c;吞吐量#xff0c;延迟。
写数据请求发送给kafka一直到他处理成功#xff0c;你认为写请求成功#xff0c;假设是…架构原理
一、高性能读写架构原理——顺序写零拷贝
首先了解两个专业术语研究kafka这个东西你必须得搞清楚这两个概念吞吐量延迟。
写数据请求发送给kafka一直到他处理成功你认为写请求成功假设是1毫秒这个就说明性能很高这个就是延迟。
kafka每毫秒可以处理1条数据每秒可以处理1000条数据这个单位时间内可以处理多少条数据就叫做吞吐量1000条数据每条数据10kb10mb吞吐量相当于是每秒处理10mb的数据
1. Kafka是如何利用顺序磁盘写机制实现单机每秒几十万消息写入的 kafka的特点高吞吐低延迟
直接写入os的page cache中
文件kafka仅仅是追加数据到文件末尾磁盘顺序写性能极高几乎跟写内存是一样高的。磁盘随机写你要随机在文件的某个位置修改数据这个叫做磁盘随机写性能是很低的磁盘顺序写仅仅追加数据到文件末尾
而且写磁盘的方式是顺序写不是随机写性能跟内存写几乎一样。就是仅仅在磁盘文件的末尾追加写不能在文件随机位置写入
假设基于上面说的os cache写 磁盘顺序写0.01毫秒低延迟高吞吐每毫秒可以处理100条数据每秒可以处理10万条数据不需要依托类似spark straeming那种batch微批处理的机制
正是依靠了这个超高的写入性能单物理机可以做到每秒几十万条消息写入Kafka
这种方式让kafka的写性能极高最大程度减少了每条数据处理的时间开销反过来就大幅度提升了每秒处理数据的吞吐量一般kafka部署在物理机上单机每秒写入几万到几十万条消息是没问题的
这种方式是不是就兼顾了低延迟和高吞吐两个要求尽量把每条消息的写入性能压榨到极致就可以实现低延迟的写入同时对应的每秒的吞吐量自然就提升了
所以这是kafka非常核心的一个底层机制
而且这里很关键的一点比如rabbitmq这种消息中间件他会先把数据写入内存里然后到了一定时候再把数据一次性从内存写入磁盘里但是kafka不是这种机制他收到数据直接写磁盘
只不过是先写的page cache然后是磁盘顺序写所以写入的性能非常高而且这样不需要让kafka自身的jvm进程占用过多内存可以更多的把内存空间留给os的page cache来缓存磁盘文件的数据
只要能让更多的磁盘数据缓存在os cache里那么后续消费数据从磁盘读的时候就可以直接走os cache读数据了性能是非常高的
2. Kafka是如何利用零拷贝和页缓存技术实现高性能读取的 那么在消费数据的时候需要从磁盘文件里读取数据后通过网络发送出去这个时候怎么提升性能呢
首先就是利用了page cache技术之前说过kafka写入数据到磁盘文件的时候实际上是写入page cache的没有直接发生磁盘IO所以写入的数据大部分都是停留在os层的page cache里的
这个本质其实跟elasticsearch的实现原理是类似的
然后在读取的时候如果正常情况下从磁盘读取数据先尝试从page cache读读不到才从磁盘IO读读到数据以后先会放在os层的一个page cache里接着会发生上下文切换到系统那边把os的读缓存数据拷贝到应用缓存里
接着再次发生上下文二切换到os层把应用缓存的数据拷贝到os的socket缓存中最后数据再发送到网卡上
这个过程里发生了好几次上下文切换而且还涉及到了好几次数据拷贝如果不考虑跟硬件之间的交互起码是从os cache到用户缓存从用户缓存到socket缓存有两次拷贝是绝对没必要的 但是如果用零拷贝技术就是linux的sendfile就可以直接把操作交给osos看page cache里是否有数据如果没有就从磁盘上读取如果有的话直接把os cache里的数据拷贝给网卡了中间不用走那么多步骤了
对比一下是不是所谓的零考别了
所以呢通过零拷贝技术来读取磁盘上的数据还有page cahce的帮助这个性能就非常高了
3. Kafka的底层数据存储结构日志文件以及offset 基本上可以认为每个partition就是一个日志文件存在于某台Kafka服务器上然后这个日志里写入了很多消息每个消息在partition日志文件里都有一个序号叫做offset代表这个消息是日志文件里的第几条消息
但是在消费消息的时候也有一个所谓的offset这个offset是代表消费者目前在partition日志文件里消费到了第几条消息是两回事儿
4. Kafka是如何通过精心设计消息格式节约磁盘空间占用开销的
kafka的消息格式如下 crc32magicattribute时间戳key长度keyvalue长度value
kafka是直接通过NIO的ByteBuffer以二进制的方式来保存消息的这种二级制紧凑保存格式可以比使用Java对象保存消息要节约40%的内存空间
然后这个消息实际上是封装在一个log entry里的你可以认为是一个日志条目吧在kafka里认为每个partition实际上就是一个磁盘上的日志文件写到parttion里去的消息就是一个日志所以log entry就是一个日志
这个日志条目包含了一个offset一个消息的大小然后是消息自身就是上面那个数据结构但是这里要注意的一点就是这个message里可能会包含多条消息压缩在一起所以可能找一条消息需要从这个压缩数据里遍历搜索
而且这里还有一个概念就是消息集合一个消息集合里包含多个日志最新名称叫做RecordBatch
后来消息格式演化为了如下所示 1消息总长度 2属性废弃了已经不用 3时间戳增量跟RecordBatch的时间戳的增量差值 4offset增量跟RecordBatch的offset的增量差值 5key长度 6key 7value长度 8value 9header个数 10header自定义的消息元数据key-value对
通过时间戳、offset、key长度等都用可变长度来尽可能减少空间占用v2版本的数据格式比v1版本的数据格式要节约很多磁盘开销
5. 如何实现TB量级的数据在Kafka集群中分布式的存储
但是这里有一个很大的问题就是不可能说把TB量级的数据都放在一台Kafka服务器上吧这样肯定会遇到容量有限的问题所以Kafka是支持分布式存储的也就是说你的一个topic代表了逻辑上的一个数据集
你大概可以认为一个业务上的数据集合吧比如说用户行为日志都走一个topic数据库里的每个表的数据分别是一个topic订单表的增删改的变更记录进入一个topic促销表的增删改的变更记录进入一个topic
每个topic都有很多个partition你认为是数据分区或者是数据分片大概这些意思都可以就是说这个topic假设有10TB的数据量需要存储在磁盘上此时你给他分配了5个partition那么每个partition都可以存放2TB的数据
然后每个partition不就可以放在一台机器上通过这个方式就可以实现数据的分布式存储了每台机器上都运行一个Kafka的进程叫做Broker以后大家记住broker就是一个kafka进程在一台服务器上就可以了
二、高可用架构原理——异步复制ISR列表
1. 如何基于多副本冗余机制保证Kafka宕机时还具备高可用性
但是这里就有一个问题了如果此时Kafka某台机器宕机了那么一个topic就丢失了一个partition的数据此时不就导致数据丢失了吗所以啊所以对数据做多副本冗余也就是每个parttion都有副本
比如最基本的就是每个partition做一个副本副本放在另外一台机器上 然后呢kafka自动从一个partition的多个副本中选举出来一个leader partition这个leader partition就负责对外提供这个partiton的数据读写接收到写过来的数据就可以把数据复制到副本partition上去
这个时候如果说某台机器宕机了上面的leader partition没了此时怎么办呢通过zookeeper来维持跟每个kafka的会话如果一个kafka进程宕机了此时kafka集群就会重新选举一个leader partition就是用他的某个副本partition即可
通过副本partition可以继续体统这个partition的数据写入和读取这样就可以实现容错了这个副本partition的专业术语叫做follower partition所以每个partitino都有多个副本其中一个是leader是选举出来的其他的都是follower partition
多副本冗余的机制就可以实现Kafka高可用架构
2. 保证写入Kafka的数据不丢失ISR机制到底是什么意思 光是依靠多副本机制能保证Kafka的高可用性但是能保证数据不丢失吗不行因为如果leader宕机但是leader的数据还没同步到follower上去此时即使选举了follower作为新的leader当时刚才的数据已经丢失了
ISR是in-sync replica就是跟leader partition保持同步的follower partition的数量只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader因为在这个ISR列表里代表他的数据跟leader是同步的
如果要保证写入kafka的数据不丢失首先需要保证ISR中至少有一个follower其次就是在一条数据写入了leader partition之后要求必须复制给ISR中所有的follower partition才能说代表这条数据已提交绝对不会丢失这是Kafka给出的承诺
3. 如何让Kafka集群处理请求的时候实现负载均衡的效果
假如说很多partition的leader都在一台机器上那么不就会导致大量的客户端都请求那一台机器这样是不对的kafka集群会自动实现负载均衡的算法尽量把leader partition均匀分布在集群各个机器上
然后客户端在请求的时候就会尽可能均匀的请求到kafka集群的每一台机器上去了假如出现了partition leader的变动那么客户端会感知到然后下次就可以请求最新的那个leader partition了
4. 基于ZooKeeper实现Kafka无状态可伸缩的架构设计思路 5. Kafka集群是如何基于Zookeeper实现节点发现与故障感知的 6. 再看ISR机制Leader宕机时只能选举同步的Follower
7. Partition的几个核心offset高水位offset、LEO代表了什么 实际上来说每次leader接收到一条消息都会更新自己的LEO也就是log end offset把最后一位offset 1这个大家都能理解吧接着各个follower会从leader请求同步数据这是持续进行的
offset 0 ~ offset 4LEO 5代表了最后一条数据后面的offset下一次将要写入的数据的offsetLEO你一定要明白他的名词
然后follower同步到数据之后就会更新自己的LEO
并不是leader主动推送数据给follower他实际上是follower主动向leader尝试获取数据不断的发送请求到leader来fetch最新的数据
然后对于接收到的某一条数据所有follower的LEO都更新之后leader才会把自己的HWHigh Water Mark高水位offset 1这个高水位offset表示的就是最新的一条所有follower都同步完成的消息
partition中最开始的一条数据的offset是base offset
LEO和HW分别是干什么的呢
LEO很重要的一个功能是负责用来更新HW的就是如果leader和follower的LEO同步了此时HW就可以更新
所有对于消费者来说他只能看到base offset到HW offset之间的数据因为只有这之间的数据才表明是所有follower都同步完成的这些数据叫做“已提交”的也就是committed是可以被消费到的
HW offset到LEO之间的数据是“未提交的”这时候消费者是看不到的
HW offset表示的是当前已经提交的数据offsetLEO表示的是下一个要写入的数据的offset
8. 深入探究Leader与Follower上的LEO是如何更新的 首先leader接收到数据字后就会更新自己的LEO值
接着follower会不断的向leader发送fetch请求同步数据然后每次一条数据同步到follower之后他的LEO就会更新同时leader发送数据给follower的时候在leader端会维护所有follower的LEO值
follower发送fetch请求给leader的时候会带上自己的LEO值然后leader每次收到一个fetch请求就会更新自己维护的每个follower的LEO值
所以这里大家要知道的是leader上是会保存所有follower的LEO值的这个是非常关键和核心的一点
9. 深入探究Leader与Follower上的高水位offset是如何更新的 每次leader发送数据给follower的时候都会发送自己的HW值然后follower获取到leader HW之后就会跟自己的LEO比较一下取里面小的那个值作为自己的HW值换句话说如果follower的LEO比leader HW大了那么follower的HW就是leader HW
但是如果follower的LEO比leader HW小说明自己明显落后于leader那么follower的HW就是自己的LEO值
然后leader上的HW就很明显了那就是主要是他在接收follower的fetch请求的时候就会在更新自己维护的所有follower的LEO之后判断一下当前自己的LEO是否跟所有follower都保持一致那么就会自动更新自己的HW值
这个leader partition的HW值代表了从这个partition的哪个offset之前可以被消费数据
10. 用真实场景图解剖析Leader与Follower的LEO与高水位如何更新 假设leader收到第一条数据此时leader LEO 1HW 0因为他发现其他follower的LEO也是0所以HW必须是0
接着follower来发送fetch请求给leader同步数据带过去follower的LEO 0所以leader上维护的follower LEO 0更新了一下此时发现follower的LEO还是0所以leader的HW继续是0
接着leader发送一条数据给follower这里带上了leader的HW 0因为发现leader的HW 0此时follower LEO更新为1但是follower HW 0取leader HW
接着下次follower再次发送fetch请求给leader的时候就会带上自己的LEO 1leader更新自己维护的follower LEO 1此时发现follower跟自己的LEO同步了那么leader的HW更新为1
接着leader发送给follower的数据里包含了HW 1此时follower发现leader HW 1自己的LEO 1此时follower的HW有更新为1
5个数据全部都要往前推进更新需要2次请求第一次请求是仅仅是更新两边的LEO第二次请求是更新另外leader管理的follower LEO以及两个HW
11. 高水位机制可能导致leader切换时发生数据丢失问题
基于之前说的高水位机制可能会导致一些问题比如数据丢失
假如说生产者的min.insync.replicas设置为1这个就会导致说生产者发送消息给leaderleader写入log成功后生产者就会认为写成功了此时假设生产者发送了两条数据给leaderleader写成功了
此时leader的LEO 1HW 0因为follower还没同步HW肯定是0
接着follower发送fetch请求此时leader发现follower LEO 0所以HW还是0给follower带回去的HW也是0然后follower开始同步数据也写入了两条数据自己的LEO 1但是HW 0因为leader HW为0
接着follower再次发送fetch请求过来自己的LEO 1leader发现自己LEO 1follower LEO 1所以HW更新为1同时会把HW 1带回给follower但是此时follower还没更新HW的时候HW还是0
这个时候假如说follower机器宕机了重启机器之后follower的LEO会自动被调整为0因为会依据HW来调整LEO而且自己的那两条数据会被从日志文件里删除数据就没了
这个时候如果leader宕机就会选举follower为leader此时HW 0接着leader那台机器被重启后作为follower这个follower会从leader同步HW是0此时会截断自己的日志删除两条数据
这种场景就会导致数据的丢失
非常极端的一个场景数据可能会莫名其妙的丢失
12. 高水位机制可能导致leader切换时发生数据不一致问题 假设min.insync.replicas 1那么只要leader写入成功生产者而就会认为写入成功
如果leader写入了两条数据但是follower才同步了一条数据第二条数据还没同步假设这个时候leader HW 2follower HW 1因为follower LEO小于leader HW所以follower HW取自己的LEO
这个时候如果leader挂掉切换follower变成leader此时HW 1就一条数据然后生产者又发了一条数据给新leader此时HW变为2但是第二条数据是新的数据。接着老leader重启变为follower这个时候发现两者的HW都是2
所以他们俩就会继续运行了
这个时候他们俩数据是不一致的本来合理的应该是新的follower要删掉自己原来的第二条数据跟新leader同步的让他们俩的数据一致但是因为依赖HW发现一样所以就不会截断数据了
13. Kafka 0.11.x版本引入leader epoch机制解决高水位机制弊端 所谓的leader epoch大致理解为每个leader的版本号以及自己是从哪个offset开始写数据的类似[epoch 0, offset 0]这个就是epoch是版本号的意思接着的话按照之前的那个故障场景
假如说follower先宕机再重启他会找leader继续同步最新的数据更新自己的LEO和HW不会截断数据因为他会看看自己这里有没有[epoch, offset]对如果有的话除非是自己的offset大于了leader的offset才会截断自己的数据
而且人家leader的最新offset 1自己的offset 0明显自己落后于人家有什么资格去截断数据呢对不对就是这个道理。而且还会去从leader同步最新的数据过来此时自己跟Leader数据一致。
如果此时leader宕机切换到follower上此时就会更新自己的[epoch 1, offset 2]意思是自己的leader版本号是epoch 1自己从offset 2开始写数据的
然后接着老leader恢复变为follower从新leader看一下epoch跟自己对比人家offset 2自己的offset 0也不需要做任何数据截断直接同步人家数据就可以了
然后针对数据不一致的场景如果说老leader恢复之后作为follower从新leader看到[epoch 1, offset 1]此时会发现自己的offset也是1但是人家新leader是从offset 1开始写的自己的offset 1怎么已经有数据了呢
此时就会截断掉自己一条数据然后跟人家同步保持数据一致
14. Kafka为Partition维护ISR列表的底层机制是如何设计的
很多公司比较常用的一个kafka的版本是0.8.2.x系列这个系列的版本是非常经典的在过去几年相当大比例的公司都是用这个版本的kafka。当然现在很多公司开始用更新版本的kafka了就是0.9.x或者是1.x系列的
我们先说说在0.9.x之前的版本里这个kafka到底是如何维护ISR列表的什么样的follower才有资格放到ISR列表里呢
在之前的版本里有一个核心的参数replica.lag.max.messages。这个参数就规定了follower如果落后leader的消息数量超过了这个参数指定的数量之后就会认为follower是out-of-sync就会从ISR列表里移除了
咱们来举个例子好了假设一个partition有3个副本其中一个leader两个follower然后replica.lag.max.messages 3刚开始的时候leader和follower都有3条数据此时HW和LEO都是offset 2的位置大家都同步上来了
现在来了一条数据leader和其中一个follower都写入了但是另外一个follower因为自身所在机器性能突然降低导致没及时去同步数据follower所在机器的网络负载、内存负载、磁盘负载过高导致整体性能下降了此时leader partition的HW还是offset 2的位置没动但是LEO变成了offset 3的位置
依托LEO来更新ISR的话在每个follower不断的发送Fetch请求过来的时候就会判断leader和follower的LEO相差了多少如果差的数量超过了replica.lag.max.messages参数设置的一个阈值之后就会把follower给踢出ISR列表
但是这个时候第二个follower的LEO就落后了leader才1个offset还没到replica.lag.max.messages 3所以第二个follower实际上还在ISR列表里只不过刚才那条消息没有算“提交的”在HW外面所以消费者是读不到的
而且这个时候生产者写数据的时候如果默认值是要求必须同步所有follower才算写成功的可能这个时候会导致生产者一直卡在那儿认为自己还没写成功这个是有可能的
一共有3个副本1个leaderr2个是follower此时其中一个follower落后被ISR踢掉了ISR里还有2个副本此时一个leader和另外一个follower都同步成功了此时就可以让那些卡住的生产者就可以返回认为写数据就成功了
min.sync.replicas 2ack -1生产者要求你必须要有2个副本在isr里才可以写此外必须isr里的副本全部都接受到数据才可以算写入成功了一旦说你的isr副本里面少于2了其实还是可能会导致你生产数据被卡住的
假设这个时候第二个follower fullgc持续了几百毫秒然后结束了接着从leader同步了那条数据此时大家LEO都一样而且leader发现所有follower都同步了这条数据leader就会把HW推进一位HW变成offset 3
这个时候消费者就可以读到这条在HW范围内的数据了而且生产者认为写成功了
但是要是此时follower fullgc一直持续了好几秒钟此时其他的生产者一直在发送数据过来leader和第一个follower的LEO又推进了2位LEO offset 5但是HW还是停留在offset 2这个时候HW后面的数据都是消费不了的而且HW后面的那几条数据的生产者可能都会认为写未成功
现在导致第二个follower的LEO跟leader的LEO差距超过3了此时触发阈值follower认为是out-of-sync就会从ISR列表里移除了
一旦第二个follower从ISR列表里移除了世界清静了此时ISR列表里就leader和第一个follower两个副本了此时leader和第一个follower的LEO都是offset 5是同步的leader就会把HW推进到offset 5此时消费者就可以消费全部数据了生产者也认为他们的写操作成功了
那如果第二个follower后来他的fullgc结束了开始大力追赶leader的数据慢慢LEO又控制在replica.lag.max.messages限定的范围内了此时follower会重新加回到ISR列表里去
上面就是ISR的工作原理和机制一般导致follower跟不上的情况主要就是以下三种
1follower所在机器的性能变差比如说网络负载过高IO负载过高CPU负载过高机器负载过高都可能导致机器性能变差同步 过慢这个时候就可能导致某个follower的LEO一直跟不上leader就从ISR列表里移除了
我们生产环境遇到的一些问题kafka机器层面某台机器磁盘坏了物理机的磁盘有故障写入性能特别差此时就会导致followerCPU负载太高了线程间的切换太频繁了CPU忙不过来了网卡被其他的程序给打满了就导致网络传输的速度特别慢
2follower所在的broker进程卡顿常见的就是fullgc问题
kafka自己本身对jvm的使用是很有限的生产集群部署的时候他主要是接收到数据直接写本地磁盘写入os cache他一般不怎么在自己的内存里维护过多的数据主要是依托os cache缓存来提高读和写的性能的
3kafka是支持动态调节副本数量的如果动态增加了partition的副本就会增加新的follower此时新的follower会拼命从leader上同步数据但是这个是需要过程的所以此时需要等待一段时间才能跟leader同步
replica.lag.max.messages主要是解决第一种情况的还有一个replica.lag.time.max.ms是解决第二种情况的比如设置为500ms那么如果在500ms内follower没法送请求找leader来同步数据说明他可能在fullgc此时就会从ISR里移除
15. Kafka 0.8.2.x版本的ISR机制在生产环境有什么缺陷
之前说的那套ISR机制是kafka 0.8.x系列的机制其实是有缺陷的那个参数默认的值是4000也就是follower落后4000条数据就认为是out-of-sync但是这里有一个问题就是这个数字是固定死的
如果说现在生产端突然之间涌入几万条数据呢是不是有可能leader瞬间刚接收到几万条消息然后所有follower还没来得及同步过去此时所有follower都会被踢出ISR列表然后同步了之后再回到ISR列表
所以这种依靠固定参数判断的机制会导致可能在系统高峰时期follower会频繁的踢出ISR列表再回到ISR列表这种完全无意义的事情
一般来说在kafka 0.8.2.x系列版本上生产的时候一遍都会把这个ISR落后 判定阈值设置的大一些避免上述的情况出现你可以设置个几万10万4000如果你公司里没那么大的高峰并发量每秒就是几千的并发那就没问题了
16. Kafka 0.9x版本之后的ISR机制做了哪些优化适应生产环境
kafka 0.9.x之后去掉了原来的replica.lag.max.messages参数引入了一个新的replica.lag.time.max.ms参数默认值是10秒这个就不按照落后的条数来判断了而是说如果某个follower的LEO一直落后leader超过了10秒那么才判定这个follower是out-of-sync的
这样假如说线上出现了流量洪峰一下子导致几个follower都落后了不少数据但是只要尽快追上来在10秒内别一直落后就不会认为是out-of-sync这个机制在线上实践会发现效果要好多了
三、稀疏索引文件底层原理
1. 深入看看Kafka在磁盘上是如何采用分段机制保存日志的
每个分区对应的目录就是“topic-分区号”的格式比如说有个topic叫做“order-topic”那么假设他有3个分区每个分区在一台机器上那么3台机器上分别会有3个目录“order-topic-0”“order-topic-1”“order-topic-2”
每个分区里面就是很多的log segment file也就是日志段文件每个分区的数据会被拆分为多个段放在多个文件里每个文件还有自己的索引文件大概格式可能如下所示
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex
00000000000005367851.index 00000000000005367851.log 00000000000005367851.timeindex
00000000000009936472.index 00000000000009936472.log 00000000000009936472.timeindex
这个9936472之类的数字就是代表了这个日志段文件里包含的起始offset也就说明这个分区里至少都写入了接近1000万条数据了
kafka broker有一个参数log.segment.bytes限定了每个日志段文件的大小最大就是1GB一个日志段文件满了就自动开一个新的日志段文件来写入避免单个文件过大影响文件的读写性能这个过程叫做log rolling
正在被写入的那个日志段文件叫做active log segment
2. 引入索引文件之后如何基于二分查找快速定位数据
日志段文件.log文件会对应一个.index和.timeindex两个索引文件
kafka在写入日志文件的时候同时会写索引文件就是.index和.timeindex一个是位移索引一个是时间戳索引是两种索引
默认情况下有个参数log.index.interval.bytes限定了在日志文件写入多少数据就要在索引文件写一条索引默认是4KB写4kb的数据然后在索引里写一条索引所以索引本身是稀疏格式的索引不是每条数据对应一条索引的
而且索引文件里的数据是按照位移和时间戳升序排序的所以kafka在查找索引的时候会用二分查找时间复杂度是O(logN)找到索引就可以在.log文件里定位到数据了
.index
44576 物理文件.log位置 57976 物理文件.log位置 64352 物理文件.log位置
offset 58892 57976这条数据对应的.log文件的位置
接着就可以从.log文件里的57976这条数对应的位置开始查找去找offset 58892这条数据在.log里的完整数据
.timeindex是时间戳索引文件如果要查找某段时间范围内的时间先在这个文件里二分查找找到offset然后再去.index里根据offset二分查找找对应的.log文件里的位置最后就去.log文件里查找对应的数据
3. 磁盘上的日志文件是按照什么策略定期清理腾出空间的
大家可以想不可能说每天涌入的数据都一直留存在磁盘上本质kafka是一个流式数据的中间件不需要跟离线存储系统一样保存全量的大数据所以kafka是会定期清理掉数据的这里有几个清理策略
kafka默认是保留最近7天的数据每天都会把7天以前的数据给清理掉包括.log、.index和.timeindex几个文件log.retention.hours参数可以自己设置数据要保留多少天你可以根据自己线上的场景来判断一下
只要你的数据保留在kafka里你随时可以通过offset的指定随时可以从kafka楼出来几天之前的数据数据回放一遍下游的数据有多么的重要如果是特别核心的数据在kafka这个层面可以保留7天甚至是15天的数据
下游的消费者消费了数据之后数据丢失了你需要从kafka里楼出来3天前的数据重新来回放处理一遍
在大数据的实时分析的项目里其实就会涉及到这个东西的一个使用如果你今天实时分析的一些数据出错了此时你就需要把过去几天的数据重新楼出来回放一遍重新来算一遍。实时数据分析的结果和hadoop离线分析的结果做一个比对
你每天都会从kafka里搂出来几天前的数据算一下跟离线数据的结果做一个比对
kafka broker会在后台启动线程异步的进行日志清理的工作
四、Reactor网络模型
1. Kafka是如何自定义TCP之上的通信协议以及使用长连接通信的
kafka的通信主要发生于生产端和broker之间broker和消费端之间broker和broker之间这些通信都是基于TCP协议进行的大家自己看看网络课程底层基于TCP连接和传输数据应用层的协议是Kafka自己自定义的
所谓自定义协议就是定好传输数据的格式请求格式、响应格式这样大家就可以统一按照规定好的格式来封装、传输和解析数据了
生产端发送数据到kafka broker来此时发送的数据是这样子的
sent data: 一大串数据
kafka broker直接就从sent data:截取一大段数据就可以用了如果你没有自定义一套完整的协议是没办法进行通信的
http协议生产端也可以发送http协议的数据给kafka brokerhttp请求http响应。应用层的协议规定了数据请求和响应的种种复杂的格式大家全部按照这个格式和规范来走不要乱来
request v1.1
isCache: true
一大串数据
对于生产端和broker消费端和broker来说还会基于TCP建立长连接具体见网络课程也就是维护一批长连接然后通过固定的连接不断的传输数据避免频繁的创建连接和销毁连接的开销
broker端会构造一个请求队列然后不停的获取请求放入队列后台再搞一堆的线程来获取请求进行处理
2. Broker是如何基于Reactor模式进行多路复用请求处理的 每个broker上都有一个acceptor线程和很多个processor线程可以用num.network.threads参数设置processor线程的数量默认是3client跟一个broker之间只会创建一个socket长连接他会复用
然后broker就用一个acceptor来监听每个socket连接的接入分配这个socket连接给一个processor线程processor线程负责处理这个socket连接监听socket连接的数据传输以及客户端发送过来的请求acceptor线程会不停的轮询各个processor来分配接入的socket连接
proessor需要处理多个客户端的socket连接就是通过java nio的selector多路复用思想来实现的用一个selector监听各个socket连接看其是否有请求发送过来这样一个processor就可以处理多个客户端的socket连接了
processor线程会负责把请求放入一个broker全局唯一的请求队列默认大小是500是queued.max.requests参数控制的所以那几个processor会不停的把请求放入这个请求队列中
接着就是一个KafkaRequestHandler线程池负责不停的从请求队列中获取请求来处理这个线程池大小默认是8个由num.io.threads参数来控制处理完请求后的响应会放入每个processor自己的响应队列里
每个processor其实就是负责对多个socket连接不停的监听其传入的请求放入请求队列让KafkaRequestHandler来处理然后会监听自己的响应队列把响应拿出来通过socket连接发送回客户端
五、Controller选举与故障转移原理剖析
1. 如何对Kafka集群进行整体控制Controller是什么东西
不知道大家有没有思考过一个问题就是Kafka集群中某个broker宕机之后是谁负责感知到他的宕机以及负责进行Leader Partition的选举如果你在Kafka集群里新加入了一些机器此时谁来负责把集群里的数据进行负载均衡的迁移
包括你的kafka集群的各种元数据比如说每台机器上有哪些partition谁是leader谁是follower是谁来管理的如果你要删除一个topic那么背后的各种partition如何删除是谁来控制
还有就是比如kafka集群扩容加入一个新的broker是谁负责监听这个broker的加入如果某个broker崩溃了是谁负责监听这个broker崩溃
这里就需要一个kafka集群的总控组件Controller。他负责管理整个kafka集群范围内的各种东西
2. 如何基于Zookeeper实现Controller的选举以及故障转移
在kafka集群启动的时候会自动选举一台broker出来承担controller的责任然后负责管理整个集群这个过程就是说集群中每个broker都会尝试在zk上创建一个/controller临时节点
zk的一些基础知识和临时节点是什么百度一下zookeeper入门
但是zk会保证只有一个人可以创建成功这个人就是所谓controller角色
一旦controller所在broker宕机了此时临时节点消失集群里其他broker会一直监听这个临时节点发现临时节点消失了就争抢再次创建临时节点保证有一台新的broker会成为controller角色
3. 创建Topic时Kafka Controller是如何完成Leader选举的呢
如果你现在创建一个Topic肯定会分配几个Partition每个partition还会指定几个副本这个时候创建的过程中就会在zookeeper中注册对应的topic的元数据包括他有几个partition每个partition有几个副本每个partition副本的状态此时状态都是NonExistentReplica
然后Kafka Controller本质其实是会监听zk上的数据变更的所以此时就会感知到topic变动接着会从zk中加载所有partition副本到内存里把这些partition副本状态变更为NewReplica然后选择的第一个副本作为leader其他都是follower并且把他们都放到partition的ISR列表中
比如说你创建一topicorder_topic3个partition每个partition有2个副本写入zk里去
/topics/order_topic
partitions 3, replica_factor 2
[partition0_1, partition0_2] [partition1_1, partition1_2] [partition2_1, partition2_2]
从每个parititon的副本列表中取出来第一个作为leader其他的就是follower把这些东西给放到partition对应的ISR列表里去
每个partition的副本在哪台机器上呢会做一个均匀的分配把partition分散在各个机器上面通过算法来保证尽可能把每个leader partition均匀分配在各个机器上读写请求流量都是打在leader partition上的
同时还会设置整个Partition的状态OnlinePartition
接着Controller会把这个partition和副本所有的信息包括谁是leader谁是followerISR列表都发送给所有broker让他们知晓在kafka集群里controller负责集群的整体控制但是每个broker都有一份元数据
4. 删除Topic时又是如何通过Kafka Controller控制数据清理
如果你要是删除某个Topic的话Controller会发送请求给这个Topic所有Partition所在的broker机器通知设置所有Partition副本的状态为OfflineReplica也就是让副本全部下线接着Controller接续将全部副本状态变为ReplicaDeletionStarted
然后Controller还要发送请求给broker把各个partition副本的数据给删了其实对应的就是删除磁盘上的那些文件删除成功之后副本状态变为ReplicaDeletionSuccessful接着再变为NonExistentReplica
而且还会设置分区状态为Offline
5. Kafka Controller是如何基于ZK感知Broker的上线以及崩溃的