当前位置: 首页 > news >正文

网站建设所需要软件wordpress写文章报错

网站建设所需要软件,wordpress写文章报错,广告投放面试,wordpress 详情页这里是weihubeats,觉得文章不错可以关注公众号小奏技术#xff0c;文章首发。拒绝营销号#xff0c;拒绝标题党 RocketMQ版本 5.1.0 普通消息 消息重试的的实现分并普通消息和顺序消息。两者的重试机制大同小异。我们这里先看看不同消息 这里是官网定义的消息重试次数以及… 这里是weihubeats,觉得文章不错可以关注公众号小奏技术文章首发。拒绝营销号拒绝标题党 RocketMQ版本 5.1.0 普通消息 消息重试的的实现分并普通消息和顺序消息。两者的重试机制大同小异。我们这里先看看不同消息 这里是官网定义的消息重试次数以及时间间隔。 有没有发现一个问题这个重试消息的时间间隔和延时消息的时间间隔这么类似 图片来源官网 待会我们进行源码分析就知道了。这里先卖个关子 client源码分析 ConsumeMessageConcurrentlyService 普通消息消费以及重试的逻辑在ConsumeMessageConcurrentlyService中如果是顺序消息则是ConsumeMessageOrderlyService 具体的逻辑是org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run 由于今天我们的重点关注点是消息如何重试的。所以我们在消息消费的一些其他细节会略过 首先消息消费的状态由ConsumeConcurrentlyStatus 这个枚举控制 注释很清晰 CONSUME_SUCCESS 成功消费RECONSUME_LATER 消费时间等待重试 可以看到消息消费主要是在 status listener.consumeMessage(Collections.unmodifiableList(msgs), context);这里的listener就是实现了MessageListener接口的类。也就是我们在编写consumer需要传入的一个对象 比如像这种 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(CONSUMER_GROUP, true);consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TOPIC, *);consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) - {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf(Consumer Started.%n);需要注意的是虽然消费状态只有成功和失败。但是消费结果却是由ConsumeReturnType这个枚举类定义的 SUCCESS 成功TIME_OUT 超时EXCEPTION 异常RETURNNULL 消费结果返回nullFAILED 消费结果返回失败 可以看到这里把消费成功还是失败再次作了细分 if (null status) {if (hasException) {returnType ConsumeReturnType.EXCEPTION;} else {returnType ConsumeReturnType.RETURNNULL;}} else if (consumeRT defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER status) {returnType ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS status) {returnType ConsumeReturnType.SUCCESS;}我们继续回归主线看看如果消费状态为RECONSUME_LATER会作什么处理 if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn(processQueue is dropped without process consume result. messageQueue{}, msgs{}, messageQueue, msgs);}这里可以看到processConsumeResult方法对消息结果进行了处理。我们来看看这个方法 processConsumeResult switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i) {MessageExt msg consumeRequest.getMsgs().get(i);log.warn(BROADCASTING, the message consume failed, drop it, {}, msg.toString());}break;case CLUSTERING:ListMessageExt msgBackFailed new ArrayList(consumeRequest.getMsgs().size());for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i) {MessageExt msg consumeRequest.getMsgs().get(i);// Maybe message is expired and cleaned, just ignore it.if (!consumeRequest.getProcessQueue().containsMessage(msg)) {log.info(Message is not found in its process queue; skip send-back-procedure, topic{}, brokerName{}, queueId{}, queueOffset{}, msg.getTopic(), msg.getBrokerName(),msg.getQueueId(), msg.getQueueOffset());continue;}boolean result this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;} 这里可以看到如果是BROADCASTING(广播消息)则只是打印了warnlog什么处理也不会干 我们来看看集群消息 注意这里的循环条件 for (int i ackIndex 1; i consumeRequest.getMsgs().size(); i)如果所有消息消费成功则 ackIndex consumeRequest.getMsgs().不会有消息进行下面的逻辑处理。如果有消息消费失败才会进行下面的处理 看看下面的处理逻辑 boolean result this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() 1);msgBackFailed.add(msg);}发送消费失败消息给broker如果消息发送给broker失败则将消息丢到msgBackFailed。然后再client自己进行消息重新消费重试次数reconsumeTimes 1 我们来看看sendMessageBack的处理逻辑 public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {int delayLevel context.getDelayLevelWhenNextConsume();// Wrap topic with namespace before sending back message.msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));return true;} catch (Exception e) {log.error(sendMessageBack exception, group: this.consumerGroup msg: msg, e);}return false;}这里有实际delayLevel延时级别一直是0。 实际发送消息到broker是 this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, this.defaultMQPushConsumer.queueWithNamespace(context.getMessageQueue()));我们看看sendMessageBack的具体逻辑 private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {boolean needRetry true;try {if (brokerName ! null brokerName.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)|| mq ! null mq.getBrokerName().startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {needRetry false;sendMessageBackAsNormalMessage(msg);} else {String brokerAddr (null ! brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());}} catch (Throwable t) {log.error(Failed to send message back, consumerGroup{}, brokerName{}, mq{}, message{},this.defaultMQPushConsumer.getConsumerGroup(), brokerName, mq, msg, t);if (needRetry) {sendMessageBackAsNormalMessage(msg);}} finally {msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}}这里实际走的消息发送逻辑consumerSendMessageBack。 consumerSendMessageBack就是简单的消息发送没有复杂的逻辑 我们通过RequestCode.CONSUMER_SEND_MSG_BACK 找到broker的处理逻辑 broker源码分析 处理RequestCode.CONSUMER_SEND_MSG_BACK请求的主要是SendMessageProcessor 我们进入到 org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack方法看看 这里就有延时消息的处理。我们在上面一直说过延时消息的等级一直是0.我们看看是如何处理的 如果重试次数没有超过最大重试次数。并且延时消息等级为0. 则延时消息等级为 重试次数3 这里就清楚知道了为什么消息重试的时间和延时消息是如何相似了吧。 没错 消息重试就是用延时消息实现的 哈哈 然后最大重试次数在哪获取的呢答案就是SubscriptionGroupConfig 也就是订阅信息的元数据 元数据再broker的文件就是subscriptionGroup.json 格式就是如下 GID_xiaozoujishu:{brokerId:0,consumeBroadcastEnable:true,consumeEnable:true,consumeFromMinEnable:true,consumeMessageOrderly:false,consumeTimeoutMinute:15,groupName:GID_xiaozoujishu,groupRetryPolicy:{type:CUSTOMIZED},groupSysFlag:0,notifyConsumerIdsChangedEnable:true,retryMaxTimes:16,retryQueueNums:1,whichBrokerWhenConsumeSlowly:1}这里可以看到一个细节就是重试策略是CUSTOMIZED(CustomizedRetryPolicy) 也就是我们对应的重试时间 还有一个策略就是ExponentialRetryPolicy也就是自定义重试次数 不过最大也就只能指定32次 还有一个细节就是重试次数在哪增加的实际也是在org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack这个方法 总结 普通消息重试默认是16次顺序消息默认是Integer.MAX_VALUE消息消费失败会以延时消息的方式投递到broker超过延时消息的最大值则以2小时为重试间隔。现在提供了不同的重试策略默认是CustomizedRetryPolicy 如果broker投递失败则会在本地再次消费该消息.重试次数的元数据存储在subscriptionGroup.json. 重试次数的增加有两种 发送到broker后在broker进行reconsumeTimes1(broker重试)如果发送broker失败则在本地消费进行reconsumeTimes1(client重试)
http://www.zqtcl.cn/news/977756/

