当前位置: 首页 > news >正文

英文网站建设比较好小程序网站模板

英文网站建设比较好,小程序网站模板,哪些网站专做自媒体的,设计网站的公司名称RocketMQ源码阅读-八-定时消息和消息重试 定时消息概念逻辑流程图延迟级别Producer发送定时消息Broker存储定时消息Broker发送定时消息Broker 持久化定时发送进度 消息重试总结 定时消息 概念 官网给出的概念#xff1a;https://rocketmq.apache.org/zh/docs/featureBehavior… RocketMQ源码阅读-八-定时消息和消息重试 定时消息概念逻辑流程图延迟级别Producer发送定时消息Broker存储定时消息Broker发送定时消息Broker 持久化定时发送进度 消息重试总结 定时消息 概念 官网给出的概念https://rocketmq.apache.org/zh/docs/featureBehavior/02delaymessage 定时消息是 Apache RocketMQ 提供的一种高级消息类型消息被发送至Broker服务端后在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。 逻辑流程图 来源https://www.iocoder.cn/RocketMQ/message-schedule-and-retry/?github1601 延迟级别 RocketMQ 目前只支持固定精度的定时消息。 官方给出不能任意时间延迟的原因如果要支持任意的时间精度在 Broker 层面必须要做消息排序如果再涉及到持久化那么消息排序要不可避免的产生巨大性能开销。 延迟级别相关源码如下MessageStoreConfig /*** 消息延迟级别字符串配置*/ private String messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;可以看到一共有18个延时级别。解析延迟级别的代码在ScheduleMessageService private final ConcurrentHashMapInteger /* level */, Long/* delay timeMillis */ delayLevelTable new ConcurrentHashMap(32); /*** 解析延迟级别** return 是否解析成功*/ public boolean parseDelayLevel() {HashMapString, Long timeUnitTable new HashMap();timeUnitTable.put(s, 1000L);timeUnitTable.put(m, 1000L * 60);timeUnitTable.put(h, 1000L * 60 * 60);timeUnitTable.put(d, 1000L * 60 * 60 * 24);String levelString this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {String[] levelArray levelString.split( );for (int i 0; i levelArray.length; i) {String value levelArray[i];String ch value.substring(value.length() - 1);Long tu timeUnitTable.get(ch);int level i 1;if (level this.maxDelayLevel) {this.maxDelayLevel level;}long num Long.parseLong(value.substring(0, value.length() - 1));long delayTimeMillis tu * num;this.delayLevelTable.put(level, delayTimeMillis);}} catch (Exception e) {log.error(parseDelayLevel exception, e);log.info(levelString String {}, levelString);return false;}return true; }此方法将延迟级别转换为毫秒数存储在delayLevelTable中。 Producer发送定时消息 下面是官方给出发送定时消息的 //定时/延时消息发送MessageBuilder messageBuilder new MessageBuilderImpl();;//以下示例表示延迟时间为10分钟之后的Unix时间戳。Long deliverTimeStamp System.currentTimeMillis() 10L * 60 * 1000;Message message messageBuilder.setTopic(topic)//设置消息索引键可根据关键字精确查找某条消息。.setKeys(messageKey)//设置消息Tag用于消费端根据指定Tag过滤消息。.setTag(messageTag).setDeliveryTimestamp(deliverTimeStamp)//消息体.setBody(messageBody.getBytes()).build();try {//发送消息需要关注发送结果并捕获失败等异常。SendReceipt sendReceipt producer.send(message);System.out.println(sendReceipt.getMessageId());} catch (ClientException e) {e.printStackTrace();}//消费示例一使用PushConsumer消费定时消息只需要在消费监听器处理即可。MessageListener messageListener new MessageListener() {Overridepublic ConsumeResult consume(MessageView messageView) {System.out.println(messageView.getDeliveryTimestamp());//根据消费结果返回状态。return ConsumeResult.SUCCESS;}};//消费示例二使用SimpleConsumer消费定时消息主动获取消息进行消费处理并提交消费结果。ListMessageView messageViewList null;try {messageViewList simpleConsumer.receive(10, Duration.ofSeconds(30));messageViewList.forEach(messageView - {System.out.println(messageView);//消费处理完成后需要主动调用ACK提交消费结果。try {simpleConsumer.ack(messageView);} catch (ClientException e) {e.printStackTrace();}});} catch (ClientException e) {//如果遇到系统流控等原因造成拉取失败需要重新发起获取消息请求。e.printStackTrace();}主要通过setDeliveryTimestamp方法设置定时时间。 Broker存储定时消息 Broker 存储消息时延迟消息进入特定 Topic 为 SCHEDULE_TOPIC_XXXX。同时会将 延迟级别 与 消息队列编号 做固定映射QueueId DelayLevel - 1。核心代码在CommitLog#putMessage中 /*** 添加消息返回消息结果** param msg 消息* return 结果*/ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// ...省略代码// 定时消息处理final int tranType MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType MessageSysFlag.TRANSACTION_NOT_TYPE//|| tranType MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() 0) {if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 存储消息时延迟消息进入 Topic 为 SCHEDULE_TOPIC_XXXX 。topic ScheduleMessageService.SCHEDULE_TOPIC;// 延迟级别 与 消息队列编号 做固定映射queueId ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}// ...省略代码 }延迟级别 与 消息队列编号 做固定映射的代码为ScheduleMessageService#delayLevel2QueueId /*** 根据 延迟级别 计算 消息队列编号* QueueId DelayLevel - 1** param delayLevel 延迟级别* return 消息队列编号*/ public static int delayLevel2QueueId(final int delayLevel) {return delayLevel - 1; }在生成ConsumeQueue时每条消息的 tagsCode 使用【消息计划消费时间】。这样ScheduleMessageService 在轮询 ConsumeQueue 时可以使用 tagsCode 进行过滤。相应的代码如下 public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {try {// ... 省略代码// 17 propertiesshort propertiesLength byteBuffer.getShort();if (propertiesLength 0) {byteBuffer.get(bytesContent, 0, propertiesLength);String properties new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);MapString, String propertiesMap MessageDecoder.string2messageProperties(properties);keys propertiesMap.get(MessageConst.PROPERTY_KEYS);uniqKey propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);String tags propertiesMap.get(MessageConst.PROPERTY_TAGS);if (tags ! null tags.length() 0) {tagsCode MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);}// Timing message processing{String t propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) t ! null) {int delayLevel Integer.parseInt(t);if (delayLevel this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}if (delayLevel 0) {tagsCode this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}}}int readLength calMsgLength(bodyLen, topicLen, propertiesLength);if (totalSize ! readLength) {doNothingForDeadCode(reconsumeTimes);doNothingForDeadCode(flag);doNothingForDeadCode(bornTimeStamp);doNothingForDeadCode(byteBuffer1);doNothingForDeadCode(byteBuffer2);log.error([BUG]read total count not equals msg total size. totalSize{}, readTotalCount{}, bodyLen{}, topicLen{}, propertiesLength{},totalSize, readLength, bodyLen, topicLen, propertiesLength);return new DispatchRequest(totalSize, false/* success */);}return new DispatchRequest(//topic, // 1queueId, // 2physicOffset, // 3totalSize, // 4tagsCode, // 5storeTimestamp, // 6queueOffset, // 7keys, // 8uniqKey, //9sysFlag, // 9preparedTransactionOffset// 10);} catch (Exception e) {}return new DispatchRequest(-1, false /* success */); }32行调用computeDeliverTimestamp方法计算计划消费时间 /*** 计算 投递时间【计划消费时间】** param delayLevel 延迟级别* param storeTimestamp 存储时间* return 投递时间【计划消费时间】*/ public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {Long time this.delayLevelTable.get(delayLevel);if (time ! null) {return time storeTimestamp;}return storeTimestamp 1000; }计算出来的计划消费时间作为tagsCode。后面Broker发送定时消息时会用到这个tagsCode进行过滤。 Broker发送定时消息 针对延时消息队列即每一个SCHEDULE_TOPIC_XXXX主题每个消费队列都会有一个单独的定时任务进行轮询用来发送到达定时的计划消费时间的消息。流程图如下出处;https://www.iocoder.cn/RocketMQ/message-schedule-and-retry/?github1601相应的实现源码在DeliverDelayedMessageTimerTask 中该类继承TimerTask是一个定时任务源码如下 /*** 发送投递延迟消息定时任务*/ class DeliverDelayedMessageTimerTask extends TimerTask {/*** 延迟级别*/private final int delayLevel;/*** 位置*/private final long offset;public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {this.delayLevel delayLevel;this.offset offset;}Overridepublic void run() {try {this.executeOnTimeup();} catch (Exception e) {// XXX: warn and notify melog.error(ScheduleMessageService, executeOnTimeup exception, e);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);}}/*** 纠正可投递时间。* 因为发送级别对应的发送间隔可以调整如果超过当前间隔则修正成当前配置避免后面的消息无法发送。** param now 当前时间* param deliverTimestamp 投递时间* return 纠正结果*/private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {long result deliverTimestamp;long maxTimestamp now ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);if (deliverTimestamp maxTimestamp) {result now;}return result;}public void executeOnTimeup() {ConsumeQueue cq ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));long failScheduleOffset offset;if (cq ! null) {SelectMappedBufferResult bufferCQ cq.getIndexBuffer(this.offset);if (bufferCQ ! null) {try {long nextOffset offset;int i 0;for (; i bufferCQ.getSize(); i ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy bufferCQ.getByteBuffer().getLong();int sizePy bufferCQ.getByteBuffer().getInt();long tagsCode bufferCQ.getByteBuffer().getLong();long now System.currentTimeMillis();long deliverTimestamp this.correctDeliverTimestamp(now, tagsCode);nextOffset offset (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown deliverTimestamp - now;if (countdown 0) { // 消息到达可发送时间MessageExt msgExt ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt ! null) {try {// 发送消息MessageExtBrokerInner msgInner this.messageTimeup(msgExt);PutMessageResult putMessageResult ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);if (putMessageResult ! null putMessageResult.getPutMessageStatus() PutMessageStatus.PUT_OK) { // 发送成功continue;} else { // 发送失败// XXX: warn and notify melog.error(ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}, msgExt.getTopic(), msgExt.getMsgId());// 安排下一次任务ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);// 更新进度ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} catch (Exception e) {// XXX: warn and notify melog.error(ScheduleMessageService, messageTimeup execute error, drop it. msgExt msgExt , nextOffset nextOffset ,offsetPy offsetPy ,sizePy sizePy, e);}}} else {// 安排下一次任务ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);// 更新进度ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of fornextOffset offset (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);// 安排下一次任务ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);// 更新进度ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ ! null)else { // 消费队列已经被删除部分跳转到最小的消费进度long cqMinOffset cq.getMinOffsetInQueue();if (offset cqMinOffset) {failScheduleOffset cqMinOffset;log.error(schedule CQ offset invalid. offset offset , cqMinOffset cqMinOffset , queueId cq.getQueueId());}}} // end of if (cq ! null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);}/*** 设置消息内容** param msgExt 消息* return 消息*/private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {MessageExtBrokerInner msgInner new MessageExtBrokerInner();msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());TopicFilterType topicFilterType MessageExt.parseTopicFilterType(msgInner.getSysFlag());long tagsCodeValue MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());msgInner.setTagsCode(tagsCodeValue);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());msgInner.setWaitStoreMsgOK(false);MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));String queueIdStr msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);int queueId Integer.parseInt(queueIdStr);msgInner.setQueueId(queueId);return msgInner;} }上面代码实现了逻辑如下 轮询延迟消息的topic看是否有到期的定时任务到期的定时任务提交到CommitLog供消费者消费 Broker 持久化定时发送进度 定时消息发送进度存储在文件(…/config/delayOffset.json)里每 10s 定时持久化发送进度 核心代码在类ScheduleMessageService中 public void start() {// 定时发送消息for (Map.EntryInteger, Long entry : this.delayLevelTable.entrySet()) {Integer level entry.getKey();Long timeDelay entry.getValue();Long offset this.offsetTable.get(level);if (null offset) {offset 0L;}if (timeDelay ! null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}// 定时持久化发送进度this.timer.scheduleAtFixedRate(new TimerTask() {Overridepublic void run() {try {ScheduleMessageService.this.persist();} catch (Exception e) {log.error(scheduleAtFixedRate flush exception, e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }此方法同样是启动一个定时任务每10s执行一次持久化操作。 消息重试 消息重试发生在Consumer消费消费时消费失败的消息会发回到Broker进入延时消息队列过一段时间重新消费。所以消息重试和定时/延时消息是密切相关的。消费者将消费失败的消息发回Broker的源码在SendMessageProcessor#consumerSendMsgBack /*** 消费者发回消息** param ctx ctx* param request 请求* return 响应* throws RemotingCommandException 当远程调用异常*/ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) throws RemotingCommandException {// ... 省略部分代码// 处理 delayLevelint delayLevel requestHeader.getDelayLevel();int maxReconsumeTimes subscriptionGroupConfig.getRetryMaxTimes();if (request.getVersion() MQVersion.Version.V3_4_9.ordinal()) {maxReconsumeTimes requestHeader.getMaxReconsumeTimes();}if (msgExt.getReconsumeTimes() maxReconsumeTimes//|| delayLevel 0) { // 如果超过最大消费次数则topic修改成%DLQ% 分组名即加入 死信队列(Dead Letter Queue)// 此时不会进入} else {if (0 delayLevel) {delayLevel 3 msgExt.getReconsumeTimes();}// 设置延时msgExt.setDelayTimeLevel(delayLevel);}// ... 省略部分代码return response; }重点在于第26行设置了延时时间。 总结 本篇分析了RocketMQ的定时消息的处理逻辑。 RocketMQ不支持任意时间的延迟只支持固定时间因为性能考虑Producer发送定时消息只是调用setDeliveryTimestamp指定延迟时间或等级Broker会先将定时消息存储在特定的Topic名字格式为 SCHEDULE_TOPIC_XXXXBroker会启动一个定时任务每1000ms执行一次轮询 SCHEDULE_TOPIC_XXXX 中的消息通过tagsCode过滤将到期的消息发送到CommitLogBroker同时会启动持久化定时发送进度的任务每10s执行一次消息发送存储到Commitlog后Consumer就可以消费到消息消费失败时Consumer会将消息发回到Broker的延时消息Topic固定时间后再次重试消费
http://www.zqtcl.cn/news/255126/

