网站建设所需要软件,wordpress写文章报错,广告投放面试,wordpress 详情页这里是weihubeats,觉得文章不错可以关注公众号小奏技术#xff0c;文章首发。拒绝营销号#xff0c;拒绝标题党 RocketMQ版本
5.1.0
普通消息
消息重试的的实现分并普通消息和顺序消息。两者的重试机制大同小异。我们这里先看看不同消息 这里是官网定义的消息重试次数以及… 这里是weihubeats,觉得文章不错可以关注公众号小奏技术文章首发。拒绝营销号拒绝标题党 RocketMQ版本
5.1.0
普通消息
消息重试的的实现分并普通消息和顺序消息。两者的重试机制大同小异。我们这里先看看不同消息 这里是官网定义的消息重试次数以及时间间隔。
有没有发现一个问题这个重试消息的时间间隔和延时消息的时间间隔这么类似 图片来源官网 待会我们进行源码分析就知道了。这里先卖个关子
client源码分析
ConsumeMessageConcurrentlyService
普通消息消费以及重试的逻辑在ConsumeMessageConcurrentlyService中如果是顺序消息则是ConsumeMessageOrderlyService
具体的逻辑是org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
由于今天我们的重点关注点是消息如何重试的。所以我们在消息消费的一些其他细节会略过
首先消息消费的状态由ConsumeConcurrentlyStatus 这个枚举控制 注释很清晰
CONSUME_SUCCESS 成功消费RECONSUME_LATER 消费时间等待重试 可以看到消息消费主要是在
status listener.consumeMessage(Collections.unmodifiableList(msgs), context);这里的listener就是实现了MessageListener接口的类。也就是我们在编写consumer需要传入的一个对象
比如像这种 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(CONSUMER_GROUP, true);consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TOPIC, *);consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) - {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf(Consumer Started.%n);需要注意的是虽然消费状态只有成功和失败。但是消费结果却是由ConsumeReturnType这个枚举类定义的
SUCCESS 成功TIME_OUT 超时EXCEPTION 异常RETURNNULL 消费结果返回nullFAILED 消费结果返回失败
可以看到这里把消费成功还是失败再次作了细分 if (null status) {if (hasException) {returnType ConsumeReturnType.EXCEPTION;} else {returnType ConsumeReturnType.RETURNNULL;}} else if (consumeRT defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER status) {returnType ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS status) {returnType ConsumeReturnType.SUCCESS;}我们继续回归主线看看如果消费状态为RECONSUME_LATER会作什么处理 if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn(processQueue is dropped without process consume result. messageQueue{}, msgs{}, messageQueue, msgs);}这里可以看到processConsumeResult方法对消息结果进行了处理。我们来看看这个方法
processConsumeResult switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i) {MessageExt msg consumeRequest.getMsgs().get(i);log.warn(BROADCASTING, the message consume failed, drop it, {}, msg.toString());}break;case CLUSTERING:ListMessageExt msgBackFailed new ArrayList(consumeRequest.getMsgs().size());for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i) {MessageExt msg consumeRequest.getMsgs().get(i);// Maybe message is expired and cleaned, just ignore it.if (!consumeRequest.getProcessQueue().containsMessage(msg)) {log.info(Message is not found in its process queue; skip send-back-procedure, topic{}, brokerName{}, queueId{}, queueOffset{}, msg.getTopic(), msg.getBrokerName(),msg.getQueueId(), msg.getQueueOffset());continue;}boolean result this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}
这里可以看到如果是BROADCASTING(广播消息)则只是打印了warnlog什么处理也不会干
我们来看看集群消息
注意这里的循环条件
for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i)如果所有消息消费成功则 ackIndex consumeRequest.getMsgs().不会有消息进行下面的逻辑处理。如果有消息消费失败才会进行下面的处理
看看下面的处理逻辑 boolean result this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() 1);msgBackFailed.add(msg);}发送消费失败消息给broker如果消息发送给broker失败则将消息丢到msgBackFailed。然后再client自己进行消息重新消费重试次数reconsumeTimes 1
我们来看看sendMessageBack的处理逻辑
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {int delayLevel context.getDelayLevelWhenNextConsume();// Wrap topic with namespace before sending back message.msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));return true;} catch (Exception e) {log.error(sendMessageBack exception, group: this.consumerGroup msg: msg, e);}return false;}这里有实际delayLevel延时级别一直是0。
实际发送消息到broker是
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));我们看看sendMessageBack的具体逻辑
private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {boolean needRetry true;try {if (brokerName ! null brokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)|| mq ! null mq.getBrokerName().startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {needRetry false;sendMessageBackAsNormalMessage(msg);} else {String brokerAddr (null ! brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());}} catch (Throwable t) {log.error(Failed to send message back, consumerGroup{}, brokerName{}, mq{}, message{},this.defaultMQPushConsumer.getConsumerGroup(), brokerName, mq, msg, t);if (needRetry) {sendMessageBackAsNormalMessage(msg);}} finally {msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}}这里实际走的消息发送逻辑consumerSendMessageBack。
consumerSendMessageBack就是简单的消息发送没有复杂的逻辑 我们通过RequestCode.CONSUMER_SEND_MSG_BACK 找到broker的处理逻辑
broker源码分析
处理RequestCode.CONSUMER_SEND_MSG_BACK请求的主要是SendMessageProcessor 我们进入到 org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack方法看看
这里就有延时消息的处理。我们在上面一直说过延时消息的等级一直是0.我们看看是如何处理的 如果重试次数没有超过最大重试次数。并且延时消息等级为0.
则延时消息等级为 重试次数3
这里就清楚知道了为什么消息重试的时间和延时消息是如何相似了吧。
没错 消息重试就是用延时消息实现的 哈哈
然后最大重试次数在哪获取的呢答案就是SubscriptionGroupConfig
也就是订阅信息的元数据 元数据再broker的文件就是subscriptionGroup.json
格式就是如下 GID_xiaozoujishu:{brokerId:0,consumeBroadcastEnable:true,consumeEnable:true,consumeFromMinEnable:true,consumeMessageOrderly:false,consumeTimeoutMinute:15,groupName:GID_xiaozoujishu,groupRetryPolicy:{type:CUSTOMIZED},groupSysFlag:0,notifyConsumerIdsChangedEnable:true,retryMaxTimes:16,retryQueueNums:1,whichBrokerWhenConsumeSlowly:1}这里可以看到一个细节就是重试策略是CUSTOMIZED(CustomizedRetryPolicy) 也就是我们对应的重试时间
还有一个策略就是ExponentialRetryPolicy也就是自定义重试次数
不过最大也就只能指定32次 还有一个细节就是重试次数在哪增加的实际也是在org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack这个方法 总结
普通消息重试默认是16次顺序消息默认是Integer.MAX_VALUE消息消费失败会以延时消息的方式投递到broker超过延时消息的最大值则以2小时为重试间隔。现在提供了不同的重试策略默认是CustomizedRetryPolicy
如果broker投递失败则会在本地再次消费该消息.重试次数的元数据存储在subscriptionGroup.json.
重试次数的增加有两种
发送到broker后在broker进行reconsumeTimes1(broker重试)如果发送broker失败则在本地消费进行reconsumeTimes1(client重试)