相关文章:

  • 做网站和软件的团队网页设计与网页制作的实验报告
  • 广州网站建设很棒 乐云践新wordpress搬家 登录报错
  • 顺的网站建设案例如何上传网站
  • 网站管理和建设工作职责中国建设银行卖狗年纪念币官方网站
  • 如何快速开发一个网站干洗店投资多少钱可以营业了
  • 哪些分类网站WordPress商用收费吗
  • 南开网站建设优化seo福建凭祥建设工程有限公司网站
  • 建设工程消防设计备案凭证查询网站网站建设课程设计目的和内容
  • 网站开发要花多少钱wordpress网站邀请码
  • 社旗网站设计小程序制作用华网天下优惠
  • 建设产品网站代理注册企业邮箱
  • 购物网站建设费用珠海本地网站
  • 做电商网站前期做什么工作网站后台jsp怎么做分页
  • 百家利网站开发搜索引擎分哪三类
  • 安徽集团网站建设深圳最新通告今天
  • 公司网站主机流量30g每月够用吗攀枝花网站网站建设
  • 淘宝做图片的网站手机网站北京
  • 重庆网站首页排名公司网站公众号小程序开发公司
  • 河源网站制作1993seo福州室内设计公司排名
  • 哪里有做装修网站网站开发总出现出现404
  • 做a漫画在线观看网站策划营销型网站
  • 怎么 从头开始建设一个网站临沂高端网站建设
  • 网页设计制作网站素材传奇代理平台
  • 公司建站网站软文营销方案
  • 成品短视频网站源码搭建免费温州外贸网站制作
  • 旅游公司网站建设pptwordpress 用户增强
  • wordpress 最新东莞seo技术培训
  • 上海微网站开发网站 选项卡 图标
  • 淘宝网站建设的目标什么做网站公司 营销
  • 360企业网站认证wordpress 个人照片