相关文章:

  • 灵犀科技网站开发佼佼者门户网站建设和检务公开整改
  • php mysql做网站登录免费素材哪里找
  • 休闲食品网站建设网页设计网站实例
  • 微信网站结构58同城北京网站建设
  • thinkcmf做网站快不快南宁网站建设找哪家好
  • 百度网站类型西部数码官网
  • app和网站哪个难做如何做本地网站
  • 怎么做网站导航栏个性化定制产品
  • 如何做企业网站排名优化工业设计公司logo
  • 怎样制作网站教程中国建设银行总部网站
  • 美食网站建设规划书辽宁建设工程信息网中标通知
  • iis搭建网站教程深圳注册公司条件
  • 怎么优化网站关键词排名api接口开发网站开发
  • 如何提升网站的搜索排名秦皇岛黄页大全秦皇岛本地信息网
  • 学生作业网站笔记本可以做网站吗
  • 网站开发毕设开题报告在线设计网站源码
  • 优普南通网站建设申请注册公司流程
  • 越南网站建设河南企业做网站
  • 优化免费网站建设做网站领券收佣金
  • 网站常用图标素材办公用品十大购物网站排名
  • 网络门户网站站长要维护网站
  • 网上有做衣服的网站有哪些做网站推广怎样才能省钱
  • 网站专题设计欣赏找网站公司做网站是怎样的流程
  • 网站上传后如何设置首页制作网络游戏
  • 外贸接单网站排名榜珠宝行网站建设方案
  • 酒店门户网站建设背景门户网站的发布特点
  • 网站营销与推广汕头澄海
  • php和asp做网站哪个好阿里云wordpress配置
  • 东莞响应式网站建设网络营销策略和营销策略的区别
  • 番禺做网站哪家强合肥网页网站制作