当前位置: 首页 > news >正文

用静态网站更新做网站需要用到哪些开发软件

用静态网站更新,做网站需要用到哪些开发软件,天津智能网站建设方案,网站sem怎么做本文基于RocketMQ 4.6.0进行源码分析 一. 存储概要设计 RocketMQ存储的文件主要包括CommitLog文件、ConsumeQueue文件、Index文件。RocketMQ将所有topic的消息存储在同一个文件中#xff0c;确保消息发送时按顺序写文件#xff0c;尽最大的能力确保消息发送的高性能与高吞吐… 本文基于RocketMQ 4.6.0进行源码分析 一. 存储概要设计 RocketMQ存储的文件主要包括CommitLog文件、ConsumeQueue文件、Index文件。RocketMQ将所有topic的消息存储在同一个文件中确保消息发送时按顺序写文件尽最大的能力确保消息发送的高性能与高吞吐量。因为消息中间件一般是基于消息主题的订阅机制所以给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率RocketMQ引入了ConsumeQueue消息消费队列文件每个topic包含多个消息消费队列每一个消息队列有一个消息文件。Index索引文件的设计理念是为了加速消息的检索性能根据消息的属性从CommitLog文件中快速检索消息。 CommitLog 消息存储文件所有topic的消息都存储在CommitLog 文件中。 ConsumeQueue 消息消费队列消息到达CommitLog 文件后将异步转发到消息消费队列供消息消费者消费。 IndexFile 消息索引文件主要存储消息Key 与Offset 的对应关系。 1.1 CommitLog RocketMQ 在消息写入过程中追求极致的磁盘顺序写所有topic的消息全部写入一个文件即 CommitLog 文件。所有消息按抵达顺序依次 追加到 CommitLog 文件中消息一旦写入不支持修改。CommitLog 文件默认创建的大小为 1GB。 一个文件为 1GB 大小也即 1024 * 1024 * 1024 1073741824 字节CommitLog 每个文件的命名是按照总的字节偏移量来命名的。例如第一个文件偏移量为 0那么它的名字为 00000000000000000000当前这 1G 文件被存储满了之后就会创建下一个文件下一个文件的偏移量则为 1GB那么它的名字为 00000000001073741824以此类推。 默认情况下这些消息文件位于 $HOME/store/commitlog 目录下如下图所示: 基于文件编程与基于内存编程一个很大的不同是基于内存编程时我们有现成的数据结构例如List、HashMap对数据的读写非常方便那么一条一条消息存入CommitLog文件后该如何查找呢 正如关系型数据库会为每条数据引入一个ID字段基于文件编程也会为每条消息引入一个身份标志消息物理偏移量即消息存储在文件的起始位置。 正是有了物理偏移量的概念CommitLog文件的命名方式也是极具技巧性使用存储在该文件的第一条消息在整个CommitLog文件组中的偏移量来命名例如第一个CommitLog文件为0000000000000000000第二个CommitLog文件为00000000001073741824依次类推。 这样做的好处是给出任意一个消息的物理偏移量可以通过二分法进行查找快速定位这个文件的位置然后用消息物理偏移量减去所在文件的名称得到的差值就是在该文件中的绝对地址。 1.2 ConsumeQueue CommitlLog文件的设计理念是追求极致的消息存储性能但我们知道消息消费模型是基于主题订阅机制的即一个消费组是消费特定主题的消息。根据主题从CommitlLog文件中检索消息这绝不是一个好主意这样只能从文件的第一条消息逐条检索其性能可想而知为了解决基于topic的消息检索问题RocketMQ引入了ConsumeQueue文件。ConsumeQueue文件结构入下图 ConsumeQueue文件是消息消费队列文件是 CommitLog 文件基于topic的索引文件主要用于消费者根据 topic 消费消息其组织方式为 /topic/queue同一个队列中存在多个消息文件。ConsumeQueue 的设计极具技巧每个条目长度固定8字节CommitLog物理偏移量、4字节消息长度、8字节tag哈希码。这里不是存储tag的原始字符串而是存储哈希码目的是确保每个条目的长度固定可以使用访问类似数组下标的方式快速定位条目极大地提高了ConsumeQueue文件的读取性能。消息消费者根据topic、消息消费进度ConsumeQueue逻辑偏移量即第几个ConsumeQueue条目这样的消费进度去访问消息通过逻辑偏移量logicOffset×20即可找到该条目的起始偏移量ConsumeQueue文件中的偏移量然后读取该偏移量后20个字节即可得到一个条目无须遍历ConsumeQueue文件。 ConsumeQueue文件可以看作基于topic维度的CommitLog索引文件故ConsumeQueue文件夹的组织方式为topic/queue/file三层组织结构文件存储在 $HOME/store/consumequeue/{topic}/{queueId}/{fileName}单个文件由30万个条目组成每个文件大小约5.72MB。同样的单个ConsumeQueue文件写满后会继续写入下一个文件中。 1.3 Index RocketMQ与Kafka相比具有一个强大的优势就是支持按消息属性检索消息引入ConsumeQueue文件解决了基于topic查找消息的问题但如果想基于消息的某一个属性进行查找ConsumeQueue文件就无能为力了。故RocketMQ又引入了Index索引文件实现基于文件的哈希索引。Index文件的存储结构如下图所示。 Index文件基于物理磁盘文件实现哈希索引。Index文件由40字节的文件头、500万个哈希槽、2000万个Index条目组成每个哈希槽4字节、每个Index条目含有20个字节分别为4字节索引key的哈希码、8字节消息物理偏移量、4字节时间戳、4字节的前一个Index条目哈希冲突的链表结构。 1.4 内存映射 虽然基于磁盘的顺序写消息可以极大提高I/O的写效率但如果基于文件的存储采用常规的Java文件操作API例如FileOutputStream等将性能提升会很有限故RocketMQ又引入了内存映射将磁盘文件映射到内存中以操作内存的方式操作磁盘将性能又提升了一个档次。 在Java中可通过FileChannel的map方法创建内存映射文件。在Linux服务器中由该方法创建的文件使用的就是操作系统的页缓存pagecache。Linux操作系统中内存使用策略时会尽可能地利用机器的物理内存并常驻内存中即页缓存。在操作系统的内存不够的情况下采用缓存置换算法例如LRU将不常用的页缓存回收即操作系统会自动管理这部分内存。 如果RocketMQ Broker进程异常退出存储在页缓存中的数据并不会丢失操作系统会定时将页缓存中的数据持久化到磁盘实现数据安全可靠。不过如果是机器断电等异常情况存储在页缓存中的数据也有可能丢失。 1.5 TransientStorePool机制 RocketMQ 中的 TransientStorePool 机制是一种优化磁盘写入性能的技术主要应用于异步刷盘场景。这种机制主要是通过预先在堆外内存Direct Memory中分配一块固定大小的内存区域然后将消息数据首先写入堆外内存再由单独的线程负责把堆外内存中的数据批量地、按页对齐的方式写入到 MappedFile 中CommitRealTimeService线程也就是说无需每次写入数据时都进行系统调用从而提高写入效率。 以下是 RocketMQ 使用 TransientStorePool 的主要流程 预先在堆外内存中创建一个内存池即 TransientStorePool并初始化为一段连续的内存空间。当生产者发送消息时RocketMQ 先将消息写入到 TransientStorePool 中的堆外内存里。刷盘线程定时或者达到一定数量的消息后将堆外内存中的数据按页对齐的方式批量写入到 MappedFileMappedByteBuffer中。最后再由 MappedByteBuffer 进行真正的磁盘刷盘操作。 有了 TransientStorePool 的存在消息可以批量写入内存缓冲区RocketMQ 也就可以有效地控制何时以及如何将脏页Dirty Page即已修改但还未写入磁盘的内存页刷写到磁盘避免了操作系统自动进行的随机性、不可预测的脏页刷写操作从而提升了I/O性能特别是在大量写入请求的场景下。 值得一提的是使用TransientStorePool并非没有代价。因为需要额外的一次内存复制操作即从堆外内存复制到内存映射区域。但是在大多数情况下通过控制脏页刷写带来的性能提升相比于增加的内存复制开销更加明显。 并且开启 transientStorePool 机制后由于消息数据会先写入堆外内存然后由特定后台线程CommitRealTimeService将堆外内存中的修改 commit 到内存映射区域而这一步如果发生断电、服务宕机都会产生消息丢失。而普通的异步刷盘由于消息是直接写入内存映射区域所以服务宕机并不会丢失数据只有在服务器突然断电时才会丢失少量数据。 1.6 刷盘策略 有了顺序写和内存映射的加持RocketMQ的写入性能得到了极大的保证但凡事都有利弊引入了内存映射和页缓存机制消息会先写入页缓存此时消息并没有真正持久化到磁盘。那么Broker收到客户端的消息后是存储到页缓存中就直接返回成功还是要持久化到磁盘中才返回成功呢 这是一个“艰难”的选择是在性能与消息可靠性方面进行权衡。为此RocketMQ提供了三种策略同步刷盘、异步刷盘、异步刷盘缓冲区。 类型描述SYNC_FLUSH同步刷盘ASYNC_FLUSH transientStorePoolEnablefalse默认为false异步刷盘ASYNC_FLUSH transientStorePoolEnabletrue异步刷盘缓冲区 同步刷盘时只有消息被真正持久化到磁盘才会响应ACK可靠性非常高但是性能会受到较大影响适用于金融业务。异步刷盘时消息写入PageCache就会响应ACK然后由后台线程异步将PageCache里的内容持久化到磁盘降低了读写延迟提高了性能和吞吐量。服务宕机消息不丢失(操作系统会完成内存映射区域的刷盘)机器断电少量消息丢失。异步刷盘缓冲区消息先写入直接内存缓冲区然后由后台线程异步将缓冲区里的内容持久化到磁盘性能最好。但是最不可靠服务宕机和机器断电都会丢失消息。 1.7 文件恢复机制 我们知道RocketMQ主要的数据存储文件包括CommitLog、ConsumeQueue和Index而ConsumeQueue、Index文件是根据CommitLog文件异步构建的。既然是异步操作这两者之间的数据就不可能始终保持一致那么重启broker时需要如何恢复数据呢我们考虑如下异常场景。 消息采用同步刷盘方式写入CommitLog文件准备转发给ConsumeQueue文件时由于断电等异常导致存储失败。 在刷盘的时候突然记录了100MB消息准备将这100MB消息写入磁盘由于机器突然断电只写入50MB消息到CommitLog文件。 在RocketMQ存储目录下有一个检查点Checkpoint文件用于记录CommitLog等文件的刷盘点。但将数据写入CommitLog文件后才会将刷盘点记录到检查点文件中有可能在从刷盘点写入检查点文件前数据就丢失了。 在RocketMQ中有broker异常停止恢复和正常停止恢复两种场景。这两种场景的区别是定位从哪个文件开始恢复的逻辑不一样大致思路如下。 尝试恢复ConsumeQueue文件根据文件的存储格式8字节物理偏移量、4字节长度、8字节tag哈希码找到最后一条完整的消息格式所对应的物理偏移量用maxPhysical OfConsumequeue表示。尝试恢复CommitLog文件先通过文件的魔数判断该文件是否为ComitLog文件然后按照消息的存储格式寻找最后一条合格的消息拿到其物理偏移量如果CommitLog文件的有效偏移量小于ConsumeQueue文件存储的最大物理偏移量将会删除ConsumeQueue中多余的内容如果大于说明ConsuemQueue文件存储的内容少于CommitLog文件则会重推数据。 那么如何定位要恢复的文件呢 正常停止刷盘的情况下先从倒数第三个文件开始进行恢复然后按照消息的存储格式进行查找如果该文件中所有的消息都符合消息存储格式则继续查找下一个文件直到找到最后一条消息所在的位置。 异常停止刷盘的情况下RocketMQ会借助检查点文件即存储的刷盘点定位恢复的文件。刷盘点记录的是CommitLog、ConsuemQueue、Index文件最后的刷盘时间戳但并不是只认为该时间戳之前的消息是有效的超过这个时间戳之后的消息就是不可靠的。 异常停止刷盘时从最后一个文件开始寻找在寻找时读取该文件第一条消息的存储时间如果这个存储时间小于检查点文件中的刷盘时间就可以从这个文件开始恢复如果这个文件中第一条消息的存储时间大于刷盘点说明不能从这个文件开始恢复需要寻找上一个文件因为检查点文件中的刷盘点代表的是100%可靠的消息。 二. 存储文件组织与内存映射实现 RocketMQ通过使用内存映射文件来提高I/O访问性能无论是CommitLog、Consume-Queue还是Index单个文件都被设计为固定长度一个文件写满以后再创建新文件文件名就为该文件第一条消息对应的全局物理偏移量。 RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。 2.1 MappedFileQueue映射文件队列 MappedFileQueue是MappedFile的管理容器MappedFileQueue对存储目录进行封装例如CommitLog文件的存储路径为${ROCKET_HOME}/store/commitlog/该目录下会存在多个内存映射文件MappedFile。MappedFileQueue结构如下 /*** RocketMQ通过使用内存映射文件来提高I/O访问性能无论是* CommitLog、Consume-Queue还是Index单个文件都被设计为固定长* 度一个文件写满以后再创建新文件文件名就为该文件第一条消息* 对应的全局物理偏移量。* RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。** MappedFileQueue是MappedFile的管理容器MappedFileQueue对* 存储目录进行封装** 例如CommitLog文件的存储场景下存储路径为${ROCKET_HOME}/store/commitlog/* 该目录下会存在多个内存映射文件MappedFile*/ public class MappedFileQueue {private static final int DELETE_FILES_BATCH_MAX 10;/*** 存储目录*/private final String storePath;/*** 单个文件的存储大小*/private final int mappedFileSize;/*** MappedFile集合*/private final CopyOnWriteArrayListMappedFile mappedFiles new CopyOnWriteArrayListMappedFile();/*** 创建MappedFile服务类*/private final AllocateMappedFileService allocateMappedFileService;/*** 当前刷盘指针表示该指针之前的所有数据全部持久化到磁盘*/private long flushedWhere 0;/*** 当前数据提交指针内存中ByteBuffer当前的写指针该值大于、等于flushedWhere*/private long committedWhere 0;private volatile long storeTimestamp 0; }2.1.1 根据消息存储时间戳查找MappdFile /*** 根据消息存储时间戳查找MappdFile。从MappedFile列表中第一个* 文件开始查找找到第一个最后一次更新时间大于待查找时间戳的文* 件如果不存在则返回最后一个MappedFile* param timestamp* return*/public MappedFile getMappedFileByTime(final long timestamp) {Object[] mfs this.copyMappedFiles(0);if (null mfs)return null;for (int i 0; i mfs.length; i) {MappedFile mappedFile (MappedFile) mfs[i];if (mappedFile.getLastModifiedTimestamp() timestamp) {return mappedFile;}}return (MappedFile) mfs[mfs.length - 1];}根据消息存储时间戳查找MappdFile。从MappedFile列表中第一个文件开始查找找到第一个最后一次更新时间大于待查找时间戳的文件如果不存在则返回最后一个MappedFile。 2.1.2 根据消息偏移量offset查找MappedFile、 // org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean) public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {MappedFile firstMappedFile this.getFirstMappedFile();MappedFile lastMappedFile this.getLastMappedFile();if (firstMappedFile ! null lastMappedFile ! null) {if (offset firstMappedFile.getFileFromOffset() || offset lastMappedFile.getFileFromOffset() this.mappedFileSize) {LOG_ERROR.warn(Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {},offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {// 计算文件索引int index (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile null;try {targetFile this.mappedFiles.get(index);} catch (Exception ignored) {}if (targetFile ! null offset targetFile.getFileFromOffset() offset targetFile.getFileFromOffset() this.mappedFileSize) {return targetFile;}for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset tmpMappedFile.getFileFromOffset() offset tmpMappedFile.getFileFromOffset() this.mappedFileSize) {return tmpMappedFile;}}}if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error(findMappedFileByOffset Exception, e);}return null;}根据消息偏移量offset查找MappedFile但是不能直接使用 offset%mappedFileSize。这是因为使用了内存映射只要是存在于存储目录下的文件都需要对应创建内存映射文件如果不定期进行将已消费的消息从存储文件中删除会造成极大的内存压力与资源浪费所以RocketMQ采取定时删除存储文件的策略。也就是说在存储文件中第一个文件不一定是00000000000000000000因为该文件在某一时刻会被删除所以根据offset定位MappedFile的算法为(int)((offset/this.mappedFileSize)(mappedFile.getFileFromOffset()/this.MappedFileSize)) 2.2 MappedFile内存映射文件 MappedFile是RocketMQ内存映射文件的具体实现。核心属性有 public class MappedFile extends ReferenceResource {/*** 操作系统页大小*/public static final int OS_PAGE_SIZE 1024 * 4;protected static final InternalLogger log InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);/*** 当前JVM实例中MappedFile的虚拟内存。*/private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY new AtomicLong(0);/*** 当前JVM实例中MappedFile对象个数。*/private static final AtomicInteger TOTAL_MAPPED_FILES new AtomicInteger(0);/*** 当前文件的写指针从0开始内存映射文件中的写指针。*/protected final AtomicInteger wrotePosition new AtomicInteger(0);/*** 当前文件的提交指针如果开启transientStorePool则数据会存储在* TransientStorePool中然后提交到内存映射ByteBuffer中再写入磁盘。*/protected final AtomicInteger committedPosition new AtomicInteger(0);/*** 将该指针之前的数据持久化存储到磁盘中。*/private final AtomicInteger flushedPosition new AtomicInteger(0);protected int fileSize;/*** 文件通道*/protected FileChannel fileChannel;/*** Message will put to here first, and then reput to FileChannel if writeBuffer is not null.*//*** 堆外内存ByteBuffer如果不为空数据首先将存储在该Buffer中然后提交到MappedFile创建的* FileChannel中。transientStorePoolEnable为true时不为空。*/protected ByteBuffer writeBuffer null;/*** 堆外内存池该内存池中的内存会提供内存锁机制。transientStorePoolEnable为true时启用。*/protected TransientStorePool transientStorePool null;/*** 文件名称*/private String fileName;/*** 该文件的初始偏移量*/private long fileFromOffset;/*** 物理文件*/private File file;/*** 物理文件对应的内存映射Buffer。(内存映射对象对其进行数据写入会由操作系统同步至磁盘)*/private MappedByteBuffer mappedByteBuffer;/*** 文件最后一次写入内容的时间*/private volatile long storeTimestamp 0;/*** 是否是MappedFileQueue队列中第一个文件。*/private boolean firstCreateInQueue false; }其中 MappedByteBuffer 就是我们之前所说的内存映射对象当你向 MappedByteBuffer 写入数据时这些改动会直接写入到磁盘上。如果你修改 MappedByteBuffer 中的内容改动对所有访问同一文件的其他映射文件也是可见的。从API角度来看使用MappedByteBuffer操作文件可以像操作内存中的一个连续区域一样修改内存中的数据会由操作系统同步至磁盘。 内存映射机制可以参考《Java中使用内存映射操作文件》 2.2.1 MappedFile初始化 第一步根据是否开启transientStorePoolEnable存在两种初始化情况。transientStorePool-Enable为true表示内容先存储在堆外内存然后通过Commit线程将数据提交到FileChannel中再通过Flush线程将数据持久化到磁盘中 public void init(final String fileName, final int fileSize,final TransientStorePool transientStorePool) throws IOException {init(fileName, fileSize);this.writeBuffer transientStorePool.borrowBuffer();this.transientStorePool transientStorePool;}第二步初始化fileFromOffset为文件名也就是文件名代表该文件的起始偏移量通过RandomAccessFile创建读写文件通道并将文件内容使用NIO的内存映射Buffer将文件映射到内存中 //org.apache.rocketmq.store.MappedFile#init(java.lang.String, int) private void init(final String fileName, final int fileSize) throws IOException {this.fileName fileName;this.fileSize fileSize;this.file new File(fileName);this.fileFromOffset Long.parseLong(this.file.getName());boolean ok false;ensureDirOK(this.file.getParent());try {this.fileChannel new RandomAccessFile(this.file, rw).getChannel();// 创建内存映射 MappedByteBufferthis.mappedByteBuffer this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);TOTAL_MAPPED_FILES.incrementAndGet();ok true;} catch (FileNotFoundException e) {log.error(Failed to create file this.fileName, e);throw e;} catch (IOException e) {log.error(Failed to map file this.fileName, e);throw e;} finally {if (!ok this.fileChannel ! null) {this.fileChannel.close();}}} 2.2.2 MappedFile提交 内存映射文件的提交动作由MappedFile的commit()方法实现 /*** 内存映射文件的提交动作由MappedFile的commit()方法实现* param commitLeastPages 本次提交的最小页数如果待提交数据不满足commitLeastPages则不执行本次提交操作等待下次提交* return*/public int commit(final int commitLeastPages) {if (writeBuffer null) {//no need to commit data to file channel, so just regard wrotePosition as committedPosition.return this.wrotePosition.get();}if (this.isAbleToCommit(commitLeastPages)) {if (this.hold()) {commit0(commitLeastPages);this.release();} else {log.warn(in commit, hold failed, commit offset this.committedPosition.get());}}// All dirty data has been committed to FileChannel.if (writeBuffer ! null this.transientStorePool ! null this.fileSize this.committedPosition.get()) {this.transientStorePool.returnBuffer(writeBuffer);this.writeBuffer null;}return this.committedPosition.get();}isAbleToCommit 方法校验本次提交的最小页数如果待提交数据不满足commitLeastPages则不执行本次提交操作等待下次提交 //org.apache.rocketmq.store.MappedFile#isAbleToCommit /*** 判断是否执行commit操作。如果文件已满返回true。如果* commitLeastPages大于0则计算wrotePosition当前writeBuffe的* 写指针与上一次提交的指针committedPosition的差值将其除* 以OS_PAGE_SIZE得到当前脏页的数量如果大于commitLeastPages* 则返回true。如果commitLeastPages小于0表示只要存在脏页就提交* param commitLeastPages* return*/protected boolean isAbleToCommit(final int commitLeastPages) {int flush this.committedPosition.get();int write this.wrotePosition.get();if (this.isFull()) {return true;}if (commitLeastPages 0) {return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) commitLeastPages;}return write flush;}有了 TransientStorePool 的存在消息可以批量写入堆外内存缓冲区RocketMQ 也就可以有效地控制何时以及如何将脏页Dirty Page即已修改但还未写入磁盘的内存页刷写到磁盘避免了操作系统自动进行的随机性、不可预测的脏页刷写操作从而提升了I/O性能特别是在大量写入请求的场景下。但是引入此种机制会带来新的问题前文已经详细介绍过了此处不再赘述。 2.2.3 MappedFile刷盘 刷盘指的是将内存中的数据写入磁盘永久存储在磁盘中由MappedFile的flush()方法实现 // org.apache.rocketmq.store.MappedFile#flush public int flush(final int flushLeastPages) {if (this.isAbleToFlush(flushLeastPages)) {if (this.hold()) {int value getReadPosition();try {//We only append data to fileChannel or mappedByteBuffer, never both.if (writeBuffer ! null || this.fileChannel.position() ! 0) {this.fileChannel.force(false);} else {// 直接调用mappedByteBuffer或fileChannel的force()方法将数据// 写入磁盘将内存中的数据持久化到磁盘中this.mappedByteBuffer.force();}} catch (Throwable e) {log.error(Error occurred when force data to disk., e);}this.flushedPosition.set(value);this.release();} else {log.warn(in flush, hold failed, flush offset this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}return this.getFlushedPosition();}2.2.4 TransientStorePool TransientStorePool即短暂的存储池。RocketMQ单独创建了一个DirectByteBuffer内存缓存池用来临时存储数据数据先写入该内存映射中然后由Commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RokcetMQ引入该机制是为了提供一种内存锁定将当前堆外内存一直锁定在内存中避免被进程将内存交换到磁盘中。 TransientStorePool核心属性如下 public class TransientStorePool {private static final InternalLogger log InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);/*** avaliableBuffers个数可在broker配置文件中通过transient StorePoolSize进行设置默认为5*/private final int poolSize;/*** 每个ByteBuffer的大小默认为mapedFileSizeCommitLog表明TransientStorePool为CommitLog文件服务。*/private final int fileSize;/*** ByteBuffer容器双端队列。*/private final DequeByteBuffer availableBuffers;private final MessageStoreConfig storeConfig; }2.2.5 内存映射整体流程 内存映射文件MappedFile通过AllocateMappedFileService创建MappedFile的创建是典型的生产者-消费者模型MappedFileQueue调用getLastMappedFile获取MappedFile时将请求放入队列中AllocateMappedFileService线程持续监听队列队列有请求时创建出MappedFile对象最后将MappedFile对象预热底层调用force方法和mlock方法 三. 消息存储整体流程 3.1 ConmmitLog存储流程 消息存储入口为org.apache.rocketmq.store.DefaultMessageStore#putMessage。 // org.apache.rocketmq.store.DefaultMessageStore#putMessage// 如果当前broker停止工作或当前不支持写入则拒绝消息写入。if (this.shutdown) {log.warn(message store has shutdown, so putMessage is forbidden);return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}// 如果当前broker是从节点则拒绝写入消息if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) {long value this.printTimes.getAndIncrement();if ((value % 50000) 0) {log.warn(message store is slave mode, so putMessage is forbidden );}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}// 判断当前broker是否能够写入if (!this.runningFlags.isWriteable()) {long value this.printTimes.getAndIncrement();if ((value % 50000) 0) {log.warn(message store is not writeable, so putMessage is forbidden this.runningFlags.getFlagBits());}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);} else {this.printTimes.set(0);}// topic长度大于127字符则报错if (msg.getTopic().length() Byte.MAX_VALUE) {log.warn(putMessage message topic length too long msg.getTopic().length());return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);}// 消息属性长度大于 32767则报错if (msg.getPropertiesString() ! null msg.getPropertiesString().length() Short.MAX_VALUE) {log.warn(putMessage message properties length too long msg.getPropertiesString().length());return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);}// OSPageCacheBusy通常是出现在操作系统在试图缓存太多页面时。当物理内存充满了操作系统可能试图清除一些页面来为新的页面腾出空间。// 如果这个过程中所有的页面都在使用即“繁忙”那么就会报告OSPageCacheBusy。if (this.isOSPageCacheBusy()) {return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);}第一步先对broker和消息进行基础校验。如果当前broker停止工作或当前不支持写入则拒绝消息写入。如果消息主题长度超过127个字符、消息属性长度超过32767个字符同样拒绝该消息写入。如果日志中出现“message store is not writeable, so putMessage is forbidden”提示最有可能是因为磁盘空间不足在写入ConsumeQueue、Index文件出现错误时会拒绝消息再次写入。 public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage time// 记录消息写入时间msg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result null;StoreStatsService storeStatsService this.defaultMessageStore.getStoreStatsService();String topic msg.getTopic();int queueId msg.getQueueId();final int tranType MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() 0) {if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic ScheduleMessageService.SCHEDULE_TOPIC;queueId ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 如果消息的延迟级别大于0将消息的原主题名称与原消息队列ID存入消息属性中用延迟消息主题SCHEDULE_TOPIC_XXXX、消//息队列ID更新原先消息的主题与队列这是并发消息重试关键的异步MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}InetSocketAddress bornSocketAddress (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}InetSocketAddress storeSocketAddress (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}long eclipsedTimeInLock 0;MappedFile unlockMappedFile null;// 获取当前可以写入的CommitLog文件对应 ${ROCKET_HOME}/store/commitlog 文件夹下的文件MappedFile mappedFile this.mappedFileQueue.getLastMappedFile();// 在将消息写入CommitLog之前先申请putMessageLockputMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);if (null mappedFile || mappedFile.isFull()) {//如果mappedFile为空表明 ${ROCKET_HOME}/store/commitlog目录下不存在任何文件说明本次//消息是第一次发送用偏移量0创建第一个CommitLog文件文件名为00000000000000000000mappedFile this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null mappedFile) {log.error(create mapped file1 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;// 如果文件创建失败抛出CREATE_MAPEDFILE_FAILED这很有可能是磁盘空间不足或权限不够导致的return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}// 将消息追加到 CommitLog 中result mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// 当前CommitLog文件不够写入此条消息 (CommitLog定长1GB)unlockMappedFile mappedFile;// Create a new file, re-write the messagemappedFile this.mappedFileQueue.getLastMappedFile(0);if (null mappedFile) {// XXX: warn and notify melog.error(create mapped file2 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipsedTimeInLock this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock 0;} finally {// 追加完毕释放锁putMessageLock.unlock();}if (eclipsedTimeInLock 500) {log.warn([NOTIFYME]putMessage in lock cost time(ms){}, bodyLength{} AppendMessageResult{}, eclipsedTimeInLock, msg.getBody().length, result);}if (null ! unlockMappedFile this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());// appendMessage只是将消息追加到内存中需要根据采取的是同步刷盘方式还是异步刷盘方式将内存中的数据持久化到磁盘中。handleDiskFlush(result, putMessageResult, msg);// HA主从同步复制handleHA(result, putMessageResult, msg);return putMessageResult;}第二步如果消息的延迟级别大于0将消息的原主题名称与原消息队列ID存入消息属性中用延迟消息主题SCHEDULE_TOPIC_XXXX、消息队列ID更新原先消息的主题与队列。这是并发消息消费重试关键的一步第5章会重点探讨消息重试机制与定时消息的实现原理。 第三步获取当前可以写入的CommitLog文件。CommitLog文件的存储目录为 ${ROCKET_HOME}/store/commitlog每个文件默认1GB一个文件写满后再创建另一个以该文件中第一个偏移量为文件名如果偏移量少于20位则用0补齐。第一个文件初始偏移量为0第二个文件名中的“1073741824”代表该文件第一条消息的物理偏移量为1073741824这样根据物理偏移量可以快速定位到消息。MappedFileQueue可以看作${ROCKET_HOME}/store/commitlog文件夹而MappedFile则对应该文件夹下的文件。 第四步在将消息写入CommitLog之前先申请putMessageLock 第五步设置消息的存储时间如果mappedFile为空表明${ROCKET_HOME}/store/commitlog目录下不存在任何文件说明本次消息是第一次发送用偏移量0创建第一个CommitLog文件文件名为00000000000000000000如果文件创建失败抛出CREATE_MAPEDFILE_FAILED这很有可能是磁盘空间不足或权限不够导致的。 第六步将消息追加到MappedFile中。首先获取MappedFile当前的写指针如果currentPos大于或等于文件大小表明文件已写满抛出AppendMessageStatus.UNKNOWN_ERROR。如果currentPos小于文件大小通过slice()方法创建一个与原ByteBuffer共享的内存区且拥有独立的position、limit、capacity等指针并设置position为当前指针 // org.apache.rocketmq.store.MappedFile#appendMessagesInner public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {assert messageExt ! null;assert cb ! null;// 首先获取MappedFile当前的写指针如果currentPos大于或等于文件大小表明文件已写满int currentPos this.wrotePosition.get();if (currentPos this.fileSize) {// 通过slice()方法创建一个与原ByteBuffer共享的内存区且拥有独立的position、limit、capacity等指针零拷贝ByteBuffer byteBuffer writeBuffer ! null ? writeBuffer.slice() : this.mappedByteBuffer.slice();// 并设置position为当前指针byteBuffer.position(currentPos);AppendMessageResult result;// 执行写入操作将消息内容存储到ByteBuffer中这里只是将消息存储在MappedFile对应的内存映射Buffer中并没有写入磁盘if (messageExt instanceof MessageExtBrokerInner) {result cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);} else if (messageExt instanceof MessageExtBatch) {result cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp result.getStoreTimestamp();return result;}log.error(MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}, currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}第七步创建全局唯一消息ID消息ID有16字节。 // org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner) // 创建全局唯一的消息IDif ((sysflag MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) 0) {msgId MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);} else {msgId MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);}为了消息ID具备可读性返回给应用程序的msgId为字符类型可以通过UtilAll. bytes2string方法将msgId字节数组转换成字符串通过UtilAll.string2bytes方法将msgId字符串还原成16字节的数组根据提取的消息物理偏移量可以快速通过msgId找到消息内容。 // org.apache.rocketmq.common.message.MessageDecoder#createMessageId(java.nio.ByteBuffer, java.nio.ByteBuffer, long) public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {input.flip();int msgIDLength addr.limit() 8 ? 16 : 28;input.limit(msgIDLength);input.put(addr);input.putLong(offset);return UtilAll.bytes2string(input.array());}第九步根据消息体、主题和属性的长度结合消息存储格式计算消息的总长度RocketMQ消息存储格式如下 TOTALSIZE消息条目总长度4字节。MAGICCODE魔数4字节。固定值0xdaa320a7。BODYCRC消息体的crc校验码4字节。QUEUEID消息消费队列ID4字节。FLAG消息标记RocketMQ对其不做处理供应用程序使用默认4字节。QUEUEOFFSET消息在ConsumeQuene文件中的物理偏移量8字节。PHYSICALOFFSET消息在CommitLog文件中的物理偏移量8字节。SYSFLAG消息系统标记例如是否压缩、是否是事务消息等4字节。BORNTIMESTAMP消息生产者调用消息发送API的时间戳8字节。BORNHOST消息发送者IP、端口号8字节。STORETIMESTAMP消息存储时间戳8字节。STOREHOSTADDRESSBroker服务器IP端口号8字节。RECONSUMETIMES消息重试次数4字节。Prepared Transaction Offset事务消息的物理偏移量8字节。BodyLength消息体长度4字节。Body消息体内容长度为bodyLenth中存储的值。TopicLength主题存储长度1字节表示主题名称不能超过255个字符。Topic主题长度为TopicLength中存储的值。PropertiesLength消息属性长度2字节表示消息属性长度不能超过65536个字符。Properties消息属性长度为PropertiesLength中存储的值。 计算消息长度的代码 //org.apache.rocketmq.store.CommitLog#calMsgLength /*** CommitLog条目是不定长的每一个条目的长度存储在前4个字节中。* param sysFlag* param bodyLength 消息body长度* param topicLength topic长度* param propertiesLength properties长度* return*/protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength (sysFlag MessageSysFlag.BORNHOST_V6_FLAG) 0 ? 8 : 20;int storehostAddressLength (sysFlag MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) 0 ? 8 : 20;final int msgLen 4 //TOTALSIZE 消息头部4字节记录消息条目总长度 4 //MAGICCODE 4 //BODYCRC 消息体的crc校验码 4 //QUEUEID 消息消费队列ID 4 //FLAG 消息标记RocketMQ对其不做处理供应用程序使用 默认4字节。 8 //QUEUEOFFSET 消息在ConsumeQuene文件中的物理偏移量8字节 8 //PHYSICALOFFSET 消息在CommitLog文件中的物理偏移量8字 节。 4 //SYSFLAG 消息系统标记例如是否压缩、是否是事务消息 等4字节。 8 //BORNTIMESTAMP 消息生产者调用消息发送API的时间戳8字 节。 bornhostLength //BORNHOST 消息发送者IP、端口号8字节。 8 //STORETIMESTAMP 消息存储时间戳8字节。 storehostAddressLength //STOREHOSTADDRESS Broker服务器IP端口号8字节。 4 //RECONSUMETIMES 消息重试次数4字节。 8 //Prepared Transaction Offset 事务消息的物理偏移量8字节 4 (bodyLength 0 ? bodyLength : 0) //BODY 消息体长度4字节 消息体内容长度为bodyLenth中存储的值。 1 topicLength //TOPIC 主题存储长度1字节表示主题名称不能超 过255个字符 Topic内容长度为 topicLength 2 (propertiesLength 0 ? propertiesLength : 0) //propertiesLength 消息属性长度2字节表示消息属性长度不能超过65536个字符 消息属性内容 0;return msgLen;}第十步如果消息长度END_FILE_MIN_BLANK_LENGTH大于CommitLog文件的空闲空间则返回AppendMessageStatus.END_OF_FILEBroker会创建一个新的CommitLog文件来存储该消息。从这里可以看出每个CommitLog文件最少空闲8字节高4字节存储当前文件的剩余空间低4字节存储魔数CommitLog.BLANK_MAGIC_CODE: // 如果消息长度END_FILE_MIN_BLANK_LENGTH大于CommitLog文件的空闲空间// 则返回AppendMessageStatus.END_OF_FILEBroker会创建一个新的CommitLog文件来存储该消息。if ((msgLen END_FILE_MIN_BLANK_LENGTH) maxBlank) {this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3 The remaining space may be any value// Here the length of the specially set maxBlankfinal long beginTimeMills CommitLog.this.defaultMessageStore.now();byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);}第十一步将消息内容存储到ByteBuffer中然后创建AppendMessageResult。这里只是将消息存储在MappedFile对应的内存映射Buffer中并没有写入磁盘 // org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)// Initialization of storage spacethis.resetByteBuffer(msgStoreItemMemory, msgLen);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(msgLen);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);// 3 BODYCRCthis.msgStoreItemMemory.putInt(msgInner.getBodyCRC());// 4 QUEUEIDthis.msgStoreItemMemory.putInt(msgInner.getQueueId());// 5 FLAGthis.msgStoreItemMemory.putInt(msgInner.getFlag());// 6 QUEUEOFFSETthis.msgStoreItemMemory.putLong(queueOffset);// 7 PHYSICALOFFSETthis.msgStoreItemMemory.putLong(fileFromOffset byteBuffer.position());// 8 SYSFLAGthis.msgStoreItemMemory.putInt(msgInner.getSysFlag());// 9 BORNTIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());// 10 BORNHOSTthis.resetByteBuffer(bornHostHolder, bornHostLength);this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));// 11 STORETIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());// 12 STOREHOSTADDRESSthis.resetByteBuffer(storeHostHolder, storeHostLength);this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));// 13 RECONSUMETIMESthis.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());// 14 Prepared Transaction Offsetthis.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());// 15 BODYthis.msgStoreItemMemory.putInt(bodyLength);if (bodyLength 0)this.msgStoreItemMemory.put(msgInner.getBody());// 16 TOPICthis.msgStoreItemMemory.put((byte) topicLength);this.msgStoreItemMemory.put(topicData);// 17 PROPERTIESthis.msgStoreItemMemory.putShort((short) propertiesLength);if (propertiesLength 0)this.msgStoreItemMemory.put(propertiesData);final long beginTimeMills CommitLog.this.defaultMessageStore.now();// Write messages to the queue buffer// 将消息内容存储到ByteBuffer中然后创建AppendMessageResult。这里只是将消息存储在MappedFile对应的内存映射Buffer中并没有写入磁盘byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);AppendMessageResult result new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);AppendMessageResult 结构如下 public class AppendMessageResult {// Return code/*** 消息追加结果。取值为PUT_OK则代表追加成功、* END_OF_FILE则代表超过文件大小、* MESSAGE_SIZE_EXCEEDED则代表消息长度超过最大允许长度、* PROPERTIES_SIZE_EXCEEDED则代表消息属性超过最大允许长度、* UNKNOWN_ERROR则代表未知异常。*/private AppendMessageStatus status;// Where to start writing/*** 消息的物理偏移量。*/private long wroteOffset;// Write Bytesprivate int wroteBytes;// 消息IDprivate String msgId;// Message storage timestamp/*** 消息存储时间戳*/private long storeTimestamp;// Consume queues offset(step by one)/*** 消息消费队列的逻辑偏移量类似于数组下标*/private long logicsOffset;/*** 写入页缓存的响应时间*/private long pagecacheRT 0;/*** 批量发送消息时的消息条数*/private int msgNum 1; }第十二步更新消息队列的逻辑偏移量 第十三步处理完消息追加逻辑后将释放putMessageLock。 第十四步DefaultAppendMessageCallback#doAppend只是将消息追加到内存中需要根据采取的是同步刷盘方式还是异步刷盘方式将内存中的数据持久化到磁盘中后文会详细介绍刷盘操作。 // org.apache.rocketmq.store.CommitLog#putMessage // appendMessage只是将消息追加到内存中需要根据采取的是同步刷盘方式还是异步刷盘方式将内存中的数据持久化到磁盘中。 handleDiskFlush(result, putMessageResult, msg);第十五步然后执行HA主从同步复制。 // org.apache.rocketmq.store.CommitLog#putMessage // HA主从同步复制 handleHA(result, putMessageResult, msg);3.2 ConsumeQueue、Index消息索引的异步构建 因为ConsumeQueue文件、Index文件都是基于CommitLog文件构建的所以当消息生产者提交的消息存储到CommitLog文件中时ConsumeQueue文件、Index文件需要及时更新否则消息无法及时被消费根据消息属性查找消息也会出现较大延迟。RocketMQ通过开启一个线程ReputMessageServcie来准实时转发CommitLog文件的更新事件相应的任务处理器根据转发的消息及时更新ConsumeQueue文件、Index文件。 // org.apache.rocketmq.store.DefaultMessageStore.ReputMessageServiceclass ReputMessageService extends ServiceThread {private volatile long reputFromOffset 0;/*** ReputMessageService线程每执行一次任务推送休息1ms后继续* 尝试推送消息到Consume Queue和Index文件中消息消费转发由* doReput()方法实现*/private void doReput() {if (this.reputFromOffset DefaultMessageStore.this.commitLog.getMinOffset()) {log.warn(The reputFromOffset{} is smaller than minPyOffset{}, this usually indicate that the dispatch behind too much and the commitlog has expired.,this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());this.reputFromOffset DefaultMessageStore.this.commitLog.getMinOffset();}for (boolean doNext true; this.isCommitLogAvailable() doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() this.reputFromOffset DefaultMessageStore.this.getConfirmOffset()) {break;}// 返回reputFromOffset偏移量开始的全部有效数据CommitLog文件。然后循环读取每一条消息SelectMappedBufferResult result DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result ! null) {try {this.reputFromOffset result.getStartOffset();for (int readSize 0; readSize result.getSize() doNext; ) {// 从result返回的ByteBuffer中循环读取消息一次读取一条创建Dispatch Request对象DispatchRequest dispatchRequest DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size dispatchRequest.getBufferSize() -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size 0) {// 执行CommitLog转发DefaultMessageStore.this.doDispatch(dispatchRequest);if (BrokerRole.SLAVE ! DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());}this.reputFromOffset size;readSize size;if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());}} else if (size 0) {this.reputFromOffset DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size 0) {log.error([BUG]read total count not equals msg total size. reputFromOffset{}, reputFromOffset);this.reputFromOffset size;} else {doNext false;// If user open the dledger pattern or the broker is master node,// it will not ignore the exception and fix the reputFromOffset variableif (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||DefaultMessageStore.this.brokerConfig.getBrokerId() MixAll.MASTER_ID) {log.error([BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {},this.reputFromOffset);this.reputFromOffset result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext false;}}}Overridepublic void run() {DefaultMessageStore.log.info(this.getServiceName() service started);while (!this.isStopped()) {try {Thread.sleep(1);// 每休息1ms就执行一次消息转发this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() service has exception. , e);}}DefaultMessageStore.log.info(this.getServiceName() service end);}}3.3 整体流程 Broker端收到消息后将消息原始信息保存在CommitLog文件对应的MappedFile中然后异步刷新到磁盘ReputMessageServie线程异步的将CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中ConsumerQueue和IndexFile只是原始文件的索引信息。 四. 文件创建 当有新的消息到来的时候其会默认选择列表中的最后一个文件来进行消息的保存: 当有新的消息到来的时候其会默认选择列表中的最后一个文件来进行消息的保存: org.apache.rocketmq.store.MappedFileQueue public class MappedFileQueue {public MappedFile getLastMappedFile() {MappedFile mappedFileLast null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error(getLastMappedFile has exception., e);break;}}return mappedFileLast;} }当然如果这个 Broker 之前从未接受过消息的话那么这个列表肯定是空的。这样一旦有新的消息需要存储的时候其就得需要立即创建一个 MappedFile 文件来存储消息。 RocketMQ 提供了一个专门用来实例化 MappedFile 文件的服务类 AllocateMappedFileService。在内存中也同时维护了一张请求表 requestTable 和一个优先级请求队列 requestQueue 。当需要创建文件的时候Broker 会创建一个 AllocateRequest 对象其包含了文件的路径、大小等信息。然后先将其放入 requestTable 表中再将其放入优先级请求队列 requestQueue 中: org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile public class AllocateMappedFileService extends ServiceThread {public MappedFile putRequestAndReturnMappedFile(String nextFilePath,String nextNextFilePath,int fileSize) {// ...AllocateRequest nextReq new AllocateRequest(nextFilePath, fileSize);boolean nextPutOK this.requestTable.putIfAbsent(nextFilePath, nextReq) null;if (nextPutOK) {// ...boolean offerOK this.requestQueue.offer(nextReq);}}}服务类会一直等待优先级队列是否有新的请求到来如果有便会从队列中取出请求然后创建对应的 MappedFile并将请求表 requestTable 中 AllocateRequest 对象的字段 mappedFile 设置上值。最后将 AllocateRequest 对象上的 CountDownLatch 的计数器减 1 以标明此分配申请的 MappedFile 已经创建完毕了: org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation public class AllocateMappedFileService extends ServiceThread {public void run() {log.info(this.getServiceName() service started);//会一直尝试从队列中获取请求从而执行创建文件的任务while (!this.isStopped() this.mmapOperation()) {}log.info(this.getServiceName() service end);}/*** Only interrupted by the external thread, will return false*/private boolean mmapOperation() {boolean isSuccess false;AllocateRequest req null;try {// 获取优先队列中的请求req this.requestQueue.take();AllocateRequest expectedRequest this.requestTable.get(req.getFilePath());// ...if (req.getMappedFile() null) {long beginTime System.currentTimeMillis();MappedFile mappedFile;if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {try {//创建MappedFilemappedFile ServiceLoader.load(MappedFile.class).iterator().next();mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());} catch (RuntimeException e) {log.warn(Use default implementation.);mappedFile new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());}} else {mappedFile new MappedFile(req.getFilePath(), req.getFileSize());}// pre write mappedFileif (mappedFile.getFileSize() this.messageStore.getMessageStoreConfig()//...}req.setMappedFile(mappedFile);this.hasException false;isSuccess true;}} catch (InterruptedException e) {log.warn(this.getServiceName() interrupted, possibly by shutdown.);this.hasException true;return false;} catch (IOException e) {log.warn(this.getServiceName() service has exception. , e);this.hasException true;if (null ! req) {requestQueue.offer(req);try {Thread.sleep(1);} catch (InterruptedException ignored) {}}} finally {if (req ! null isSuccess)// 文件创建成功计数减一。因为创建文件的动作是在独立的线程中完成的业务线程需要等待文件创建完毕req.getCountDownLatch().countDown();}return true;} }等待 MappedFile 创建完毕之后其便会从请求表 requestTable 中取出并删除表中记录: org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile public class AllocateMappedFileService extends ServiceThread {public MappedFile putRequestAndReturnMappedFile(String nextFilePath,String nextNextFilePath,int fileSize) {// ...........//获取请求AllocateRequest result this.requestTable.get(nextFilePath);try {if (result ! null) {// 等待 MappedFile 的创建完成boolean waitOK result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);if (!waitOK) {// 创建超时log.warn(create mmap timeout result.getFilePath() result.getFileSize());return null;} else {//创建成功则将requestTable中将请求移除this.requestTable.remove(nextFilePath);// 返回创建的MappedFilesreturn result.getMappedFile();}} else {log.error(find preallocate mmap failed, this never happen);}} catch (InterruptedException e) {log.warn(this.getServiceName() service has exception. , e);}return null;}}创建完成后将其加入列表中 org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(long, boolean) public class MappedFileQueue {public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset -1;// 尝试获取最后一个MappedFileMappedFile mappedFileLast getLastMappedFile();if (mappedFileLast null) {createOffset startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast ! null mappedFileLast.isFull()) {createOffset mappedFileLast.getFileFromOffset() this.mappedFileSize;}// 第一次启动未创建MappedFileif (createOffset ! -1 needCreate) {// 文件名String nextFilePath this.storePath File.separator UtilAll.offset2FileName(createOffset);String nextNextFilePath this.storePath File.separator UtilAll.offset2FileName(createOffset this.mappedFileSize);MappedFile mappedFile null;if (this.allocateMappedFileService ! null) {// 提交一个创建MappedFile的请求mappedFile this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error(create mappedFile exception, e);}}if (mappedFile ! null) {//创建成功if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}//加入列表this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;} }至此MappedFile 已经创建完毕也即可以进行下一步的操作了。 五. 文件初始化 在 MappedFile 的构造函数中其使用了 FileChannel 类提供的 map 函数来将磁盘上的这个文件映射到进程地址空间中。然后当通过 MappedByteBuffer 来读入或者写入文件的时候磁盘上也会有相应的改动。采用这种方式通常比传统的基于文件 IO 流的方式读取效率高。 public class MappedFile extends ReferenceResource {public MappedFile(final String fileName, final int fileSize)throws IOException {init(fileName, fileSize);}private void init(final String fileName, final int fileSize)throws IOException {// ...this.fileChannel new RandomAccessFile(this.file, rw).getChannel();this.mappedByteBuffer this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);// ...}}六. 消息文件加载 前面提到过Broker 在启动的时候会加载磁盘上的文件到一个 mappedFiles 列表中。但是加载完毕后其还会对这份列表中的消息文件进行验证 (恢复)确保没有错误。 验证的基本想法是通过一一读取列表中的每一个文件然后再一一读取每个文件中的每个消息在读取的过程中其会更新整体的消息写入的偏移量如下图中的红色箭头 (我们假设最终读取的消息的总偏移量为 905): 当确定消息整体的偏移量之后Broker 便会确定每一个单独的 MappedFile 文件的各自的偏移量每一个文件的偏移量是通过取余算法确定的: org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles: public class MappedFileQueue {public void truncateDirtyFiles(long offset) {for (MappedFile file : this.mappedFiles) {long fileTailOffset file.getFileFromOffset() this.mappedFileSize;if (fileTailOffset offset) {if (offset file.getFileFromOffset()) {// 确定每个文件的各自偏移量file.setWrotePosition((int) (offset % this.mappedFileSize));file.setCommittedPosition((int) (offset % this.mappedFileSize));file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {// ...}}}// ...}}在确定每个消息文件各自的写入位置的同时其还会删除起始偏移量大于当前总偏移量的消息文件这些文件可以视作脏文件或者也可以说这些文件里面一条消息也没有。这也是上述文件 1073741824 被打上红叉的原因: public void truncateDirtyFiles(long offset) {ListMappedFile willRemoveFiles new ArrayListMappedFile();for (MappedFile file : this.mappedFiles) {long fileTailOffset file.getFileFromOffset() this.mappedFileSize;if (fileTailOffset offset) {if (offset file.getFileFromOffset()) {// ...} else {// 总偏移量 文件起始偏移量// 加入到待删除列表中file.destroy(1000);willRemoveFiles.add(file);}}}this.deleteExpiredFile(willRemoveFiles); }七. 写入消息 消息写入口org.apache.rocketmq.store.CommitLog#putMessage 一旦我们获取到 MappedFile 文件之后我们便可以往这个文件里面写入消息了。写入消息可能会遇见如下两种情况一种是这条消息可以完全追加到这个文件中另外一种是这条消息完全不能或者只有一小部分能存放到这个文件中其余的需要放到新的文件中。我们对于这两种情况分别讨论: 7.1 文件可以完全存储消息 MappedFile 类维护了一个用以标识当前写位置的指针 wrotePosition以及一个用来映射文件到进程地址空间的 mappedByteBuffer: public class MappedFile extends ReferenceResource {protected final AtomicInteger wrotePosition new AtomicInteger(0);private MappedByteBuffer mappedByteBuffer;}由这两个数据结构我们可以看出来单个文件的消息写入过程其实是非常简单的。首先获取到这个文件的写入位置然后将消息内容追加到 byteBuffer 中然后再更新写入位置。 public class MappedFile extends ReferenceResource {public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {// ...int currentPos this.wrotePosition.get();if (currentPos this.fileSize) {ByteBuffer byteBuffer writeBuffer ! null ?writeBuffer.slice() :this.mappedByteBuffer.slice();// 更新 byteBuffer 位置byteBuffer.position(currentPos);// 写入消息内容// ...// 获取当前需要写入的消息长度更新 wrotePosition 指针的位置this.wrotePosition.addAndGet(result.getWroteBytes());return result;}}}示例流程如下所示: 7.2 文件不可以完全存储消息 在写入消息之前如果判断出文件已经满了的情况下其会直接尝试创建一个新的 MappedFile: public class CommitLog {public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// 文件为空 || 文件已经满了if (null mappedFile || mappedFile.isFull()) {mappedFile this.mappedFileQueue.getLastMappedFile(0);}// ...result mappedFile.appendMessage(msg, this.appendMessageCallback);}}如果文件未满那么在写入之前会先计算出消息体长度 msgLen然后判断这个文件剩下的空间是否有能力容纳这条消息。在这个地方我们还需要介绍下每条消息的存储方式。 每条消息的存储是按照一个 4 字节的长度来做界限的这个长度本身就是整个消息体的长度当读完这整条消息体的长度之后下一次再取出来的一个 4 字节的数字便又是下一条消息的长度: 围绕着一条消息还会存储许多其它内容我们在这里只需要了解前两位是 4 字节的总长度和 4 字节的 MAGICCODE 即可: MAGICCODE 的可选值有: CommitLog.MESSAGE_MAGIC_CODECommitLog.BLANK_MAGIC_CODE 当这个文件有能力容纳这条消息体的情况下其便会存储 MESSAGE_MAGIC_CODE 值当这个文件没有能力容纳这条消息体的情况下其便会存储 BLANK_MAGIC_CODE 值。所以这个 MAGICCODE 是用来界定这是空消息还是一条正常的消息。 当判定这个文件不足以容纳整个消息的时候其将消息体长度设置为这个文件剩余的最大空间长度将 MAGICCODE 设定为这是一个空消息文件 (需要去下一个文件去读)。由此我们可以看出消息体长度 和 MAGICCODE 是判别一条消息格式的最基本要求这也是 END_FILE_MIN_BLANK_LENGTH 的值为 8 的原因: org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner) public class CommitLog {class DefaultAppendMessageCallback implements AppendMessageCallback {// File at the end of the minimum fixed length emptyprivate static final int END_FILE_MIN_BLANK_LENGTH 4 4;public AppendMessageResult doAppend(final long fileFromOffset,final ByteBuffer byteBuffer,final int maxBlank,final MessageExtBrokerInner msgInner) {// ...if ((msgLen END_FILE_MIN_BLANK_LENGTH) maxBlank) {// ...// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3 The remaining space may be any valuebyteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE,/** other params **/ );}}} }由上述方法我们看出在这种情况下返回的结果是 END_OF_FILE。当检测到这种返回结果的时候CommitLog 接着又会申请创建新的 MappedFile 并尝试写入消息。追加方法同 (1) 相同不再赘述: 注: 在消息文件加载的过程中其也是通过判断 MAGICCODE 的类型来判断是否继续读取下一个 MappedFile 来计算整体消息偏移量的。 六. 消息刷盘策略 在消息存储主流程执行完成后会调用handleDiskFlush触发刷盘策略。 // org.apache.rocketmq.store.CommitLog#handleDiskFlush /*** 触发刷盘CommitLog* param result* param putMessageResult* param messageExt*/public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// Synchronization flushif (FlushDiskType.SYNC_FLUSH this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {// 如果是同步刷盘final GroupCommitService service (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {// 构建GroupCommitRequest同步任务并提交到GroupCommitRequest。GroupCommitRequest request new GroupCommitRequest(result.getWroteOffset() result.getWroteBytes());// 将同步任务GroupCommitRequest提交到GroupCommitService线程service.putRequest(request);// 等待同步刷盘任务完成异步刷盘线程刷写完毕后会唤醒当前线程超时时间默认为5s如果超时则返回刷盘错误刷盘成功后正常返回给调用方。boolean flushOK request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error(do groupcommit, wait for flush failed, topic: messageExt.getTopic() tags: messageExt.getTags() client address: messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {// 异步刷盘if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {// 如果transientStorePoolEnable为false消息将追加到与物理文件直接映射的内存中然后写入磁盘// 唤醒刷盘线程执行刷盘操作flushCommitLogService.wakeup();} else {// 如果transientStorePoolEnable为trueRocketMQ会单独申请一个与目标物理文件CommitLog同样大// 小的堆外内存该堆外内存将使用内存锁定确保不会被置换到虚拟内存中去消息首先追加到堆外内存然后提交到与物理文件的内存// 映射中再经flush操作到磁盘commitLogService.wakeup();}}}刷盘的整体流程 producer发送给broker的消息保存在MappedFile中然后通过刷盘机制同步到磁盘中 刷盘分为同步刷盘和异步刷盘 异步刷盘后台线程按一定时间间隔执行 同步刷盘也是生产者-消费者模型。broker保存消息到MappedFile后创建GroupCommitRequest请求放入列表并阻塞等待。后台线程从列表中获取请求并刷新磁盘成功刷盘后通知等待线程。 6.1 异步刷盘 当配置为异步刷盘策略的时候Broker 会运行一个服务 FlushRealTimeService 用来刷新缓冲区的消息内容到磁盘这个服务使用一个独立的线程来做刷盘这件事情默认情况下每隔 500ms 来检查一次是否需要刷盘: class FlushRealTimeService extends FlushCommitLogService {public void run() {// 不停运行while (!this.isStopped()) {// interval 默认值是 500msif (flushCommitLogTimed) {Thread.sleep(interval);} else {this.waitForRunning(interval);}// 刷盘CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);}}}本文参考至 《RocketMQ技术内幕》 RocketMQ 消息存储流程 | 赵坤的个人网站 (kunzhao.org) 图解RocketMQ消息发送和存储流程 - 掘金 (juejin.cn)
http://www.zqtcl.cn/news/111084/

