当前位置: 首页 > news >正文

高性能网站建设指南在线阅读互联网行业发展前景分析报告

高性能网站建设指南在线阅读,互联网行业发展前景分析报告,葫芦岛建设网站,网站建设如何做报价这里是weihubeats,觉得文章不错可以关注公众号小奏技术#xff0c;文章首发。拒绝营销号#xff0c;拒绝标题党 RocketMQ版本 5.1.0 普通消息 消息重试的的实现分并普通消息和顺序消息。两者的重试机制大同小异。我们这里先看看不同消息 这里是官网定义的消息重试次数以及… 这里是weihubeats,觉得文章不错可以关注公众号小奏技术文章首发。拒绝营销号拒绝标题党 RocketMQ版本 5.1.0 普通消息 消息重试的的实现分并普通消息和顺序消息。两者的重试机制大同小异。我们这里先看看不同消息 这里是官网定义的消息重试次数以及时间间隔。 有没有发现一个问题这个重试消息的时间间隔和延时消息的时间间隔这么类似 图片来源官网 待会我们进行源码分析就知道了。这里先卖个关子 client源码分析 ConsumeMessageConcurrentlyService 普通消息消费以及重试的逻辑在ConsumeMessageConcurrentlyService中如果是顺序消息则是ConsumeMessageOrderlyService 具体的逻辑是org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run 由于今天我们的重点关注点是消息如何重试的。所以我们在消息消费的一些其他细节会略过 首先消息消费的状态由ConsumeConcurrentlyStatus 这个枚举控制 注释很清晰 CONSUME_SUCCESS 成功消费RECONSUME_LATER 消费时间等待重试 可以看到消息消费主要是在 status listener.consumeMessage(Collections.unmodifiableList(msgs), context);这里的listener就是实现了MessageListener接口的类。也就是我们在编写consumer需要传入的一个对象 比如像这种 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(CONSUMER_GROUP, true);consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TOPIC, *);consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) - {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf(Consumer Started.%n);需要注意的是虽然消费状态只有成功和失败。但是消费结果却是由ConsumeReturnType这个枚举类定义的 SUCCESS 成功TIME_OUT 超时EXCEPTION 异常RETURNNULL 消费结果返回nullFAILED 消费结果返回失败 可以看到这里把消费成功还是失败再次作了细分 if (null status) {if (hasException) {returnType ConsumeReturnType.EXCEPTION;} else {returnType ConsumeReturnType.RETURNNULL;}} else if (consumeRT defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER status) {returnType ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS status) {returnType ConsumeReturnType.SUCCESS;}我们继续回归主线看看如果消费状态为RECONSUME_LATER会作什么处理 if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn(processQueue is dropped without process consume result. messageQueue{}, msgs{}, messageQueue, msgs);}这里可以看到processConsumeResult方法对消息结果进行了处理。我们来看看这个方法 processConsumeResult switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i) {MessageExt msg consumeRequest.getMsgs().get(i);log.warn(BROADCASTING, the message consume failed, drop it, {}, msg.toString());}break;case CLUSTERING:ListMessageExt msgBackFailed new ArrayList(consumeRequest.getMsgs().size());for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i) {MessageExt msg consumeRequest.getMsgs().get(i);// Maybe message is expired and cleaned, just ignore it.if (!consumeRequest.getProcessQueue().containsMessage(msg)) {log.info(Message is not found in its process queue; skip send-back-procedure, topic{}, brokerName{}, queueId{}, queueOffset{}, msg.getTopic(), msg.getBrokerName(),msg.getQueueId(), msg.getQueueOffset());continue;}boolean result this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;} 这里可以看到如果是BROADCASTING(广播消息)则只是打印了warnlog什么处理也不会干 我们来看看集群消息 注意这里的循环条件 for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i)如果所有消息消费成功则 ackIndex consumeRequest.getMsgs().不会有消息进行下面的逻辑处理。如果有消息消费失败才会进行下面的处理 看看下面的处理逻辑 boolean result this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() 1);msgBackFailed.add(msg);}发送消费失败消息给broker如果消息发送给broker失败则将消息丢到msgBackFailed。然后再client自己进行消息重新消费重试次数reconsumeTimes 1 我们来看看sendMessageBack的处理逻辑 public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {int delayLevel context.getDelayLevelWhenNextConsume();// Wrap topic with namespace before sending back message.msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));return true;} catch (Exception e) {log.error(sendMessageBack exception, group: this.consumerGroup msg: msg, e);}return false;}这里有实际delayLevel延时级别一直是0。 实际发送消息到broker是 this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));我们看看sendMessageBack的具体逻辑 private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {boolean needRetry true;try {if (brokerName ! null brokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)|| mq ! null mq.getBrokerName().startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {needRetry false;sendMessageBackAsNormalMessage(msg);} else {String brokerAddr (null ! brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());}} catch (Throwable t) {log.error(Failed to send message back, consumerGroup{}, brokerName{}, mq{}, message{},this.defaultMQPushConsumer.getConsumerGroup(), brokerName, mq, msg, t);if (needRetry) {sendMessageBackAsNormalMessage(msg);}} finally {msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}}这里实际走的消息发送逻辑consumerSendMessageBack。 consumerSendMessageBack就是简单的消息发送没有复杂的逻辑 我们通过RequestCode.CONSUMER_SEND_MSG_BACK 找到broker的处理逻辑 broker源码分析 处理RequestCode.CONSUMER_SEND_MSG_BACK请求的主要是SendMessageProcessor 我们进入到 org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack方法看看 这里就有延时消息的处理。我们在上面一直说过延时消息的等级一直是0.我们看看是如何处理的 如果重试次数没有超过最大重试次数。并且延时消息等级为0. 则延时消息等级为 重试次数3 这里就清楚知道了为什么消息重试的时间和延时消息是如何相似了吧。 没错 消息重试就是用延时消息实现的 哈哈 然后最大重试次数在哪获取的呢答案就是SubscriptionGroupConfig 也就是订阅信息的元数据 元数据再broker的文件就是subscriptionGroup.json 格式就是如下 GID_xiaozoujishu:{brokerId:0,consumeBroadcastEnable:true,consumeEnable:true,consumeFromMinEnable:true,consumeMessageOrderly:false,consumeTimeoutMinute:15,groupName:GID_xiaozoujishu,groupRetryPolicy:{type:CUSTOMIZED},groupSysFlag:0,notifyConsumerIdsChangedEnable:true,retryMaxTimes:16,retryQueueNums:1,whichBrokerWhenConsumeSlowly:1}这里可以看到一个细节就是重试策略是CUSTOMIZED(CustomizedRetryPolicy) 也就是我们对应的重试时间 还有一个策略就是ExponentialRetryPolicy也就是自定义重试次数 不过最大也就只能指定32次 还有一个细节就是重试次数在哪增加的实际也是在org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack这个方法 总结 普通消息重试默认是16次顺序消息默认是Integer.MAX_VALUE消息消费失败会以延时消息的方式投递到broker超过延时消息的最大值则以2小时为重试间隔。现在提供了不同的重试策略默认是CustomizedRetryPolicy 如果broker投递失败则会在本地再次消费该消息.重试次数的元数据存储在subscriptionGroup.json. 重试次数的增加有两种 发送到broker后在broker进行reconsumeTimes1(broker重试)如果发送broker失败则在本地消费进行reconsumeTimes1(client重试)
http://www.zqtcl.cn/news/739099/

