当前位置: 首页 > news >正文

网站空间域名购买网站托管服务提供商

网站空间域名购买,网站托管服务提供商,wordpress末班,1核做网站这里是weihubeats,觉得文章不错可以关注公众号小奏技术#xff0c;文章首发。拒绝营销号#xff0c;拒绝标题党 RocketMQ版本 5.1.0 普通消息 消息重试的的实现分并普通消息和顺序消息。两者的重试机制大同小异。我们这里先看看不同消息 这里是官网定义的消息重试次数以及… 这里是weihubeats,觉得文章不错可以关注公众号小奏技术文章首发。拒绝营销号拒绝标题党 RocketMQ版本 5.1.0 普通消息 消息重试的的实现分并普通消息和顺序消息。两者的重试机制大同小异。我们这里先看看不同消息 这里是官网定义的消息重试次数以及时间间隔。 有没有发现一个问题这个重试消息的时间间隔和延时消息的时间间隔这么类似 图片来源官网 待会我们进行源码分析就知道了。这里先卖个关子 client源码分析 ConsumeMessageConcurrentlyService 普通消息消费以及重试的逻辑在ConsumeMessageConcurrentlyService中如果是顺序消息则是ConsumeMessageOrderlyService 具体的逻辑是org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run 由于今天我们的重点关注点是消息如何重试的。所以我们在消息消费的一些其他细节会略过 首先消息消费的状态由ConsumeConcurrentlyStatus 这个枚举控制 注释很清晰 CONSUME_SUCCESS 成功消费RECONSUME_LATER 消费时间等待重试 可以看到消息消费主要是在 status listener.consumeMessage(Collections.unmodifiableList(msgs), context);这里的listener就是实现了MessageListener接口的类。也就是我们在编写consumer需要传入的一个对象 比如像这种 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(CONSUMER_GROUP, true);consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TOPIC, *);consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) - {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf(Consumer Started.%n);需要注意的是虽然消费状态只有成功和失败。但是消费结果却是由ConsumeReturnType这个枚举类定义的 SUCCESS 成功TIME_OUT 超时EXCEPTION 异常RETURNNULL 消费结果返回nullFAILED 消费结果返回失败 可以看到这里把消费成功还是失败再次作了细分 if (null status) {if (hasException) {returnType ConsumeReturnType.EXCEPTION;} else {returnType ConsumeReturnType.RETURNNULL;}} else if (consumeRT defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER status) {returnType ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS status) {returnType ConsumeReturnType.SUCCESS;}我们继续回归主线看看如果消费状态为RECONSUME_LATER会作什么处理 if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn(processQueue is dropped without process consume result. messageQueue{}, msgs{}, messageQueue, msgs);}这里可以看到processConsumeResult方法对消息结果进行了处理。我们来看看这个方法 processConsumeResult switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i) {MessageExt msg consumeRequest.getMsgs().get(i);log.warn(BROADCASTING, the message consume failed, drop it, {}, msg.toString());}break;case CLUSTERING:ListMessageExt msgBackFailed new ArrayList(consumeRequest.getMsgs().size());for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i) {MessageExt msg consumeRequest.getMsgs().get(i);// Maybe message is expired and cleaned, just ignore it.if (!consumeRequest.getProcessQueue().containsMessage(msg)) {log.info(Message is not found in its process queue; skip send-back-procedure, topic{}, brokerName{}, queueId{}, queueOffset{}, msg.getTopic(), msg.getBrokerName(),msg.getQueueId(), msg.getQueueOffset());continue;}boolean result this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;} 这里可以看到如果是BROADCASTING(广播消息)则只是打印了warnlog什么处理也不会干 我们来看看集群消息 注意这里的循环条件 for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i)如果所有消息消费成功则 ackIndex consumeRequest.getMsgs().不会有消息进行下面的逻辑处理。如果有消息消费失败才会进行下面的处理 看看下面的处理逻辑 boolean result this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() 1);msgBackFailed.add(msg);}发送消费失败消息给broker如果消息发送给broker失败则将消息丢到msgBackFailed。然后再client自己进行消息重新消费重试次数reconsumeTimes 1 我们来看看sendMessageBack的处理逻辑 public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {int delayLevel context.getDelayLevelWhenNextConsume();// Wrap topic with namespace before sending back message.msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));return true;} catch (Exception e) {log.error(sendMessageBack exception, group: this.consumerGroup msg: msg, e);}return false;}这里有实际delayLevel延时级别一直是0。 实际发送消息到broker是 this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));我们看看sendMessageBack的具体逻辑 private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {boolean needRetry true;try {if (brokerName ! null brokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)|| mq ! null mq.getBrokerName().startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {needRetry false;sendMessageBackAsNormalMessage(msg);} else {String brokerAddr (null ! brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());}} catch (Throwable t) {log.error(Failed to send message back, consumerGroup{}, brokerName{}, mq{}, message{},this.defaultMQPushConsumer.getConsumerGroup(), brokerName, mq, msg, t);if (needRetry) {sendMessageBackAsNormalMessage(msg);}} finally {msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}}这里实际走的消息发送逻辑consumerSendMessageBack。 consumerSendMessageBack就是简单的消息发送没有复杂的逻辑 我们通过RequestCode.CONSUMER_SEND_MSG_BACK 找到broker的处理逻辑 broker源码分析 处理RequestCode.CONSUMER_SEND_MSG_BACK请求的主要是SendMessageProcessor 我们进入到 org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack方法看看 这里就有延时消息的处理。我们在上面一直说过延时消息的等级一直是0.我们看看是如何处理的 如果重试次数没有超过最大重试次数。并且延时消息等级为0. 则延时消息等级为 重试次数3 这里就清楚知道了为什么消息重试的时间和延时消息是如何相似了吧。 没错 消息重试就是用延时消息实现的 哈哈 然后最大重试次数在哪获取的呢答案就是SubscriptionGroupConfig 也就是订阅信息的元数据 元数据再broker的文件就是subscriptionGroup.json 格式就是如下 GID_xiaozoujishu:{brokerId:0,consumeBroadcastEnable:true,consumeEnable:true,consumeFromMinEnable:true,consumeMessageOrderly:false,consumeTimeoutMinute:15,groupName:GID_xiaozoujishu,groupRetryPolicy:{type:CUSTOMIZED},groupSysFlag:0,notifyConsumerIdsChangedEnable:true,retryMaxTimes:16,retryQueueNums:1,whichBrokerWhenConsumeSlowly:1}这里可以看到一个细节就是重试策略是CUSTOMIZED(CustomizedRetryPolicy) 也就是我们对应的重试时间 还有一个策略就是ExponentialRetryPolicy也就是自定义重试次数 不过最大也就只能指定32次 还有一个细节就是重试次数在哪增加的实际也是在org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack这个方法 总结 普通消息重试默认是16次顺序消息默认是Integer.MAX_VALUE消息消费失败会以延时消息的方式投递到broker超过延时消息的最大值则以2小时为重试间隔。现在提供了不同的重试策略默认是CustomizedRetryPolicy 如果broker投递失败则会在本地再次消费该消息.重试次数的元数据存储在subscriptionGroup.json. 重试次数的增加有两种 发送到broker后在broker进行reconsumeTimes1(broker重试)如果发送broker失败则在本地消费进行reconsumeTimes1(client重试)
http://www.zqtcl.cn/news/429111/

