美妆网站模版,资质升级业绩备案在哪个网站做,网站域名变了怎么查,网站开发公司oaRocketMQ 06 消息存储机制
消息存储
image-20200228140910086.png
磁盘存储速度问题
省去DB层提高性能
RocketMQ 使用文件系统持久化消息。性能要比使用DB产品要高。
M.2 NVME协议磁盘存储
文件写入速度 顺序读写#xff1a;3G左右 随机读写2G
数据零拷贝技术
很多使…RocketMQ 06 消息存储机制
消息存储
image-20200228140910086.png
磁盘存储速度问题
省去DB层提高性能
RocketMQ 使用文件系统持久化消息。性能要比使用DB产品要高。
M.2 NVME协议磁盘存储
文件写入速度 顺序读写3G左右 随机读写2G
数据零拷贝技术
很多使用文件系统存储的高性能中间件都是用了零拷贝技术来发送文件数据比如Nginx
内存映射MappedByteBuffer API
MappedByteBuffer使用虚拟内存因此分配(map)的内存大小不受JVM的-Xmx参数限制但是也是有大小限制的。如果当文件超出1.5G限制时可以通过position参数重新map文件后面的内容。MappedByteBuffer在处理大文件时的确性能很高但也存在一些问题如内存占用、文件关闭不确定被其打开的文件只有在垃圾回收的才会被关闭而且这个时间点是不确定的。 javadoc中也提到A mapped byte buffer and the file mapping that it represents remain* valid until the buffer itself is garbage-collected.
所以为了使用零拷贝技术RocketMQ的文件存储大小默认每个1G超过1G会重新建立一个新文件
存储结构
CommitLog
存储消息的详细内容按照消息收到的顺序所有消息都存储在一起。每个消息存储后都会有一个offset代表在commitLog中的偏移量。
默认配置 MessageStoreConfig
核心方法
putMessage 写入消息
CommitLog内部结构
MappedFileQueue - MappedFile
MappedFile
默认大小 1G // CommitLog file size,default is 1Gprivate int mappedFileSizeCommitLog 1024 * 1024 * 1024;ConsumerQueue
通过消息偏移量建立的消息索引
针对每个Topic创建消费逻辑队列存储位置信息用来快速定位CommitLog中的数据位置
启动后会被加载到内存中加快查找消息速度
以Topic作为文件名称每个Topic下又以queue id作为文件夹分组
默认大小
// ConsumeQueue extend file size, 48M
private int mappedFileSizeConsumeQueueExt 48 * 1024 * 1024;indexFile
消息的Key和时间戳索引
存储路径配置
默认文件会存储在家目录下/root/store/
config
以json格式存储消费信息
consumerFilter.json
消息过滤器
consumerOffset.json
客户端的消费进度
delayOffset.json
延迟消息进度
subscriptionGroup.json
group的订阅数据
topics.json
Topic的配置信息
刷盘机制
在CommitLog初始化时判断配置文件 加载相应的service if (FlushDiskType.SYNC_FLUSH defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService new GroupCommitService();} else {this.flushCommitLogService new FlushRealTimeService();}写入时消息会不会分割到两个MapedFile中 // Determines whether there is sufficient free spaceif ((msgLen END_FILE_MIN_BLANK_LENGTH) maxBlank) {this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODE不够放下一个消息的时候用魔术字符代替this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3 The remaining space may be any value// Here the length of the specially set maxBlankfinal long beginTimeMills CommitLog.this.defaultMessageStore.now();byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);}同步刷盘
消息被Broker写入磁盘后再给producer响应
异步刷盘
消息被Broker写入内存后立即给producer响应当内存中消息堆积到一定程度的时候写入磁盘持久化。
配置选项