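1. Introduction
RocketMQ owes its high throughput largely to two components: a high-performance network layer built on Netty, and a storage layer built on mmap. The storage layer revolves around three kinds of files: the commitLog, which persists every message; the consumeQueue, which consumers read from directly to locate messages; and the indexFile, which supports lookups by msgKey. These three files are the core of RocketMQ storage. This article covers mappedFile, the component inside the commitLog persistence layer that actually interacts with the disk.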
2. mmap
Before looking at commitLog, let's look at the core technique it relies on: mmap.
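2.1 Traditional reads and writes
Let's first see where a traditional read falls short. Reading a block of data from a file on disk with an ordinary read goes through the following steps:
1. The user thread calls the operating system's read to request data from the file.
2. The operating system uses the file's inode to look the requested page up in the page cache. On a hit, the cached page contents are used directly and the disk is not touched.
3. On a miss, the inode is used to locate the data on disk, and the DMA controller copies it from disk into the page cache.
4. The CPU then copies the data from the page cache into the user process's buffer.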
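The four steps above are what happens underneath an ordinary read call. A minimal Java sketch of such a read follows; the class name TraditionalReadSketch and the helper tempFileWith are made up for this illustration and are not part of RocketMQ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: a plain read() that ends with the data copied into a user-space buffer.
final class TraditionalReadSketch {

    /** Reads the whole file into a heap buffer through FileChannel.read. */
    static byte[] readAll(Path file) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            // The buffer lives in user space (the Java heap).
            ByteBuffer userBuffer = ByteBuffer.allocate((int) channel.size());
            while (userBuffer.hasRemaining() && channel.read(userBuffer) != -1) {
                // Each read() copies bytes from the kernel page cache into userBuffer.
            }
            return userBuffer.array();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical helper: creates a temporary file holding the given text. */
    static Path tempFileWith(String text) {
        try {
            Path file = Files.createTempFile("traditional-read", ".txt");
            Files.write(file, text.getBytes(StandardCharsets.UTF_8));
            file.toFile().deleteOnExit();
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling TraditionalReadSketch.readAll(TraditionalReadSketch.tempFileWith("hello")) returns the file's bytes; the point is that the data always ends up copied into a buffer owned by the process.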
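So when the data is then sent over the network, ordinary reads and writes take four copies: disk -> kernel page cache -> user-space buffer -> kernel-space (socket) buffer -> network card.
2.2 Zero-copy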
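2.2.1 Physical and virtual addresses
1. What is a physical address?
A physical address is an address in the machine's actual physical memory.
2. What is a virtual address?
In early computer systems the CPU worked directly with physical addresses. When memory ran short, swapping was introduced: data that is rarely accessed is moved out of memory to disk, so memory can hold more live data and programs see what looks like a much larger memory. The operating system handles all of this and presents each process with one contiguous address space; the addresses in that space are virtual addresses. The memory management unit (MMU) translates virtual addresses to physical addresses.
2.2.2 How an mmap mapping is established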
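1. The process requests the mapping, and a mapping region is created in its virtual address space.
That is, a range of the process's user-space virtual addresses is set aside to stand for the mapped file.
2. The mapping between the virtual addresses and the file is established.
That is, the reserved virtual address range is associated with offsets in the file on disk. No file data is loaded at this point.
3. The mapped region is accessed, a page fault occurs, and the data is copied from disk into main memory.
When the process reads or writes a virtual address in the region, the MMU tries to translate it to a physical address. No physical page backs that address yet, so a page fault is raised, and the operating system loads the corresponding file page from disk into physical memory and maps it to that virtual address.
2.2.3 Advantages of mmap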
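mmap is one way of achieving zero-copy. As the traditional read and write path shows, data from disk has to pass through kernel space before it reaches user space. Is there a way to avoid that intermediate copy? There is: mmap.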
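In Java, mmap is exposed through FileChannel.map, which returns a MappedByteBuffer. The following is a small sketch of reading and writing a file through such a mapping; the class name MmapSketch and its helper methods are made up for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: file access through a memory mapping instead of read()/write().
final class MmapSketch {

    /** Maps the first `size` bytes of the file and stores `value` at `offset` through the mapping. */
    static void writeThroughMapping(Path file, int size, int offset, byte value) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
            mapped.put(offset, value); // a plain memory store, no write() system call
            mapped.force();            // ask the OS to push the dirty page to disk
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Maps the file read-only and loads one byte; the first access to a page triggers a page fault. */
    static byte readThroughMapping(Path file, int offset) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            return mapped.get(offset);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical helper: creates an empty temporary file. */
    static Path newTempFile() {
        try {
            Path file = Files.createTempFile("mmap-sketch", ".bin");
            file.toFile().deleteOnExit();
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Neither method allocates a user-space buffer for the file contents: the process touches the mapped pages directly.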
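With mmap, a file on disk is mapped to a range of virtual addresses in the process's address space, so reading or writing those addresses is equivalent to reading or writing the file. This removes the copy between the kernel page cache and a user-space buffer, in both the read and the write direction.
3. mappedFile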
mappedFile is the component in RocketMQ that actually writes data to disk. Let's see how it stores data.
3.1.1 Structure of mappedFile
    public class DefaultMappedFile extends AbstractMappedFile {
        // Size of one OS page: 4 KB. mmap loads file data into memory through page faults,
        // one 4 KB page at a time, so without warm-up a 1 GB commitLog would take about
        // 260,000 page faults (1 GB / 4 KB = 262,144). To avoid that, a zero byte is written
        // every 4 KB when the commitLog is initialized, which loads all of its pages up front.
        public static final int OS_PAGE_SIZE = 1024 * 4;
        public static final Unsafe UNSAFE = getUnsafe();
        // Assigned in a static initializer that is omitted from this excerpt
        private static final Method IS_LOADED_METHOD;
        public static final int UNSAFE_PAGE_SIZE = UNSAFE == null ? OS_PAGE_SIZE : UNSAFE.pageSize();

        protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

        // Total size of virtual memory mapped so far
        protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
        // Total number of files mapped so far
        protected static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);

        // The three updaters are also assigned in the omitted static initializer
        protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> WROTE_POSITION_UPDATER;
        protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> COMMITTED_POSITION_UPDATER;
        protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> FLUSHED_POSITION_UPDATER;

        // Write pointer of the current buffer
        protected volatile int wrotePosition;
        // Commit pointer, which also serves as the readable limit when writeBuffer is in use
        protected volatile int committedPosition;
        // Flush pointer
        protected volatile int flushedPosition;
        // File size
        protected int fileSize;
        protected FileChannel fileChannel;
        /**
         * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
         */
        protected ByteBuffer writeBuffer = null;
        // Under heavy load, writing straight into the mapped buffer may not keep up, so data can
        // first be written to a buffer borrowed from this pool, then committed to the FileChannel,
        // and finally flushed to disk.
        protected TransientStorePool transientStorePool = null;
        // File name of the mappedFile
        protected String fileName;
        // Starting offset of this file
        protected long fileFromOffset;
        protected File file;
        // The buffer mapped onto the file
        protected MappedByteBuffer mappedByteBuffer;
        // Store timestamp
        protected volatile long storeTimestamp = 0;
        protected boolean firstCreateInQueue = false;
        // Time of the last flush
        private long lastFlushTime = -1L;

        protected MappedByteBuffer mappedByteBufferWaitToClean = null;
        protected long swapMapTime = 0L;
        protected long mappedByteBufferAccessCountSinceLastSwap = 0L;
    }
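The three position fields are plain volatile ints that are updated through AtomicIntegerFieldUpdater rather than wrapped in AtomicInteger objects. The following sketch shows how such updaters can be declared and how the three pointers relate to each other; the class PositionsSketch and its methods are invented for illustration and greatly simplify what DefaultMappedFile does.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Illustrative only: three pointers kept in volatile int fields and updated atomically.
final class PositionsSketch {
    private static final AtomicIntegerFieldUpdater<PositionsSketch> WROTE =
        AtomicIntegerFieldUpdater.newUpdater(PositionsSketch.class, "wrotePosition");
    private static final AtomicIntegerFieldUpdater<PositionsSketch> COMMITTED =
        AtomicIntegerFieldUpdater.newUpdater(PositionsSketch.class, "committedPosition");
    private static final AtomicIntegerFieldUpdater<PositionsSketch> FLUSHED =
        AtomicIntegerFieldUpdater.newUpdater(PositionsSketch.class, "flushedPosition");

    // The updated fields must be declared volatile int.
    private volatile int wrotePosition;
    private volatile int committedPosition;
    private volatile int flushedPosition;

    /** Advances the write pointer by the number of bytes appended and returns the new value. */
    int append(int bytes) {
        return WROTE.addAndGet(this, bytes);
    }

    /** Moves the commit pointer up to the write pointer and returns it. */
    int commit() {
        int wrote = WROTE.get(this);
        COMMITTED.set(this, wrote);
        return wrote;
    }

    /** Moves the flush pointer up to the commit pointer and returns it. */
    int flush() {
        int committed = COMMITTED.get(this);
        FLUSHED.set(this, committed);
        return committed;
    }
}
```

In this simplified model the pointers always satisfy flushed <= committed <= wrote, which is the ordering the write, commit and flush stages suggest.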
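3.3.2 Two write modes of mappedFile: with or without the transient store pool
1. Transient store pool disabled
Data is written straight into the mapped memory (the MappedByteBuffer) and then flushed to disk.
2. Transient store pool enabled
Data is first written to a separate ByteBuffer (writeBuffer), then committed to the file's FileChannel, and finally flushed to disk. In this mode both commit and flush are carried out by asynchronous background threads, which improves throughput.
3. The transient store pool: TransientStorePool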
Initialization
    public class TransientStorePool {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

        // Number of buffers in the pool
        private final int poolSize;
        private final int fileSize;
        // The core of the pool: a deque of ByteBuffers
        private final Deque<ByteBuffer> availableBuffers;
        private volatile boolean isRealCommit = true;
    }
Borrowing a buffer
This simply takes the first buffer from the deque.
    public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }
Returning a buffer
This resets the buffer's position and limit and puts it back at the head of the deque.
    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }
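The borrow and return logic can be tried out in isolation. The following is a minimal pool sketch, assuming direct buffers and a ConcurrentLinkedDeque; the class BufferPoolSketch and its method names are invented here and leave out everything else TransientStorePool does.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Illustrative only: a fixed set of pre-allocated buffers handed out and taken back.
final class BufferPoolSketch {
    private final int bufferSize;
    private final Deque<ByteBuffer> available = new ConcurrentLinkedDeque<>();

    BufferPoolSketch(int poolSize, int bufferSize) {
        this.bufferSize = bufferSize;
        for (int i = 0; i < poolSize; i++) {
            available.offer(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    /** Takes the first free buffer, or returns null when the pool is exhausted. */
    ByteBuffer borrow() {
        return available.pollFirst();
    }

    /** Resets position and limit (the contents are not erased) and puts the buffer back first. */
    void giveBack(ByteBuffer buffer) {
        buffer.position(0);
        buffer.limit(bufferSize);
        available.offerFirst(buffer);
    }

    int availableCount() {
        return available.size();
    }
}
```

Because the buffer is returned to the head of the deque, the next borrow hands out the same buffer again, and any bytes written earlier are still in it until they are overwritten.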
3.3.3 Initializing mappedFile
With the two write modes in mind, the initialization of mappedFile is straightforward.
    public void init(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        // With the transient store pool enabled, borrow a buffer from transientStorePool
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        // Create the File object
        this.file = new File(fileName);
        // The file name of a mappedFile is its starting offset
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        // Make sure the parent directory exists
        UtilAll.ensureDirOK(this.file.getParent());

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            // Core step: map the file on disk into mappedByteBuffer
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
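At this point the mapped buffer of the mappedFile has been set up, and, if the transient store pool is enabled, the write buffer has been borrowed as well.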
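To see the effect of initialization in a small program, the sketch below parses the starting offset from the file name, maps the file, and then picks the buffer that appends should go to. The rule "use writeBuffer when it is present, otherwise the mapped buffer" is an assumption made for this sketch, and the class MappedFileInitSketch with its helper tempFileNamed is invented here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: the state a mapped file ends up with after initialization.
final class MappedFileInitSketch {
    final long fileFromOffset;    // starting offset, parsed from the file name
    final MappedByteBuffer mapped;
    final ByteBuffer writeBuffer; // null when no pool buffer is used

    MappedFileInitSketch(Path file, int fileSize, ByteBuffer writeBuffer) {
        this.fileFromOffset = Long.parseLong(file.getFileName().toString());
        this.writeBuffer = writeBuffer;
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The mapping stays valid after the channel is closed.
            this.mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Assumed rule: appends go to writeBuffer when present, otherwise to the mapped buffer. */
    ByteBuffer appendBuffer() {
        return writeBuffer != null ? writeBuffer : mapped;
    }

    /** Hypothetical helper: a path with the given name inside a fresh temporary directory. */
    static Path tempFileNamed(String name) {
        try {
            return Files.createTempDirectory("mapped-file-init").resolve(name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A file named 00000000000000004096, for example, yields a fileFromOffset of 4096, since Long.parseLong ignores the leading zeros.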
3.3.4 Writing data to mappedFile
Looking at this logic, the write is ultimately delegated to the doAppend method of AppendMessageCallback: the data to write is wrapped in a MessageExt, and the outcome of the write is returned as an AppendMessageResult.
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
        assert messageExt != null;
        assert cb != null;

        // Current write pointer
        int currentPos = WROTE_POSITION_UPDATER.get(this);

        // Only write while the pointer is still inside the file;
        // otherwise fall through, log an error and return UNKNOWN_ERROR
        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = appendMessageBuffer().slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
                // traditional batch message
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBatch) messageExt, putMessageContext);
            } else if (messageExt instanceof MessageExtBrokerInner) {
                // traditional single message or newly introduced inner-batch message
                // The actual work is done by AppendMessageCallback.doAppend
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // Advance the write pointer
            WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
            // Update the store timestamp
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
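The pattern in this method (read the write pointer, take a slice of the buffer, position it, write, then advance the pointer) can be reduced to a few lines. The sketch below uses a heap buffer and a made-up class AppendSketch; it assumes a single writer at a time, as a caller holding a lock would guarantee.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: append at a tracked write position using a slice of a shared buffer.
final class AppendSketch {
    private final ByteBuffer buffer;
    private final int fileSize;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    AppendSketch(int fileSize) {
        this.fileSize = fileSize;
        this.buffer = ByteBuffer.allocate(fileSize);
    }

    /** Appends the payload and returns the bytes written, or -1 when it does not fit. */
    int append(byte[] payload) {
        int currentPos = wrotePosition.get();
        if (currentPos >= fileSize || payload.length > fileSize - currentPos) {
            return -1;
        }
        // slice() shares the content but has its own position and limit,
        // so the underlying buffer's position is never moved.
        ByteBuffer slice = buffer.slice();
        slice.position(currentPos);
        slice.put(payload);
        wrotePosition.addAndGet(payload.length);
        return payload.length;
    }

    int wrotePosition() {
        return wrotePosition.get();
    }

    byte byteAt(int index) {
        return buffer.get(index);
    }
}
```

Keeping the position outside the buffer is what lets readers, the commit step and the flush step each work on their own slice without disturbing the writer.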
1. Data format written by mappedFile: MessageExt
MessageExt holds a message's metadata together with its actual content.
    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;

        // Topic the message belongs to
        private String topic;
        private int flag;
        // Additional properties
        private Map<String, String> properties;
        // The actual message content
        private byte[] body;
        // Transaction id, for transactional messages
        private String transactionId;
    }

    public class MessageExt extends Message {
        private static final long serialVersionUID = 5720810158625748049L;

        // Name of the current broker
        private String brokerName;
        // Queue id the message belongs to
        private int queueId;
        // Size of the stored message
        private int storeSize;
        // Offset of the message in its queue
        private long queueOffset;
        private int sysFlag;
        // Time the message was produced
        private long bornTimestamp;
        // Address of the host that produced the message
        private SocketAddress bornHost;
        private long storeTimestamp;
        // Address of the host that stores the message
        private SocketAddress storeHost;
        // Message id
        private String msgId;
        // Offset of the message in the commitLog
        private long commitLogOffset;
        private int bodyCRC;
        private int reconsumeTimes;
        private long preparedTransactionOffset;
    }
2. The core write logic of mappedFile: AppendMessageCallback
It takes the MessageExt, fills in the fields that are only known at the moment of writing to the commitLog, and copies the result into the buffer.
    // byteBuffer - the buffer the message is written into
    // preEncodeBuffer - the pre-encoded message
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
        final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
        // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET

        // Take the pre-encoded message out of msgInner
        ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
        boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner);
        if (isMultiDispatchMsg) {
            AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
            if (appendMessageResult != null) {
                return appendMessageResult;
            }
        }

        // Total length of the message
        final int msgLen = preEncodeBuffer.getInt(0);
        preEncodeBuffer.position(0);
        preEncodeBuffer.limit(msgLen);

        // PHY OFFSET
        // Starting offset of this file + current buffer position = physical offset where the write begins
        long wroteOffset = fileFromOffset + byteBuffer.position();

        // Build the message id: store host IP + port + offset
        Supplier<String> msgIdSupplier = () -> {
            int sysflag = msgInner.getSysFlag();
            int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
            ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
            MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
            msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
            msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
            return UtilAll.bytes2string(msgIdBuffer.array());
        };

        // Record ConsumeQueue information
        // Offset in the consumeQueue
        Long queueOffset = msgInner.getQueueOffset();

        // this msg maybe an inner-batch msg.
        short messageNum = getMessageNum(msgInner);

        // Transaction messages that require special handling
        final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
        switch (tranType) {
            // Prepared and Rollback message is not consumed, will not enter the consume queue
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                queueOffset = 0L;
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            default:
                break;
        }

        // If the message does not fit into the space left in this file,
        // write an end-of-file marker and return END_OF_FILE
        // Determines whether there is sufficient free space
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
            this.msgStoreItemMemory.clear();
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            // Here the length of the specially set maxBlank
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
                maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
                msgIdSupplier, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }

        // Fill in the queue offset, the physical offset and the store timestamp
        int pos = 4 + 4 + 4 + 4 + 4;
        // 6 QUEUEOFFSET
        preEncodeBuffer.putLong(pos, queueOffset);
        pos += 8;
        // 7 PHYSICALOFFSET
        preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
        int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
        pos += 8 + 4 + 8 + ipLen;
        // refresh store time stamp in lock
        preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
        if (enabledAppendPropCRC) {
            // 18 CRC32
            int checkSize = msgLen - crc32ReservedLength;
            ByteBuffer tmpBuffer = preEncodeBuffer.duplicate();
            tmpBuffer.limit(tmpBuffer.position() + checkSize);
            int crc32 = UtilAll.crc32(tmpBuffer);
            tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength);
            MessageDecoder.createCrc32(tmpBuffer, crc32);
        }

        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS");
        // Write messages to the queue buffer
        // Copy the message into the buffer
        byteBuffer.put(preEncodeBuffer);
        CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
        msgInner.setEncodedBuff(null);

        if (isMultiDispatchMsg) {
            CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
        }

        // Report success
        return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
    }
3. Result of a mappedFile write: AppendMessageResult
It mainly carries the physical offset the message was written at, the message's msgId, and its logical offset in the consumeQueue. Note that the msgId can only be produced once the write position in the commitLog is known, because it embeds that physical offset.
    public class AppendMessageResult {
        // Return code
        private AppendMessageStatus status;
        // Where to start writing
        private long wroteOffset;
        // Write Bytes
        private int wroteBytes;
        // Message ID
        private String msgId;
        private Supplier<String> msgIdSupplier;
        // Message storage timestamp
        private long storeTimestamp;
        // Consume queue's offset(step by one)
        private long logicsOffset;
        private long pagecacheRT = 0;
    }
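The dependency of the msgId on the write offset is easy to see in a small model. The sketch below builds an id from the store host's IP, its port and the physical offset, and defers the work with a Supplier; the class MsgIdSketch, the method offsetOf and the upper-case hex encoding are choices made for this illustration and may differ from what RocketMQ's own helpers produce.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.function.Supplier;

// Illustrative only: a message id made of IP + port + physical offset, built lazily.
final class MsgIdSketch {

    /** Returns a supplier that builds the id only when it is asked for. */
    static Supplier<String> msgIdSupplier(InetSocketAddress storeHost, long wroteOffset) {
        return () -> {
            byte[] ip = storeHost.getAddress().getAddress(); // 4 bytes for IPv4, 16 for IPv6
            ByteBuffer buf = ByteBuffer.allocate(ip.length + 4 + 8);
            buf.put(ip);
            buf.putInt(storeHost.getPort());
            buf.putLong(wroteOffset);
            StringBuilder hex = new StringBuilder();
            for (byte b : buf.array()) {
                hex.append(String.format("%02X", b));
            }
            return hex.toString();
        };
    }

    /** Recovers the physical offset from the last 8 bytes (16 hex digits) of the id. */
    static long offsetOf(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(msgId.length() - 16), 16);
    }
}
```

For the IPv4 address 127.0.0.1, port 10911 and offset 4096 this sketch produces a 32-character id, and offsetOf reads the offset back out of it, which is how an id of this shape can lead straight to the message's position in the commitLog.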
3.3.5 The commit operation of mappedFile
Commit copies the data that has accumulated in writeBuffer to the FileChannel.
    public int commit(final int commitLeastPages) {
        // Without the transient store pool there is nothing to commit: return the write pointer
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return WROTE_POSITION_UPDATER.get(this);
        }

        //no need to commit data to file channel, so just set committedPosition to wrotePosition.
        if (transientStorePool != null && !transientStorePool.isRealCommit()) {
            COMMITTED_POSITION_UPDATER.set(this, WROTE_POSITION_UPDATER.get(this));
        } else if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                commit0();
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + COMMITTED_POSITION_UPDATER.get(this));
            }
        }

        // The original excerpt is cut off here; the method ends by returning the commit pointer
        return COMMITTED_POSITION_UPDATER.get(this);
    }

    protected void commit0() {
        int writePos = WROTE_POSITION_UPDATER.get(this);
        int lastCommittedPosition = COMMITTED_POSITION_UPDATER.get(this);

        if (writePos - lastCommittedPosition > 0) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                // Write the uncommitted range to the FileChannel
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);
                // Advance the commit pointer
                COMMITTED_POSITION_UPDATER.set(this, writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }
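The core of commit0, copying the range between the commit pointer and the write pointer from the write buffer into the file channel, can be exercised on its own. In the sketch below the positions are passed in as plain ints, and the class CommitSketch and its helpers openTempChannel and readBack are invented for the illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: commit the uncommitted range of a write buffer to a file channel.
final class CommitSketch {

    /** Writes writeBuffer[committed, wrote) to the channel at the same offset; returns the new commit position. */
    static int commit(ByteBuffer writeBuffer, FileChannel channel, int committed, int wrote) {
        if (wrote - committed <= 0) {
            return committed; // nothing new to commit
        }
        ByteBuffer slice = writeBuffer.slice(); // leaves writeBuffer's own position untouched
        slice.position(committed);
        slice.limit(wrote);
        try {
            channel.position(committed);
            while (slice.hasRemaining()) {
                channel.write(slice);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return wrote;
    }

    /** Hypothetical helper: opens a read/write channel on a new temporary file. */
    static FileChannel openTempChannel() {
        try {
            Path file = Files.createTempFile("commit-sketch", ".bin");
            file.toFile().deleteOnExit();
            return FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical helper: reads the first n bytes of the channel back. */
    static byte[] readBack(FileChannel channel, int n) {
        ByteBuffer out = ByteBuffer.allocate(n);
        try {
            int pos = 0;
            while (out.hasRemaining()) {
                int read = channel.read(out, pos);
                if (read < 0) {
                    break;
                }
                pos += read;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.array();
    }
}
```

After a commit the data sits in the file's page cache; it is not guaranteed to be on disk until a flush forces it there.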
3.3.6 The flush operation of mappedFile
    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    this.mappedByteBufferAccessCountSinceLastSwap++;

                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        // force() pushes the cached data to disk
                        this.mappedByteBuffer.force();
                    }
                    this.lastFlushTime = System.currentTimeMillis();
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                FLUSHED_POSITION_UPDATER.set(this, value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + FLUSHED_POSITION_UPDATER.get(this));
                FLUSHED_POSITION_UPDATER.set(this, getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }
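The flush method leans on isAbleToFlush, which is not shown in the excerpt. The sketch below gives one plausible form of such a check, based on how many whole pages lie between the flush pointer and the readable position, together with the choice between the two force calls. The threshold rule is an assumption made for this sketch, and the class FlushSketch is invented here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Illustrative only: decide whether a flush is worthwhile, then force the side that was written.
final class FlushSketch {
    static final int OS_PAGE_SIZE = 4096;

    /** Assumed rule: flush when the file is full, or when enough dirty pages have accumulated. */
    static boolean isAbleToFlush(int flushedPos, int readPos, int fileSize, int flushLeastPages) {
        if (readPos >= fileSize) {
            return true; // the file is full, flush whatever is left
        }
        if (flushLeastPages > 0) {
            return (readPos / OS_PAGE_SIZE) - (flushedPos / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return readPos > flushedPos; // no threshold: flush as soon as anything is dirty
    }

    /** Forces the channel or the mapped buffer, never both, and returns the new flush position. */
    static int flush(FileChannel channel, MappedByteBuffer mapped, boolean wroteViaChannel, int readPos) {
        try {
            if (wroteViaChannel) {
                channel.force(false); // data only, file metadata is not forced
            } else {
                mapped.force();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return readPos;
    }
}
```

A page threshold of this kind trades durability latency for fewer and larger disk writes: with flushLeastPages set to 4, nothing is forced until at least 16 KB of new data is waiting.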
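3.3.7 Memory warm-up of mappedFile
    public void warmMappedFile(FlushDiskType type, int pages) {
        this.mappedByteBufferAccessCountSinceLastSwap++;

        long beginTime = System.currentTimeMillis();
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        long flush = 0;
        // long time = System.currentTimeMillis();
        // fileSize is the total size of the mappedFile. A zero is written into the mapped buffer
        // every 4 KB; otherwise later writes would trigger frequent page faults and frequent
        // trips to the disk.
        for (long i = 0, j = 0; i < this.fileSize; i += DefaultMappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put((int) i, (byte) 0);
            // With synchronous flushing, force the touched pages to disk every `pages` pages
            // during warm-up, so that the flushes are not left to many small writes later on
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }
        }
        // ... (the rest of the method is not included in the original excerpt)
    }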
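The warm-up loop and the page-count arithmetic behind it can be checked with a few lines of Java. The sketch below touches one byte per 4 KB page of a buffer and computes how many pages a file of a given size spans; the class WarmupSketch and its methods are invented for this illustration, and the 4 KB page size is the one assumed throughout the article.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: pre-touch every page of a freshly mapped file.
final class WarmupSketch {
    static final int OS_PAGE_SIZE = 4096;

    /** Number of pages a file of the given size spans, rounding up. */
    static long pageCount(long fileSize) {
        return (fileSize + OS_PAGE_SIZE - 1) / OS_PAGE_SIZE;
    }

    /** Writes a zero at the start of every page and returns how many pages were touched. */
    static int warm(ByteBuffer buffer) {
        int touched = 0;
        for (int i = 0; i < buffer.capacity(); i += OS_PAGE_SIZE) {
            buffer.put(i, (byte) 0); // on a mapped buffer, the first touch of a page faults it in
            touched++;
        }
        return touched;
    }

    /** Maps a new file of the given size and warms it; only suitable for files with no data yet. */
    static int mapAndWarm(Path file, int fileSize) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            return warm(mapped);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a 1 GB file, pageCount gives 262,144, which is where the figure of roughly 260,000 page faults comes from. Since the loop overwrites one byte per page with zero, it must run before any message is written to the file.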
4. Summary
mappedFile relies on mmap to speed up reads and writes. At its heart is the MappedByteBuffer, whose size defaults to 1 GB and can be configured through mappedFileSizeCommitLog.