相关文章:

  • 做网站必须得ipc手机网站制作方法
  • 山东省建设监理协会网站打不开移动互联网开发实践
  • 南宁微网站制作需要多少钱小米商城网站开发文档
  • 制作销售网站有哪些如何制作个人网页设计
  • 新网站做内链智能网站推广软件
  • 西宁市住房和城乡建设局网站广州站是指哪个站
  • 帮建网站网页设计师考试内容
  • seo网站开发txt 发布 wordpress
  • 资讯门户类网站模板定制系统开发公司
  • 让网站快速收录初中毕业如何提升学历
  • 石家庄做网站价格seo优化效果
  • 为什么浏览器打开是2345网址导航seo免费资源大全
  • 网站工程是干啥的动态个人网页制作html教程
  • 阿里云多网站建设wordpress 统计分析
  • 长沙网站定制公司科技特长生
  • 查公司的口碑和评价的网站中学生怎么做网站
  • 做网站买空间多少钱深圳seo优化公司
  • 中国建设银行北京市互联网网站wordpress商城购物表单
  • 万网网站备案管理查询工程建设项目的网站
  • 网站建设国内外研究现状模板ppt设计大赛
  • 专业网站优化方案网站设计过程怎么写
  • 福州定制网站建设网站ip过万
  • wordpress网站评论插件厦门软件网站建设
  • 网站黄金比例wordpress转typecho
  • 重庆有哪些网络公司百度系优化
  • 无锡网站制作方案企业三合一建站公司怎么找
  • 钉钉crm客户管理系统免费seo网站推荐一下软件
  • wordpress公司网站模版怎么显示wordpress里元素的源代码
  • 泉州网站制作运营商专业wordpress评论软件
  • 网站开发是什么意思啊有没有帮人做简历的网站