怎么制作网站应用,大型网站开发价格,重庆市工程建筑造价信息网,电商系统平台Kafka 工作流程 Kafka 中消息是以 topic 进行分类的#xff0c;生产者生产消息#xff0c;消费者消费消息#xff0c;都是面向 topic的。 topic 是逻辑上的概念#xff0c;而 partition 是物理上的概念#xff0c;每个 partition 对应于一个 log 文件#xff0c;该 log…Kafka 工作流程 Kafka 中消息是以 topic 进行分类的生产者生产消息消费者消费消息都是面向 topic的。 topic 是逻辑上的概念而 partition 是物理上的概念每个 partition 对应于一个 log 文件该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端且每条数据都有自己的 offset。消费者组中的每个消费者都会实时记录自己消费到了哪个 offset以便出错恢复时从上次的位置继续消费
Kafka文件存储机制 由于生产者生产的消息会不断追加到 log 文件末尾为防止 log 文件过大导致数据定位效率低下Kafka 采取了分片和索引机制将每个 partition 分为多个 segment。每个 segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下该文件夹的命名规则为topic 名称分区序号。例如first 这个 topic 有三个分区则其对应的文件夹为 first0,first-1,first-2。
[rootbackup01 logs]# cd /usr/local/hadoop/kafka/kafka_2.12-2.4.1/logs/first-0
[rootbackup01 first-0]# ll
total 8
-rw-r--r--. 1 root root 10485760 Mar 29 16:32 00000000000000000000.index
-rw-r--r--. 1 root root 146 Mar 29 17:34 00000000000000000000.log
-rw-r--r--. 1 root root 10485756 Mar 29 16:32 00000000000000000000.timeindex
-rw-r--r--. 1 root root 8 Mar 29 16:32 leader-epoch-checkpoint
[rootbackup01 first-0]# 00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
index 和 log 文件以当前 segment 的第一条消息的 offset 命名。下图为 index 文件和 log 文件的结构示意图 “.index”文件存储大量的索引信息“.log”文件存储大量的数据索引文件中的元 数据指向对应数据文件中 message 的物理偏移地址。 Kafka中的Message是以topic为基本单位组织的不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的)每个partition存储一部分Message。借用官方的一张图可以直观地看到topic和partition的关系。 partition是以文件的形式存储在文件系统中比如创建了一个名为page_visits的topic其有5个partition那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0 page_visits-1page_visits-2page_visits-3page_visits-4其命名规则为topic_name-partition_id里面存储的分别就是这5个partition的数据。
接下来本文将分析partition目录中的文件的存储格式和相关的代码所在的位置。
Partition的数据文件
Partition中的每条Message由offset来表示它在这个partition中的偏移量这个offset不是该Message在partition数据文件中的实际存储位置而是逻辑上一个值它唯一确定了partition中的一条Message。因此可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性 offset MessageSize data 其中offset为long型MessageSize为int32表示data有多大data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致。
Partition的数据文件则包含了若干条上述格式的Message按offset由小到大排列在一起。它的实现类为FileMessageSet类图如下
它的主要方法如下
append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。 searchFor: 从指定的startingPosition开始搜索找到第一个Message其offset是大于或者等于指定的offset并返回其在文件中的位置Position。它的实现方式是从startingPosition开始读取12个字节分别是当前MessageSet的offset和size。如果当前offset小于指定的offset那么将position向后移动LogOverHeadMessageSize其中LogOverHead为offsetmessagesize为12个字节。 read准确名字应该是slice它截取其中一部分返回一个新的FileMessageSet。它不保证截取的位置数据的完整性。 sizeInBytes: 表示这个FileMessageSet占有了多少字节的空间。 truncateTo: 把这个文件截断这个方法不保证截断位置的Message的完整性。 readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。 我们来思考一下如果一个partition只有一个数据文件会怎么样
新数据是添加在文件末尾调用FileMessageSet的append方法不论文件数据文件有多大这个操作永远都是O(1)的。 查找某个offset的Message调用FileMessageSet的searchFor方法是顺序查找的。因此如果数据文件很大的话查找的效率就低。 那Kafka是如何解决查找效率的的问题呢有两大法宝1) 分段 2) 索引。
数据文件的分段 Kafka解决查询效率的手段之一是将数据文件分段比如有100条Message它们的offset是从0到99。假设将数据文件分成5段第一段为0-19第二段为20-39以此类推每段放在一个单独的数据文件里面数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候用二分查找就可以定位到该Message在哪个段中。
为数据文件建索引 数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率Kafka为每个分段后的数据文件建立了索引文件文件名与数据文件的名字是一样的只是文件扩展名为.index。 索引文件中包含若干个索引条目每个条目表示数据文件中一条Message的索引。索引包含两个部分均为4个字节的数字分别为相对offset和position。
相对offset因为数据文件分段以后每个数据文件的起始offset不为0相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例分段后的一个数据文件的offset是从20开始那么offset为25的Message在index文件中的相对offset就是25-20 5。存储相对offset可以减小索引文件占用的空间。 position表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。 index文件中并没有为数据文件中的每条Message建立索引而是采用了稀疏存储的方式每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置从而需要做一次顺序扫描但是这次顺序扫描的范围就很小了。
在Kafka中索引文件的实现类为OffsetIndex它的类图如下 主要的方法有
append方法添加一对offset和position到index文件中这里的offset将会被转成相对的offset。 lookup, 用二分查找的方式去查找小于或等于给定offset的最大的那个offset 小结 我们以几张图来总结一下Message是如何在Kafka中存储的以及如何查找指定offset的Message的。
Message是按照topic来组织每个topic可以分成多个的partition比如有5个partition的名为为page_visits的topic的目录结构为 partition是分段的每个段叫LogSegment包括了一个数据文件和一个索引文件下图是某个partition目录下的文件 可以看到这个partition有4个LogSegment。
借用博主lizhitao博客上的一张图来展示是如何查找Message的。 比如要查找绝对offset为7的Message
首先是用二分查找确定它是在哪个LogSegment中自然是在第一个Segment中。 打开这个Segment的index文件也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。 打开数据文件从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。 这套机制是建立在offset是有序的。索引文件被映射到内存中所以查找的速度还是很快的。
一句话Kafka的Message存储采用了分区(partition)分段(LogSegment)和稀疏索引这几个手段来达到了高效性。