相关文章:

  • 同性男做的视频网站赶集网招聘最新招聘附近找工作
  • 做挖机配件销售的网站oa办公系统软件哪家好
  • 聊城设计网站商务网站的特点
  • 厦门做个网站多少钱工程建设范围
  • 百度推广官方网站在哪里制作网页
  • 济南集团网站建设方案沈阳手机网站制作
  • 网站备案号注销的结果做网站的外包能学到什么
  • 在线购物网站开发项目网站建设电话推广话术
  • 网站主体信息太原站扩建
  • 西平县住房和城乡建设局网站空间商网站
  • p2p网站建设cms一键生成图片
  • 甘肃省第八建设集团公司网站能够做物理题的网站
  • 团购网站建设方案建筑工程网校官网
  • 佛山建站网站模板小公司管理方法
  • 常德住房和城乡建设局网站做风险代理案源的网站
  • 手机网站开发人员选项wordpress加载媒体库
  • 做钓鱼网站用哪种编程语言张家界有实力seo优化费用
  • 如何做一个主题网站做网站必须有框架么
  • 建设网站需要什么知识上海高端网页设计
  • 电子商务网站建设基本流程公司网站建设平台
  • 域名没过期 网站打不开怎么办素马设计顾问讲解价格
  • 怎么做非法彩票网站贵州网站开发哪家便宜
  • 青岛市医疗保险网站wordpress七牛云
  • 哪个浏览器可以做网站查询网站的外链
  • 浅析社区网站的建设有了网站源码 怎么建设网站
  • 苏州网站排名优化系统网页设计师
  • 网站开发定制推广杭州河南省的网页制作
  • 北京随喜设计网站国内好的seo网站
  • 网站中宣传彩页怎么做的网站建设评估及分析
  • 东莞php网站建设素材网站php程序源码