php网站安装图解,进行网站开发的所有步骤,青海城乡建设网站,wordpress去除acfRocketMQ源码阅读-Message拉取与消费-Broker篇 1. ConsumeQueue是什么2. Message重放2.1 从MappedFile文件读取Message到ConsumeQueue2.2 ConsumeQueue持久化 3. Broker提供的拉取接口3.1 请求Header3.2 拉取消息接口3.3 拉取失败处理 4. Broker提供的更新消费进度接口5. Broke… RocketMQ源码阅读-Message拉取与消费-Broker篇 1. ConsumeQueue是什么2. Message重放2.1 从MappedFile文件读取Message到ConsumeQueue2.2 ConsumeQueue持久化 3. Broker提供的拉取接口3.1 请求Header3.2 拉取消息接口3.3 拉取失败处理 4. Broker提供的更新消费进度接口5. Broker 提供发回消息接口6. 小结 上一篇分析到Message由CommitLog存储到了MappedFile 文件中。
消费者就可以去拉取并消费Message了。
Message的拉取与消费是由Broker和Consumer共同完成的,本篇先来看Broker对于Message拉取的处理代码。DefaultMessageStore的内部类ReputMessageService负责消息的重放:把CommitLog已写入MappedFile的Message重放并存储到ConsumeQueue中。
Rocket框架图
借用一张图(出处:https://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/),描述了消费逻辑。图中:
Producer:消息生产者;CommitLog:消息持久化存储的位置;ConsumeQueue:存储消息在CommitLog中的位置信息;Consumer:消息消费者。
1. ConsumeQueue是什么
ConsumeQueue中存储消息在CommitLog中的位置信息。ConsumeQueue、MappedFileQueue、MappedFile 的关系如下
2. Message重放 2.1 从MappedFile文件读取Message到ConsumeQueue
消息存储到文件后消费方需要感知新消息的到来。这个过程就是Message重放机制。Message重放是由DefaultMessageStore的内部类ReputMessageService完成的。
ReputMessageService继承了ServiceThread(从而实现了Runnable接口),并重写了run方法:
Override
public void run() {DefaultMessageStore.log.info(this.getServiceName() service started);while (!this.isStopped()) {try {Thread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() service has exception. , e);}}DefaultMessageStore.log.info(this.getServiceName() service end);
}run方法执行的逻辑是:每休眠1ms执行一遍ReputMessageService#doReput()方法。这个线程是在Broker启动时被拉起的:BrokerStartup#start方法调用BrokerController#start方法,再调用DefaultMessageStore#start方法,最终调用ReputMessageService#start方法(其实是Thread的start方法),由此执行ReputMessageService#run方法。
ReputMessageService类源码如下
class ReputMessageService extends ServiceThread {// 开始重放消息的CommitLog物理位置private volatile long reputFromOffset 0;public long getReputFromOffset() {return reputFromOffset;}public void setReputFromOffset(long reputFromOffset) {this.reputFromOffset reputFromOffset;}Overridepublic void shutdown() {for (int i 0; i 50 this.isCommitLogAvailable(); i) {try {Thread.sleep(100);} catch (InterruptedException ignored) {}}if (this.isCommitLogAvailable()) {log.warn(shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {},DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);}super.shutdown();}// 剩余需要重放消息字节数public long behind() {return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;}// 是否commitLog需要重放消息private boolean isCommitLogAvailable() {return this.reputFromOffset DefaultMessageStore.this.commitLog.getMaxOffset();}private void doReput() {if (this.reputFromOffset DefaultMessageStore.this.commitLog.getMinOffset()) {log.warn(The reputFromOffset{} is smaller than minPyOffset{}, this usually indicate that the dispatch behind too much and the commitlog has expired.,this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());this.reputFromOffset DefaultMessageStore.this.commitLog.getMinOffset();}for (boolean doNext true; this.isCommitLogAvailable() doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() this.reputFromOffset DefaultMessageStore.this.getConfirmOffset()) {break;}// 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBufferSelectMappedBufferResult result DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result ! 
null) {try {this.reputFromOffset result.getStartOffset();// 遍历MappedByteBufferfor (int readSize 0; readSize result.getSize() doNext; ) {// 生成重放消息重放调度请求DispatchRequest dispatchRequest DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);// 消息长度int size dispatchRequest.getBufferSize() -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();// 根据请求的结果处理if (dispatchRequest.isSuccess()) {if (size 0) {// 读取MessageDefaultMessageStore.this.doDispatch(dispatchRequest);// 通知有新消息if (BrokerRole.SLAVE ! DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());}// 统计this.reputFromOffset size;readSize size;if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());}} else if (size 0) {// 读取到MappedFile文件尾this.reputFromOffset DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize result.getSize();}} else if (!dispatchRequest.isSuccess()) {// 读取失败if (size 0) {log.error([BUG]read total count not equals msg total size. 
reputFromOffset{}, reputFromOffset);this.reputFromOffset size;} else {doNext false;// If user open the dledger pattern or the broker is master node,// it will not ignore the exception and fix the reputFromOffset variableif (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||DefaultMessageStore.this.brokerConfig.getBrokerId() MixAll.MASTER_ID) {log.error([BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {},this.reputFromOffset);this.reputFromOffset result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext false;}}}Overridepublic void run() {DefaultMessageStore.log.info(this.getServiceName() service started);while (!this.isStopped()) {try {Thread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() service has exception. , e);}}DefaultMessageStore.log.info(this.getServiceName() service end);}Overridepublic String getServiceName() {return ReputMessageService.class.getSimpleName();}}ReputMessageService的两个主要功能
1)不断生成消息位置信息到消息队列 ConsumeQueue;2)不断生成消息索引到索引文件 IndexFile。
查看第71行读取Message的方法DefaultMessageStore#doDispatch(…)
public void doDispatch(DispatchRequest req) {for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}循环调用CommitLogDispatcher#dispatch。CommitLogDispatcher是接口,有三个实现类:
CommitLogDispatcherBuildConsumeQueue:DefaultMessageStore的内部类,操作ConsumeQueue;CommitLogDispatcherBuildIndex:DefaultMessageStore的内部类,操作索引文件;CommitLogDispatcherCalcBitMap:(todo,还没分析)。
在DefaultMessageStore的构造方法中会进行dispatcherList链表的初始化
this.dispatcherList new LinkedList();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());因此会依次调用CommitLogDispatcherBuildConsumeQueue#dispatch和CommitLogDispatcherBuildIndex#dispatch。先来看CommitLogDispatcherBuildConsumeQueue#dispatch
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {Overridepublic void dispatch(DispatchRequest request) {final int tranType MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueueDefaultMessageStore.this.putMessagePositionInfo(request);break;case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}
}主要功能是非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue。调用了DefaultMessageStore#putMessagePositionInfo
// 建立 消息位置信息 到 ConsumeQueue
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {ConsumeQueue cq this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());cq.putMessagePositionInfoWrapper(dispatchRequest);
}上面调用的是ConsumeQueue#putMessagePositionInfoWrapper(该方法定义在ConsumeQueue中,而不是DefaultMessageStore中):
// 添加位置信息封装
public void putMessagePositionInfoWrapper(DispatchRequest request) {final int maxRetries 30;boolean canWrite this.defaultMessageStore.getRunningFlags().isCQWriteable();// 多次循环写直到成功for (int i 0; i maxRetries canWrite; i) {long tagsCode request.getTagsCode();if (isExtWriteEnable()) {ConsumeQueueExt.CqExtUnit cqExtUnit new ConsumeQueueExt.CqExtUnit();cqExtUnit.setFilterBitMap(request.getBitMap());cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());cqExtUnit.setTagsCode(request.getTagsCode());long extAddr this.consumeQueueExt.put(cqExtUnit);if (isExtAddr(extAddr)) {tagsCode extAddr;} else {log.warn(Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}, cqExtUnit,topic, queueId, request.getCommitLogOffset());}}// 调用添加位置信息boolean result this.putMessagePositionInfo(request.getCommitLogOffset(),request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());if (result) {// 添加成功使用消息存储时间 作为 存储check pointthis.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());return;} else {// XXX: warn and notify melog.warn([BUG]put commit log position info to topic : queueId request.getCommitLogOffset() failed, retry i times);try {Thread.sleep(1000);} catch (InterruptedException e) {log.warn(, e);}}}// XXX: warn and notify melog.error([BUG]consume queue can not write, {} {}, this.topic, this.queueId);this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}调用的添加位置信息方法为ConsumeQueue#putMessagePositionInfo:
// 添加位置信息并返回添加是否成功
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {// 如果已经重放过直接返回成功if (offset size this.maxPhysicOffset) {log.warn(Maybe try to build consume queue repeatedly maxPhysicOffset{} phyOffset{}, maxPhysicOffset, offset);return true;}// 写入位置信息到byteBufferthis.byteBufferIndex.flip();this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);// 计算consumeQueue存储位置并获得对应的MappedFilefinal long expectLogicOffset cqOffset * CQ_STORE_UNIT_SIZE;MappedFile mappedFile this.mappedFileQueue.getLastMappedFile(expectLogicOffset);if (mappedFile ! null) {// 当是ConsumeQueue第一个MappedFile 队列位置非第一个 MappedFile未写入内容则填充前置空白占位if (mappedFile.isFirstCreateInQueue() cqOffset ! 0 mappedFile.getWrotePosition() 0) {this.minLogicOffset expectLogicOffset;this.mappedFileQueue.setFlushedWhere(expectLogicOffset);this.mappedFileQueue.setCommittedWhere(expectLogicOffset);// 填充前置空白占位this.fillPreBlank(mappedFile, expectLogicOffset);log.info(fill pre blank space mappedFile.getFileName() expectLogicOffset mappedFile.getWrotePosition());}// 校验consumeQueue存储位置是否合法if (cqOffset ! 0) {long currentLogicOffset mappedFile.getWrotePosition() mappedFile.getFileFromOffset();if (expectLogicOffset currentLogicOffset) {log.warn(Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {},expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}if (expectLogicOffset ! currentLogicOffset) {LOG_ERROR.warn([BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {},expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}}// 设置commitLog重放消息到ConsumeQueue位置this.maxPhysicOffset offset size;// 插入mappedFilereturn mappedFile.appendMessage(this.byteBufferIndex.array());}return false;
}填充前置空白占位调用的方法为ConsumeQueue#fillPreBlank:
private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {// 写入前置空白占位到byteBufferByteBuffer byteBuffer ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);byteBuffer.putLong(0L);byteBuffer.putInt(Integer.MAX_VALUE);byteBuffer.putLong(0L);int until (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());// 循环填空for (int i 0; i until; i CQ_STORE_UNIT_SIZE) {mappedFile.appendMessage(byteBuffer.array());}
}再来看CommitLogDispatcherBuildIndex#dispatch
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {// 建立 索引信息 到 IndexFileOverridepublic void dispatch(DispatchRequest request) {if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {DefaultMessageStore.this.indexService.buildIndex(request);}}
}主要功能是建立 索引信息 到 IndexFile详细流程暂时不做分析。 2.2 ConsumeQueue持久化
ConsumeQueue保存了消息在CommitLog中的位置信息。FlushConsumeQueueService负责ConsumeQueue的持久化工作源码如下
class FlushConsumeQueueService extends ServiceThread {private static final int RETRY_TIMES_OVER 3;// 最后flush时间戳private long lastFlushTimestamp 0;private void doFlush(int retryTimes) {int flushConsumeQueueLeastPages DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();// retryTimes RETRY_TIMES_OVER时进行强制flush。主要用于shutdown时。if (retryTimes RETRY_TIMES_OVER) {flushConsumeQueueLeastPages 0;} 当时间满足flushConsumeQueueThoroughInterval时即使写入的数量不足flushConsumeQueueLeastPages也进行flushlong logicsMsgTimestamp 0;int flushConsumeQueueThoroughInterval DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();long currentTimeMillis System.currentTimeMillis();if (currentTimeMillis (this.lastFlushTimestamp flushConsumeQueueThoroughInterval)) {this.lastFlushTimestamp currentTimeMillis;flushConsumeQueueLeastPages 0;logicsMsgTimestamp DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();}// flush消费队列ConcurrentMapString, ConcurrentMapInteger, ConsumeQueue tables DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMapInteger, ConsumeQueue maps : tables.values()) {for (ConsumeQueue cq : maps.values()) {boolean result false;for (int i 0; i retryTimes !result; i) {result cq.flush(flushConsumeQueueLeastPages);}}}// flush 存储 check pointif (0 flushConsumeQueueLeastPages) {if (logicsMsgTimestamp 0) {DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);}DefaultMessageStore.this.getStoreCheckpoint().flush();}}public void run() {DefaultMessageStore.log.info(this.getServiceName() service started);while (!this.isStopped()) {try {int interval DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();this.waitForRunning(interval);this.doFlush(1);} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() service has exception. 
, e);}}this.doFlush(RETRY_TIMES_OVER);DefaultMessageStore.log.info(this.getServiceName() service end);}Overridepublic String getServiceName() {return FlushConsumeQueueService.class.getSimpleName();}Overridepublic long getJointime() {return 1000 * 60;}
}flush操作的执行间隔由配置项flushIntervalConsumeQueue(即getFlushIntervalConsumeQueue()的返回值)决定,默认每1000ms执行一次。
3. Broker提供的拉取接口 3.1 请求Header
拉取消息的请求Header为PullMessageRequestHeader
public class PullMessageRequestHeader implements CommandCustomHeader {// 消费者分组CFNotNullprivate String consumerGroup;// TopicCFNotNullprivate String topic;// 队列编号CFNotNullprivate Integer queueId;// 队列开始位置CFNotNullprivate Long queueOffset;// 消息数量CFNotNullprivate Integer maxMsgNums;// 系统标识CFNotNullprivate Integer sysFlag;// 提交消费进度位置CFNotNullprivate Long commitOffset;// 挂起超时时间CFNotNullprivate Long suspendTimeoutMillis;// 订阅表达式CFNullableprivate String subscription;// 订阅版本号CFNotNullprivate Long subVersion;private String expressionType;Overridepublic void checkFields() throws RemotingCommandException {}
}sysFlag:系统标识。第 0 位 FLAG_COMMIT_OFFSET:标记请求提交消费进度位置,和 commitOffset 配合。第 1 位 FLAG_SUSPEND:标记请求是否挂起请求,和 suspendTimeoutMillis 配合;当拉取不到消息时,Broker 会挂起请求,直到有消息,最大挂起时间为 suspendTimeoutMillis 毫秒。第 2 位 FLAG_SUBSCRIPTION:是否过滤订阅表达式,和 subscription 配合。 subVersion:订阅版本号。请求时如果版本号不对,则无法拉取到消息,需要重新获取订阅信息,使用最新的订阅版本号。 3.2 拉取消息接口
Broker提供的拉取消息接口为PullMessageProcessor#processRequest(…),源码非常长,慢慢看:
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException {RemotingCommand response RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);final PullMessageResponseHeader responseHeader (PullMessageResponseHeader) response.readCustomHeader();final PullMessageRequestHeader requestHeader (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);response.setOpaque(request.getOpaque());log.debug(receive PullMessage request command, {}, request);// 校验 broker 是否可读if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(String.format(the broker[%s] pulling message is forbidden, this.brokerController.getBrokerConfig().getBrokerIP1()));return response;}// 校验 consumer分组配置 是否存在SubscriptionGroupConfig subscriptionGroupConfig this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());if (null subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);response.setRemark(String.format(subscription group [%s] does not exist, %s, requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));return response;}// 校验 consumer分组配置 是否可消费if (!subscriptionGroupConfig.isConsumeEnable()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(subscription group no permission, requestHeader.getConsumerGroup());return response;}final boolean hasSuspendFlag PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());final boolean hasCommitOffsetFlag PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());final boolean hasSubscriptionFlag PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());final long suspendTimeoutMillisLong hasSuspendFlag ? 
requestHeader.getSuspendTimeoutMillis() : 0;// 校验 topic配置 存在TopicConfig topicConfig this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (null topicConfig) {log.error(the topic {} not exist, consumer: {}, requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark(String.format(topic[%s] not exist, apply first please! %s, requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));return response;}// 校验 topic配置 权限可读if (!PermName.isReadable(topicConfig.getPerm())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(the topic[ requestHeader.getTopic() ] pulling message is forbidden);return response;}// 校验 读取队列 在 topic配置 队列范围内if (requestHeader.getQueueId() 0 || requestHeader.getQueueId() topicConfig.getReadQueueNums()) {String errorInfo String.format(queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s],requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());log.warn(errorInfo);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorInfo);return response;}// 校验 订阅关系SubscriptionData subscriptionData null;ConsumerFilterData consumerFilterData null;if (hasSubscriptionFlag) {try {subscriptionData FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {consumerFilterData ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),requestHeader.getExpressionType(), requestHeader.getSubVersion());assert consumerFilterData ! 
null;}} catch (Exception e) {log.warn(Parse the consumers subscription[{}] failed, group: {}, requestHeader.getSubscription(),requestHeader.getConsumerGroup());response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);response.setRemark(parse the consumers subscription failed);return response;}} else {// 校验 消费分组信息 是否存在ConsumerGroupInfo consumerGroupInfo this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());if (null consumerGroupInfo) {log.warn(the consumers group info not exist, group: {}, requestHeader.getConsumerGroup());response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);response.setRemark(the consumers group info not exist FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));return response;}// 校验 消费分组信息 消息模型是否匹配if (!subscriptionGroupConfig.isConsumeBroadcastEnable() consumerGroupInfo.getMessageModel() MessageModel.BROADCASTING) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(the consumer group[ requestHeader.getConsumerGroup() ] can not consume by broadcast way);return response;}// 校验 订阅信息 是否存在subscriptionData consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());if (null subscriptionData) {log.warn(the consumers subscription not exist, group: {}, topic:{}, requestHeader.getConsumerGroup(), requestHeader.getTopic());response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);response.setRemark(the consumers subscription not exist FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));return response;}// 校验 订阅信息版本 是否合法if (subscriptionData.getSubVersion() requestHeader.getSubVersion()) {log.warn(The brokers subscription is not latest, group: {} {}, requestHeader.getConsumerGroup(),subscriptionData.getSubString());response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);response.setRemark(the consumers subscription not latest);return response;}if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {consumerFilterData 
this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),requestHeader.getConsumerGroup());if (consumerFilterData null) {response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);response.setRemark(The brokers consumer filter data is not exist!Your expression may be wrong!);return response;}if (consumerFilterData.getClientVersion() requestHeader.getSubVersion()) {log.warn(The brokers consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {},requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);response.setRemark(the consumers consumer filter data not latest);return response;}}}if (!ExpressionType.isTagType(subscriptionData.getExpressionType()) !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(The broker does not support consumer to filter message by subscriptionData.getExpressionType());return response;}MessageFilter messageFilter;if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());} else {messageFilter new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());}// 获取消息final GetMessageResult getMessageResult this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);if (getMessageResult ! 
null) {response.setRemark(getMessageResult.getStatus().name());responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());responseHeader.setMinOffset(getMessageResult.getMinOffset());responseHeader.setMaxOffset(getMessageResult.getMaxOffset());// 计算建议读取brokerIdif (getMessageResult.isSuggestPullingFromSlave()) {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());} else {responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:break;case SLAVE:if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}break;}if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {// consume too slow ,redirect to another machineif (getMessageResult.isSuggestPullingFromSlave()) {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());}// consume okelse {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());}} else {responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}switch (getMessageResult.getStatus()) {case FOUND:response.setCode(ResponseCode.SUCCESS);break;case MESSAGE_WAS_REMOVING:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case NO_MATCHED_LOGIC_QUEUE:case NO_MESSAGE_IN_QUEUE:if (0 ! 
requestHeader.getQueueOffset()) {response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info(the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {},requestHeader.getQueueOffset(),getMessageResult.getNextBeginOffset(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getConsumerGroup());} else {response.setCode(ResponseCode.PULL_NOT_FOUND);}break;case NO_MATCHED_MESSAGE:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case OFFSET_FOUND_NULL:response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_OVERFLOW_BADLY:response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info(the request offset: {} over flow badly, broker max offset: {}, consumer: {},requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());break;case OFFSET_OVERFLOW_ONE:response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_TOO_SMALL:response.setCode(ResponseCode.PULL_OFFSET_MOVED);log.info(the request offset too small. 
group{}, topic{}, requestOffset{}, brokerMinOffset{}, clientIp{},requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),getMessageResult.getMinOffset(), channel.remoteAddress());break;default:assert false;break;}if (this.hasConsumeMessageHook()) {ConsumeMessageContext context new ConsumeMessageContext();context.setConsumerGroup(requestHeader.getConsumerGroup());context.setTopic(requestHeader.getTopic());context.setQueueId(requestHeader.getQueueId());String owner request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);switch (response.getCode()) {case ResponseCode.SUCCESS:int commercialBaseCount brokerController.getBrokerConfig().getCommercialBaseCount();int incValue getMessageResult.getMsgCount4Commercial() * commercialBaseCount;context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);context.setCommercialRcvTimes(incValue);context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());context.setCommercialOwner(owner);break;case ResponseCode.PULL_NOT_FOUND:if (!brokerAllowSuspend) {context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);context.setCommercialRcvTimes(1);context.setCommercialOwner(owner);}break;case ResponseCode.PULL_RETRY_IMMEDIATELY:case ResponseCode.PULL_OFFSET_MOVED:context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);context.setCommercialRcvTimes(1);context.setCommercialOwner(owner);break;default:assert false;break;}this.executeConsumeMessageHookBefore(context);}switch (response.getCode()) {case ResponseCode.SUCCESS:this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),getMessageResult.getMessageCount());this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),getMessageResult.getBufferTotalSize());this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());// 读取消息if 
(this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {final long beginTimeMills this.brokerController.getMessageStore().now();final byte[] r this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(),(int) (this.brokerController.getMessageStore().now() - beginTimeMills));response.setBody(r);} else {try {FileRegion fileRegion new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {getMessageResult.release();if (!future.isSuccess()) {log.error(transfer many message by pagecache failed, {}, channel.remoteAddress(), future.cause());}}});} catch (Throwable e) {log.error(transfer many message by pagecache exception, e);getMessageResult.release();}response null;}break;case ResponseCode.PULL_NOT_FOUND:// 消息未查询到 broker允许挂起请求 请求允许挂起if (brokerAllowSuspend hasSuspendFlag) {long pollingTimeMills suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic requestHeader.getTopic();long offset requestHeader.getQueueOffset();int queueId requestHeader.getQueueId();PullRequest pullRequest new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response null;break;}case ResponseCode.PULL_RETRY_IMMEDIATELY:break;case ResponseCode.PULL_OFFSET_MOVED:if (this.brokerController.getMessageStoreConfig().getBrokerRole() ! 
BrokerRole.SLAVE|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {MessageQueue mq new MessageQueue();mq.setTopic(requestHeader.getTopic());mq.setQueueId(requestHeader.getQueueId());mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());OffsetMovedEvent event new OffsetMovedEvent();event.setConsumerGroup(requestHeader.getConsumerGroup());event.setMessageQueue(mq);event.setOffsetRequest(requestHeader.getQueueOffset());event.setOffsetNew(getMessageResult.getNextBeginOffset());this.generateOffsetMovedEvent(event);log.warn(PULL_OFFSET_MOVED:correction offset. topic{}, groupId{}, requestOffset{}, newOffset{}, suggestBrokerId{},requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),responseHeader.getSuggestWhichBrokerId());} else {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);log.warn(PULL_OFFSET_MOVED:none correction. topic{}, groupId{}, requestOffset{}, suggestBrokerId{},requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),responseHeader.getSuggestWhichBrokerId());}break;default:assert false;}} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(store getMessage return null);}// 请求要求持久化进度 broker非主进行持久化进度。boolean storeOffsetEnable brokerAllowSuspend;storeOffsetEnable storeOffsetEnable hasCommitOffsetFlag;storeOffsetEnable storeOffsetEnable this.brokerController.getMessageStoreConfig().getBrokerRole() ! BrokerRole.SLAVE;if (storeOffsetEnable) {this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());}return response;
}第160行调用了MessageStore#getMessage(…)获取消息
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,final int maxMsgNums,final MessageFilter messageFilter) {if (this.shutdown) {log.warn(message store has shutdown, so getMessage is forbidden);return null;}if (!this.runningFlags.isReadable()) {log.warn(message store is not readable, so getMessage is forbidden this.runningFlags.getFlagBits());return null;}long beginTime this.getSystemClock().now();GetMessageStatus status GetMessageStatus.NO_MESSAGE_IN_QUEUE;long nextBeginOffset offset;long minOffset 0;long maxOffset 0;GetMessageResult getResult new GetMessageResult();final long maxOffsetPy this.commitLog.getMaxOffset();// 获取消费队列ConsumeQueue consumeQueue findConsumeQueue(topic, queueId);if (consumeQueue ! null) {// 消费队列 最小队列编号minOffset consumeQueue.getMinOffsetInQueue();// 消费队列 最大队列编号maxOffset consumeQueue.getMaxOffsetInQueue();// 判断 队列位置(offset)if (maxOffset 0) {// 判断 队列位置(offset)status GetMessageStatus.NO_MESSAGE_IN_QUEUE;nextBeginOffset nextOffsetCorrection(offset, 0);} else if (offset minOffset) {// 查询offset 太小status GetMessageStatus.OFFSET_TOO_SMALL;nextBeginOffset nextOffsetCorrection(offset, minOffset);} else if (offset maxOffset) {// 查询offset 超过 消费队列 一个位置status GetMessageStatus.OFFSET_OVERFLOW_ONE;nextBeginOffset nextOffsetCorrection(offset, offset);} else if (offset maxOffset) {// 查询offset 超过 消费队列 一个位置status GetMessageStatus.OFFSET_OVERFLOW_BADLY;if (0 minOffset) {nextBeginOffset nextOffsetCorrection(offset, minOffset);} else {nextBeginOffset nextOffsetCorrection(offset, maxOffset);}} else {// 获得 映射Buffer结果(MappedFile)SelectMappedBufferResult bufferConsumeQueue consumeQueue.getIndexBuffer(offset);if (bufferConsumeQueue ! 
null) {try {status GetMessageStatus.NO_MATCHED_MESSAGE;long nextPhyFileStartOffset Long.MIN_VALUE;long maxPhyOffsetPulling 0;int i 0;final int maxFilterMessageCount Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);final boolean diskFallRecorded this.messageStoreConfig.isDiskFallRecorded();ConsumeQueueExt.CqExtUnit cqExtUnit new ConsumeQueueExt.CqExtUnit();for (; i bufferConsumeQueue.getSize() i maxFilterMessageCount; i ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy bufferConsumeQueue.getByteBuffer().getLong();int sizePy bufferConsumeQueue.getByteBuffer().getInt();long tagsCode bufferConsumeQueue.getByteBuffer().getLong();maxPhyOffsetPulling offsetPy;if (nextPhyFileStartOffset ! Long.MIN_VALUE) {if (offsetPy nextPhyFileStartOffset)continue;}boolean isInDisk checkInDiskByCommitOffset(offsetPy, maxOffsetPy);// 是否已经获得足够消息if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),isInDisk)) {break;}boolean extRet false, isTagsCodeLegal true;if (consumeQueue.isExtAddr(tagsCode)) {extRet consumeQueue.getExt(tagsCode, cqExtUnit);if (extRet) {tagsCode cqExtUnit.getTagsCode();} else {// cant find ext content.Client will filter messages by tag also.log.error([BUG] cant find consume queue extend file content!addr{}, offsetPy{}, sizePy{}, topic{}, group{},tagsCode, offsetPy, sizePy, topic, group);isTagsCodeLegal false;}}// 判断消息是否符合条件if (messageFilter ! null !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {if (getResult.getBufferTotalSize() 0) {status GetMessageStatus.NO_MATCHED_MESSAGE;}continue;}SelectMappedBufferResult selectResult this.commitLog.getMessage(offsetPy, sizePy);if (null selectResult) {if (getResult.getBufferTotalSize() 0) {status GetMessageStatus.MESSAGE_WAS_REMOVING;}nextPhyFileStartOffset this.commitLog.rollNextFile(offsetPy);continue;}if (messageFilter ! 
null !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {if (getResult.getBufferTotalSize() 0) {status GetMessageStatus.NO_MATCHED_MESSAGE;}// release...selectResult.release();continue;}this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();getResult.addMessage(selectResult);status GetMessageStatus.FOUND;nextPhyFileStartOffset Long.MIN_VALUE;}if (diskFallRecorded) {long fallBehind maxOffsetPy - maxPhyOffsetPulling;brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);}nextBeginOffset offset (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long diff maxOffsetPy - maxPhyOffsetPulling;long memory (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));getResult.setSuggestPullingFromSlave(diff memory);} finally {bufferConsumeQueue.release();}} else {status GetMessageStatus.OFFSET_FOUND_NULL;nextBeginOffset nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));log.warn(consumer request topic: topic offset: offset minOffset: minOffset maxOffset: maxOffset , but access logic queue failed.);}}} else {status GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;nextBeginOffset nextOffsetCorrection(offset, 0);}if (GetMessageStatus.FOUND status) {this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();} else {this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();}long eclipseTime this.getSystemClock().now() - beginTime;this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);getResult.setStatus(status);getResult.setNextBeginOffset(nextBeginOffset);getResult.setMaxOffset(maxOffset);getResult.setMinOffset(minOffset);return getResult;
}上面的方法先进行了非常多的校验，然后根据消费分组(group)、主题(topic)、队列编号(queueId)、队列位置(offset)、消息过滤器(messageFilter，内部封装了订阅信息subscriptionData)，获取指定条数(maxMsgNums)的消息(Message)。
其中，在判断消息是否符合条件时，调用了方法DefaultMessageFilter#isMatchedByConsumeQueue(…)：
public class DefaultMessageFilter implements MessageFilter {private SubscriptionData subscriptionData;public DefaultMessageFilter(final SubscriptionData subscriptionData) {this.subscriptionData subscriptionData;}Overridepublic boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {// 消息tagsCode 空if (null tagsCode || null subscriptionData) {return true;}// 订阅数据 空if (subscriptionData.isClassFilterMode()) {return true;}// 订阅表达式 全匹配// 订阅数据code数组 是否包含 消息tagsCodereturn subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)|| subscriptionData.getCodeSet().contains(tagsCode.intValue());}Overridepublic boolean isMatchedByCommitLog(ByteBuffer msgBuffer, MapString, String properties) {return true;}
}3.3 拉取失败处理
当拉取消息请求获取不到消息时，会将请求挂起。处理挂起请求的服务类是PullRequestHoldService：
public class PullRequestHoldService extends ServiceThread {private static final InternalLogger log InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);private static final String TOPIC_QUEUEID_SEPARATOR ;private final BrokerController brokerController;private final SystemClock systemClock new SystemClock();// 拉取消息请求集合private ConcurrentMapString/* topicqueueId */, ManyPullRequest pullRequestTable new ConcurrentHashMapString, ManyPullRequest(1024);public PullRequestHoldService(final BrokerController brokerController) {this.brokerController brokerController;}// 添加拉取消息挂起请求public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {String key this.buildKey(topic, queueId);ManyPullRequest mpr this.pullRequestTable.get(key);if (null mpr) {mpr new ManyPullRequest();ManyPullRequest prev this.pullRequestTable.putIfAbsent(key, mpr);if (prev ! null) {mpr prev;}}mpr.addPullRequest(pullRequest);}// 根据 主题 队列编号 创建唯一标识private String buildKey(final String topic, final int queueId) {StringBuilder sb new StringBuilder();sb.append(topic);sb.append(TOPIC_QUEUEID_SEPARATOR);sb.append(queueId);return sb.toString();}Overridepublic void run() {log.info({} service started, this.getServiceName());while (!this.isStopped()) {try {// 根据 长轮训 还是 短轮训 设置不同的等待时间if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);} else {this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp this.systemClock.now();// 检查挂起请求是否有需要通知的this.checkHoldRequest();long costTime this.systemClock.now() - beginLockTimestamp;if (costTime 5 * 1000) {log.info([NOTIFYME] check hold request cost {} ms., costTime);}} catch (Throwable e) {log.warn(this.getServiceName() service has exception. 
, e);}}log.info({} service end, this.getServiceName());}Overridepublic String getServiceName() {return PullRequestHoldService.class.getSimpleName();}//遍历挂起请求检查是否有需要通知的请求。private void checkHoldRequest() {for (String key : this.pullRequestTable.keySet()) {String[] kArray key.split(TOPIC_QUEUEID_SEPARATOR);if (2 kArray.length) {String topic kArray[0];int queueId Integer.parseInt(kArray[1]);final long offset this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);try {this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error(check hold request failed. topic{}, queueId{}, topic, queueId, e);}}}}// 检查是否有需要通知的请求public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);}// 检查是否有需要通知的请求public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, MapString, String properties) {String key this.buildKey(topic, queueId);ManyPullRequest mpr this.pullRequestTable.get(key);if (mpr ! null) {ListPullRequest requestList mpr.cloneListAndClear();if (requestList ! null) {// 不符合唤醒的请求数组ListPullRequest replayList new ArrayListPullRequest();for (PullRequest request : requestList) {long newestOffset maxOffset;if (newestOffset request.getPullFromThisOffset()) {newestOffset this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);}// 有新的匹配消息唤醒请求即再次拉取消息。if (newestOffset request.getPullFromThisOffset()) {boolean match request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));// match by bit map, need eval again when properties is not null.if (match properties ! 
null) {match request.getMessageFilter().isMatchedByCommitLog(null, properties);}if (match) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error(execute request when wakeup failed., e);}continue;}}// 超过挂起时间唤醒请求即再次拉取消息。if (System.currentTimeMillis() (request.getSuspendTimestamp() request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error(execute request when wakeup failed., e);}continue;}// 不符合再次拉取的请求再次添加回去replayList.add(request);}if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}}
}可以看到，此类是负责维护被挂起的拉取消息请求的线程服务。
当拉取消息请求获取不到消息时，则会将请求挂起并添加到该服务；当有符合条件的新消息到达或者挂起超时时，重新执行拉取消息逻辑。 4. Broker提供的更新消费进度接口
更新消费进度接口对应的定时持久化任务会在BrokerController中启动，启动方法为BrokerController#initialize(…)：
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error(schedule persist consumerOffset error., e);}}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);该定时任务在启动10s后开始执行，之后每隔flushConsumerOffsetInterval（默认为5s）执行一次持久化逻辑。它调用的是ConsumerOffsetManager#persist方法，实际上是抽象类ConfigManager的persist方法：ConfigManager是抽象类，ConsumerOffsetManager继承了ConfigManager。
ConfigManager类源码为
public abstract class ConfigManager {private static final InternalLogger log InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);//编码内容public abstract String encode();// 加载文件public boolean load() {String fileName null;try {fileName this.configFilePath();String jsonString MixAll.file2String(fileName);// 如果内容不存在则加载备份文件if (null jsonString || jsonString.length() 0) {return this.loadBak();} else {this.decode(jsonString);log.info(load fileName OK);return true;}} catch (Exception e) {log.error(load fileName failed, and try to load backup file, e);return this.loadBak();}}//配置文件地址public abstract String configFilePath();//加载备份文件private boolean loadBak() {String fileName null;try {fileName this.configFilePath();String jsonString MixAll.file2String(fileName .bak);if (jsonString ! null jsonString.length() 0) {this.decode(jsonString);log.info(load fileName OK);return true;}} catch (Exception e) {log.error(load fileName Failed, e);return false;}return true;}//解码内容public abstract void decode(final String jsonString);//持久化public synchronized void persist() {String jsonString this.encode(true);if (jsonString ! null) {String fileName this.configFilePath();try {MixAll.string2File(jsonString, fileName);} catch (IOException e) {log.error(persist file fileName exception, e);}}}// 编码存储内容public abstract String encode(final boolean prettyFormat);
}可以看到，ConfigManager主要进行文件的读取和写入操作。ConsumerOffsetManager类源码为：
public class ConsumerOffsetManager extends ConfigManager {private static final InternalLogger log InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);private static final String TOPIC_GROUP_SEPARATOR ;// 消费进度集合private ConcurrentMapString/* topicgroup */, ConcurrentMapInteger, Long offsetTable new ConcurrentHashMapString, ConcurrentMapInteger, Long(512);private transient BrokerController brokerController;public ConsumerOffsetManager() {}public ConsumerOffsetManager(BrokerController brokerController) {this.brokerController brokerController;}public void scanUnsubscribedTopic() {IteratorEntryString, ConcurrentMapInteger, Long it this.offsetTable.entrySet().iterator();while (it.hasNext()) {EntryString, ConcurrentMapInteger, Long next it.next();String topicAtGroup next.getKey();String[] arrays topicAtGroup.split(TOPIC_GROUP_SEPARATOR);if (arrays.length 2) {String topic arrays[0];String group arrays[1];if (null brokerController.getConsumerManager().findSubscriptionData(group, topic) this.offsetBehindMuchThanData(topic, next.getValue())) {it.remove();log.warn(remove topic offset, {}, topicAtGroup);}}}}private boolean offsetBehindMuchThanData(final String topic, ConcurrentMapInteger, Long table) {IteratorEntryInteger, Long it table.entrySet().iterator();boolean result !table.isEmpty();while (it.hasNext() result) {EntryInteger, Long next it.next();long minOffsetInStore this.brokerController.getMessageStore().getMinOffsetInQueue(topic, next.getKey());long offsetInPersist next.getValue();result offsetInPersist minOffsetInStore;}return result;}public SetString whichTopicByConsumer(final String group) {SetString topics new HashSetString();IteratorEntryString, ConcurrentMapInteger, Long it this.offsetTable.entrySet().iterator();while (it.hasNext()) {EntryString, ConcurrentMapInteger, Long next it.next();String topicAtGroup next.getKey();String[] arrays topicAtGroup.split(TOPIC_GROUP_SEPARATOR);if (arrays.length 2) {if (group.equals(arrays[1])) 
{topics.add(arrays[0]);}}}return topics;}public SetString whichGroupByTopic(final String topic) {SetString groups new HashSetString();IteratorEntryString, ConcurrentMapInteger, Long it this.offsetTable.entrySet().iterator();while (it.hasNext()) {EntryString, ConcurrentMapInteger, Long next it.next();String topicAtGroup next.getKey();String[] arrays topicAtGroup.split(TOPIC_GROUP_SEPARATOR);if (arrays.length 2) {if (topic.equals(arrays[0])) {groups.add(arrays[1]);}}}return groups;}// 提交消费进度public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,final long offset) {// topicgroupString key topic TOPIC_GROUP_SEPARATOR group;this.commitOffset(clientHost, key, queueId, offset);}private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {ConcurrentMapInteger, Long map this.offsetTable.get(key);if (null map) {map new ConcurrentHashMapInteger, Long(32);map.put(queueId, offset);this.offsetTable.put(key, map);} else {Long storeOffset map.put(queueId, offset);if (storeOffset ! null offset storeOffset) {log.warn([NOTIFYME]update consumer offset less than store. clientHost{}, key{}, queueId{}, requestOffset{}, storeOffset{}, clientHost, key, queueId, offset, storeOffset);}}}public long queryOffset(final String group, final String topic, final int queueId) {// topicgroupString key topic TOPIC_GROUP_SEPARATOR group;ConcurrentMapInteger, Long map this.offsetTable.get(key);if (null ! map) {Long offset map.get(queueId);if (offset ! null)return offset;}return -1;}public String encode() {return this.encode(false);}Overridepublic String configFilePath() {return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}// 解码内容格式:JSONOverridepublic void decode(String jsonString) {if (jsonString ! null) {ConsumerOffsetManager obj RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);if (obj ! 
null) {this.offsetTable obj.offsetTable;}}}public String encode(final boolean prettyFormat) {return RemotingSerializable.toJson(this, prettyFormat);}public ConcurrentMapString, ConcurrentMapInteger, Long getOffsetTable() {return offsetTable;}public void setOffsetTable(ConcurrentHashMapString, ConcurrentMapInteger, Long offsetTable) {this.offsetTable offsetTable;}public MapInteger, Long queryMinOffsetInAllGroup(final String topic, final String filterGroups) {MapInteger, Long queueMinOffset new HashMapInteger, Long();SetString topicGroups this.offsetTable.keySet();if (!UtilAll.isBlank(filterGroups)) {for (String group : filterGroups.split(,)) {IteratorString it topicGroups.iterator();while (it.hasNext()) {if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {it.remove();}}}}for (Map.EntryString, ConcurrentMapInteger, Long offSetEntry : this.offsetTable.entrySet()) {String topicGroup offSetEntry.getKey();String[] topicGroupArr topicGroup.split(TOPIC_GROUP_SEPARATOR);if (topic.equals(topicGroupArr[0])) {for (EntryInteger, Long entry : offSetEntry.getValue().entrySet()) {long minOffset this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());if (entry.getValue() minOffset) {Long offset queueMinOffset.get(entry.getKey());if (offset null) {queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));} else {queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));}}}}}return queueMinOffset;}public MapInteger, Long queryOffset(final String group, final String topic) {// topicgroupString key topic TOPIC_GROUP_SEPARATOR group;return this.offsetTable.get(key);}public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {ConcurrentMapInteger, Long offsets this.offsetTable.get(topic TOPIC_GROUP_SEPARATOR srcGroup);if (offsets ! null) {this.offsetTable.put(topic TOPIC_GROUP_SEPARATOR destGroup, new ConcurrentHashMapInteger, Long(offsets));}}}可以看出来实际上上面的类就是消费进度管理器。
5. Broker 提供发回消息接口
该接口是Consumer消费消息失败时调用的。Broker会存储发回的消息，这样下次Consumer拉取该消息时，能够从CommitLog和ConsumeQueue顺序读取。
大部分逻辑和“Broker提供接收消息接口”类似，此内容在第二篇《Broker消息接收》分析过。
发回消息接口的方法为SendMessageProcessor#consumerSendMsgBack(…)
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)throws RemotingCommandException {// 初始化响应final RemotingCommand response RemotingCommand.createResponseCommand(null);final ConsumerSendMsgBackRequestHeader requestHeader (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);String namespace NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());if (this.hasConsumeMessageHook() !UtilAll.isBlank(requestHeader.getOriginMsgId())) {ConsumeMessageContext context new ConsumeMessageContext();context.setNamespace(namespace);context.setConsumerGroup(requestHeader.getGroup());context.setTopic(requestHeader.getOriginTopic());context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);context.setCommercialRcvTimes(1);context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));this.executeConsumeMessageHookAfter(context);}// 判断消费分组是否存在SubscriptionGroupConfig subscriptionGroupConfig this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());if (null subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);response.setRemark(subscription group not exist, requestHeader.getGroup() FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));return response;}// 检查 broker 是否有写入权限if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(the broker[ this.brokerController.getBrokerConfig().getBrokerIP1() ] sending message is forbidden);return response;}// 检查 重试队列数 是否大于0if (subscriptionGroupConfig.getRetryQueueNums() 0) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 计算retry TopicString newTopic MixAll.getRetryTopic(requestHeader.getGroup());// 计算队列编号int queueIdInt Math.abs(this.random.nextInt() % 99999999) % 
subscriptionGroupConfig.getRetryQueueNums();// 计算sysFlagint topicSysFlag 0;if (requestHeader.isUnitMode()) {topicSysFlag TopicSysFlag.buildSysFlag(false, true);}// 获取topicConfig。如果获取不到则进行创建TopicConfig topicConfig this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);if (null topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(topic[ newTopic ] not exist);return response;}if (!PermName.isWriteable(topicConfig.getPerm())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(String.format(the topic[%s] sending message is forbidden, newTopic));return response;}// 查询消息。若不存在返回异常错误。MessageExt msgExt this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());if (null msgExt) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(look message by offset failed, requestHeader.getOffset());return response;}// 设置retryTopic到拓展属性final String retryTopic msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (null retryTopic) {MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());}// 设置消息不等待存储完成msgExt.setWaitStoreMsgOK(false);int delayLevel requestHeader.getDelayLevel();int maxReconsumeTimes subscriptionGroupConfig.getRetryMaxTimes();if (request.getVersion() MQVersion.Version.V3_4_9.ordinal()) {maxReconsumeTimes requestHeader.getMaxReconsumeTimes();}if (msgExt.getReconsumeTimes() maxReconsumeTimes|| delayLevel 0) {// 如果超过最大消费次数则topic修改成%DLQ% 分组名即加入 死信队列(Dead Letter Queue)newTopic MixAll.getDLQTopic(requestHeader.getGroup());queueIdInt Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;topicConfig this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE, 0);if (null topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(topic[ 
newTopic ] not exist);return response;}} else {if (0 delayLevel) {delayLevel 3 msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);}MessageExtBrokerInner msgInner new MessageExtBrokerInner();msgInner.setTopic(newTopic);msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));msgInner.setQueueId(queueIdInt);msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() 1);String originMsgId MessageAccessor.getOriginMessageId(msgExt);MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);PutMessageResult putMessageResult this.brokerController.getMessageStore().putMessage(msgInner);if (putMessageResult ! null) {switch (putMessageResult.getPutMessageStatus()) {case PUT_OK:String backTopic msgExt.getTopic();String correctTopic msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if (correctTopic ! null) {backTopic correctTopic;}this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;default:break;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(putMessageResult.getPutMessageStatus().name());return response;}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(putMessageResult is null);return response;
}6. 小结
本篇源码偏长、内容偏多，有很多细节选择性忽略了。本篇主要讲的是Broker对于消息拉取所提供的功能和接口：
（1）Message重放功能：Broker接收到消息后会存储到CommitLog中并持久化到文件；Message重放就是将文件中消息的位置信息存放到ConsumeQueue中，即ConsumeQueue中存储了消息在CommitLog中的位置信息。（2）Broker提供了拉取消息的接口：该接口由Consumer调用，主要完成消息的拉取。（3）Broker提供了更新消费进度的接口：该接口也由Consumer调用，用于更新消费进度。（4）Broker提供了消息发回接口：该接口是Consumer在消费消息失败时调用的，以便消息能够再次被消费；其中有一点需要注意，消息消费超过最大次数后会被放入死信队列中。
以上就是消息拉取过程中Broker提供的能力。下一篇继续分析消息拉取与消费过程中Consumer提供的能力。