相关文章:

  • 哪些网站做的最好厦门网站建设网站
  • 网站安全事件应急处置机制建设类似百度的网站
  • 内蒙古知名网站建设网站测速工具
  • 怎样建立网站赚钱怎么登录住建局官网
  • 建站自学网页转向功能网站
  • 网站都有什么费用做酒店网站有哪些目录
  • 本地郑州网站建设东莞网站优化中易
  • 动态域名可以建网站德州公司做网站
  • 深圳建设银行官方网站wordpress 添加qq
  • 甘肃第九建设集团公司网站网站对企业的好处
  • 论坛网站建设规划书公司网站建设与设计制作
  • 做棋牌游戏网站犯法吗如何进行搜索引擎的优化
  • 常见的网站首页布局有哪几种陈光锋网站运营推广新动向
  • 手机网站活动策划方案开一个设计公司
  • 宝塔建设网站教程visual studio 2010 网站开发教程
  • 做网站购买服务器做谷歌网站使用什么统计代码吗
  • 网站系统与网站源码的关系emlog轻松转wordpress
  • 网站的简介怎么在后台炒做吉林省住房城乡建设厅网站首页
  • 泉州易尔通网站建设国际酒店网站建设不好
  • 网页下载网站福田企业网站推广公司
  • 北京网站建设开发公司哪家好网站添加在线留言
  • 新建的网站怎么做seo优化平面广告创意设计
  • yy陪玩网站怎么做软件项目管理计划
  • 西安建网站价格低百度推广区域代理
  • 中英网站模板 照明公司注册在自贸区的利弊
  • 全球十大网站排名wordpress标题连接符
  • 网站开发可能遇到的问题四川建筑人才招聘网
  • 镇江网站托管怎么做淘宝网站赚钱吗
  • 交互式网站是什么知名vi设计企业
  • 上海个人做网站网站建设销售好做嘛