当前位置: 首页 > news >正文

如何做系统集成公司网站wordpress 输出作者

如何做系统集成公司网站,wordpress 输出作者,个人博客网站的设计与实现,中小学网站建设前言 顺序消息是 RocketMQ 提供的一种高级消息类型#xff0c;支持消费者按照发送消息的先后顺序获取消息#xff0c;从而实现业务场景中的顺序处理。 顺序消息的顺序关系通过消息组#xff08;MessageGroup#xff09;判定和识别#xff0c;发送顺序消息时需要为每条消息…前言 顺序消息是 RocketMQ 提供的一种高级消息类型支持消费者按照发送消息的先后顺序获取消息从而实现业务场景中的顺序处理。 顺序消息的顺序关系通过消息组MessageGroup判定和识别发送顺序消息时需要为每条消息设置归属的消息组相同消息组的多条消息之间遵循先进先出的顺序关系不同消息组、无消息组的消息之间不涉及顺序性。比如一条订单从创建到完结整个生命周期内产生的消息如果要保证消费的顺序性则可以用订单号作为 MessageGroup。 RocketMQ 4.x 实现顺序消息相对容易因为采用的是队列模型一个队列只能被一个消费者消费而队列本身是能保证先进先出的此时只要保证消费者单线程串行消费即可。 到了 RocketMQ 5.0 时代Pop 模式下因为采用的是消息模型消费者可以消费所有队列的消息顺序消息的实现也将变得更加复杂。 如何保证消息的顺序 顺序消息需要依赖生产者、Broker、消费者共同保证。 生产顺序性 首先是消息生产的顺序性相同 MessageGroup 必须保证单一生产者、单线程同步发送。多生产者实例或者多线程并发发送消息都无法保证消息是顺序到达 Broker 的消息源头的顺序性都无法保证后续流程的顺序就更是无从谈起了。 存储顺序性 消息发送到 Broker 必须按照到达顺序有序存储这一点很容易实现。因为队列天生是先进先出的但是一个 Topic 下可能会有多个队列此时保证相同 MessageGroup 的消息被发送到同一个队列是重点这个可以通过计算 MessageGroup 哈希值对队列数取模实现。 投递顺序性 消费者来拉取顺序消息时Broker 得知道之前投递的消息是否全部被消费完了如果还在消费中则当前队列不能再继续投递了消费者必须等待其它拉取到消息的消费者消费完毕后才能接着拉取后面的消息。 消费顺序性 消费者在拉取到消息后必须保证单线程顺序消费如果并发消费也是不能保证顺序的。 设计实现 生产端的顺序需要调用方自行保证这个没啥好说的。 存储端的顺序队列本身能保证先进先出只要保证相同 MessageGroup 投递到同一个目标队列即可。Proxy 用一个叫 SendMessageQueueSelector 的组件对消息的 MessageGroup 计算一致性哈希后取模得到目标队列。 Override public AddressableMessageQueue select(ProxyContext ctx, MessageQueueView messageQueueView) {try {apache.rocketmq.v2.Message message request.getMessages(0);String shardingKey null;if (request.getMessagesCount() 1) {// 分片键 也就是MessageGroupshardingKey message.getSystemProperties().getMessageGroup();}AddressableMessageQueue targetMessageQueue;if (StringUtils.isNotEmpty(shardingKey)) {// 根据写队列数计算一致性哈希ListAddressableMessageQueue writeQueues messageQueueView.getWriteSelector().getQueues();int bucket Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size());targetMessageQueue writeQueues.get(bucket);} else {targetMessageQueue messageQueueView.getWriteSelector().selectOne(false);}return targetMessageQueue;} catch (Exception e) {return null;} }顺序消费 RocketMQ 5.0 消费者在启动时就会和 Proxy 建立 TCP 长连接查询订阅的 Topic 路由数据TopicRouteData。紧接着调用 telemetry 接口发送 SETTINGS 命令同步设置要同步哪些设置呢 RocketMQ 5.0 的客户端 SDK 要做轻量化客户端最好啥也不知道一些策略和配置最好靠服务端下发同步设置就是干这个的。 请求体表明了消费者所属的消费组以及消息订阅配置 client_type: PUSH_CONSUMER access_point {scheme: IPv4addresses {host: 127.0.0.1port: 8081} } request_timeout {seconds: 3 } subscription {group {name: G_fifo}subscriptions {topic {name: fifo}expression {type: TAGexpression: *}} } user_agent {language: JAVAversion: 5.0.4platform: Mac OS X 10.16hostname: localhost-5.local }Proxy 返回的设置信息包含消息消费失败的重试策略、消费者是否要顺序消费、单次最大消息拉取数量、以及无消息时的长轮询挂起时间。 client_type: PUSH_CONSUMER access_point {scheme: IPv4addresses {host: 127.0.0.1port: 8081} } backoff_policy {max_attempts: 17customized_backoff {next {seconds: 1}next {seconds: 5}next {seconds: 10}next {seconds: 30}next {seconds: 60}next {seconds: 120}next {seconds: 180}next {seconds: 240}next {seconds: 300}next {seconds: 360}next {seconds: 420}next {seconds: 480}next {seconds: 540}next {seconds: 600}next {seconds: 1200}next {seconds: 1800}next {seconds: 3600}next {seconds: 7200}} } request_timeout {seconds: 3 } subscription {group {name: G_fifo}subscriptions {topic {name: fifo}expression {type: TAGexpression: *}}fifo: truereceive_batch_size: 32long_polling_timeout {seconds: 20} } user_agent {language: JAVAversion: 5.0.4platform: Mac OS X 10.16hostname: localhost-5.local } metric { } 对于顺序消息来说最重要的配置项就是fifo: true它决定了消费者是多线程并发消费还是单线程串行消费消费者会根据配置创建对应的 ConsumeService。顾名思义FifoConsumeService 是用来消费顺序消息的StandardConsumeService 用来消费普通消息。 private ConsumeService createConsumeService() {final ScheduledExecutorService scheduler this.getClientManager().getScheduler();if (pushSubscriptionSettings.isFifo()) {return new FifoConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler);}return new StandardConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler); }FifoConsumeService 会按照顺序消费拉取到的消息而且会等待上一个消息消费完毕才会去消费下一个。 Override public void consume(ProcessQueue pq, ListMessageViewImpl messageViews) {// 基于迭代器消费consumeIteratively(pq, messageViews.iterator()); }public void consumeIteratively(ProcessQueue pq, IteratorMessageViewImpl iterator) {if (!iterator.hasNext()) {return;}final MessageViewImpl messageView iterator.next();if (messageView.isCorrupted()) {// 消息损坏consumeIteratively(pq, iterator);return;}// 触发MessageListener消费消息final ListenableFutureConsumeResult future0 consume(messageView);// 处理消费结果ListenableFutureVoid future Futures.transformAsync(future0, result - pq.eraseFifoMessage(messageView,result), MoreExecutors.directExecutor());// 等待消息消费完毕再递归消费下一个消息future.addListener(() - consumeIteratively(pq, iterator), MoreExecutors.directExecutor()); }对于顺序消息来说消费失败是个麻烦事儿。因为要保证消息的顺序上一个消息没消费成功下一个消息就无法被消费容易导致消息堆积。RocketMQ 的策略是重试几次还是不行就发到死信队列方法是ProcessQueueImpl#eraseFifoMessage Override public ListenableFutureVoid eraseFifoMessage(MessageViewImpl messageView, ConsumeResult consumeResult) {statsConsumptionResult(consumeResult);final RetryPolicy retryPolicy consumer.getRetryPolicy();// 最大重试次数final int maxAttempts retryPolicy.getMaxAttempts();int attempt messageView.getDeliveryAttempt();final MessageId messageId messageView.getMessageId();final ConsumeService service consumer.getConsumeService();final ClientId clientId consumer.getClientId();// 失败且没超过最大重试次数if (ConsumeResult.FAILURE.equals(consumeResult) attempt maxAttempts) {// 下一个延迟时间final Duration nextAttemptDelay retryPolicy.getNextAttemptDelay(attempt);attempt messageView.incrementAndGetDeliveryAttempt();log.debug(Prepare to redeliver the fifo message because of the consumption failure, maxAttempt{}, attempt{}, mq{}, messageId{}, nextAttemptDelay{}, clientId{}, maxAttempts, attempt, mq,messageId, nextAttemptDelay, clientId);// 丢到线程池定时调度执行final ListenableFutureConsumeResult future service.consume(messageView, nextAttemptDelay);return Futures.transformAsync(future, result - eraseFifoMessage(messageView, result),MoreExecutors.directExecutor());}boolean ok ConsumeResult.SUCCESS.equals(consumeResult);// 超过重试次数还是失败 发到死信队列ListenableFutureVoid future ok ? ackMessage(messageView) : forwardToDeadLetterQueue(messageView);future.addListener(() - evictCache(messageView), consumer.getConsumptionExecutor());return future; }至此消费端对于顺序消息的处理就结束了。核心是如果消费组配置的是顺序投递消费者在拉取到消息后会单线程同步消费消息。 顺序投递 消费者的顺序性还是比较容易保证的整个链路里最复杂的必须是 Broker 投递的顺序性因为 Broker 得记录队列里上一批拉取到的消息是否全部消费完根据此来判断要不要继续投递后面的消息。 Broker 引入一个新组件 ConsumerOrderInfoManager来管理消费者顺序消息的消费情况。它继承了 ConfigManager所以支持数据的持久化。 它内部使用一个双层嵌套 Map 来记录消费组对于某个队列的顺序消息消费情况所谓的数据持久化就是把这个 Map 序列化成 JSON 后落地到磁盘。 private ConcurrentHashMapString/* topicgroup*/, ConcurrentHashMapInteger/*queueId*/, OrderInfo table new ConcurrentHashMap(128);落盘的文件路径是{storeHome}/config/consumerOrderInfo.json内容大概长这样 {table:{fifoG_fifo:{0:{cm:1,i:60000,l:1703644544701,o:[460],oc:{},popTime:1703644544701}}} }核心是 OrderInfo 类它记录了消费者针对某个队列拉取到的最新一批顺序消息的消费情况。offsetList 记录了消息的偏移量可以根据此来定位消息commitOffsetBit 记录了各消息的消费情况它是一个位图消息提交以后会把对应的比特位设为1。 public static class OrderInfo {// 各消息的偏移量(增量编码)private ListLong offsetList;// 消耗次数private int consumedCount;// 最近一次消费的时间戳 其实是拉取时间private long lastConsumeTimestamp;// 消息提交位图private long commitOffsetBit; }消费者在拉取消息时Broker 会给投递的这一批顺序消息记录一个 OrderInfo private long popMsgFromQueue() {......if (isOrder) {// 顺序消息 给拉取到的这一批消息记录OrderInfoint count brokerController.getConsumerOrderInfoManager().update(topic,requestHeader.getConsumerGroup(),queueId, getMessageTmpResult.getMessageQueueOffset());this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),requestHeader.getConsumerGroup(), topic, queueId, offset);ExtraInfoUtil.buildOrderCountInfo(orderCountInfo, isRetry, queueId, count);} else {// 普通消息 追加CheckPointappendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());}...... }方法是ConsumerOrderInfoManager#update主要是构建一个 OrderInfo 对象存入 Map public int update(String topic, String group, int queueId, ListLong msgOffsetList) {String key topic TOPIC_GROUP_SEPARATOR group;ConcurrentHashMapInteger/*queueId*/, OrderInfo qs table.get(key);if (qs null) {qs new ConcurrentHashMap(16);ConcurrentHashMapInteger/*queueId*/, OrderInfo old table.putIfAbsent(key, qs);if (old ! null) {qs old;}}OrderInfo orderInfo qs.get(queueId);// 转增量编码ListLong simple OrderInfo.simpleO(msgOffsetList);if (orderInfo ! null simple.get(0).equals(orderInfo.getOffsetList().get(0))) {if (simple.equals(orderInfo.getOffsetList())) {orderInfo.setConsumedCount(orderInfo.getConsumedCount() 1);} else {// reset, because msgs are changed.orderInfo.setConsumedCount(0);}orderInfo.setLastConsumeTimestamp(System.currentTimeMillis());orderInfo.setOffsetList(simple);orderInfo.setCommitOffsetBit(0);} else {// 构建新的OrderInfo覆盖掉上一批orderInfo new OrderInfo();orderInfo.setOffsetList(simple);orderInfo.setLastConsumeTimestamp(System.currentTimeMillis());orderInfo.setConsumedCount(0);orderInfo.setCommitOffsetBit(0);qs.put(queueId, orderInfo);}return orderInfo.getConsumedCount(); }假设此时又有其它消费者来拉取同一队列的消息Broker 会先定位到对应的 OrderInfo再判断是否要继续投递后面的消息 private long popMsgFromQueue() {......if (isOrder brokerController.getConsumerOrderInfoManager().checkBlock(topic,requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {// 之前拉取的一批消息还没全部commit不能拉取新消息return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset restNum;}...... }方法是ConsumerOrderInfoManager#checkBlock只有当下面两个条件都满足Broker 才会拒绝投递 上一批消息的拉取时间还没超过消息的不可见时间(60s)上一批消息还没全部提交 public boolean checkBlock(String topic, String group, int queueId, long invisibleTime) {String key topic TOPIC_GROUP_SEPARATOR group;ConcurrentHashMapInteger/*queueId*/, OrderInfo qs table.get(key);if (qs null) {qs new ConcurrentHashMap(16);ConcurrentHashMapInteger/*queueId*/, OrderInfo old table.putIfAbsent(key, qs);if (old ! null) {qs old;}}OrderInfo orderInfo qs.get(queueId);if (orderInfo null) {// 当前队列还没拉取过可以直接拉return false;}// 距离最后一次消费时间是否小于不可见时间60sboolean isBlock System.currentTimeMillis() - orderInfo.getLastConsumeTimestamp() invisibleTime;/*** 没超过不可见时间则必须等这一批消息全部commit才能继续拉取*/return isBlock !orderInfo.isDone(); }判断消息是否全部提交的方法是OrderInfo#isDone其实就是判断 commitOffsetBit 位图对应的位是否全部为1 public boolean isDone() {if (offsetList null || offsetList.isEmpty()) {return true;}int num offsetList.size();for (byte i 0; i num; i) {if ((commitOffsetBit (1L i)) 0) {return false;}}return true; }消息投递后消费者会按照顺序串行消费并上报消费结果即 ack 消息。Broker 在处理消息的 ack 请求时会判断 ack 的是不是顺序消息如果是就会更新 OrderInfo 位图。然后再判断 OrderInfo 里的这一批消息是否全部提交如果是就提交消费位点同时通知其它被挂起的请求拉取消息。 private RemotingCommand processRequest(){......if (rqId KeyBuilder.POP_ORDER_REVIVE_QUEUE) {// 顺序消息String lockKey requestHeader.getTopic() PopAckConstants.SPLIT requestHeader.getConsumerGroup() PopAckConstants.SPLIT requestHeader.getQueueId();long oldOffset this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId());if (requestHeader.getOffset() oldOffset) {return response;}// 加锁while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {}try {oldOffset this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId());if (requestHeader.getOffset() oldOffset) {return response;}// 更新位图long nextOffset brokerController.getConsumerOrderInfoManager().commitAndNext(requestHeader.getTopic(), requestHeader.getConsumerGroup(),requestHeader.getQueueId(), requestHeader.getOffset());if (nextOffset -1) {// 这一批顺序消息全部消费掉了提交消费位点this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueId(),nextOffset);// 通知其它被挂起的请求开始拉取消息this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),requestHeader.getQueueId());} else if (nextOffset -1) {String errorInfo String.format(offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s,lockKey, oldOffset, requestHeader.getOffset(), nextOffset, channel.remoteAddress());POP_LOGGER.warn(errorInfo);response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark(errorInfo);return response;}} finally {this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);}return response;}...... }更新位图的方法是ConsumerOrderInfoManager#commitAndNext它会把 commitOffsetBit 对应的比特位设为1然后返回值代表消息是否全被消费掉了通知外层要提交消费位点。-2 代表还没消费完、大于等于0表示需要提交消费位点。 public long commitAndNext(String topic, String group, int queueId, long offset) {String key topic TOPIC_GROUP_SEPARATOR group;ConcurrentHashMapInteger/*queueId*/, OrderInfo qs table.get(key);if (qs null) {return offset 1;}OrderInfo orderInfo qs.get(queueId);if (orderInfo null) {log.warn(OrderInfo is null, {}, {}, {}, key, offset, orderInfo);return offset 1;}ListLong offsetList orderInfo.getOffsetList();if (offsetList null || offsetList.isEmpty()) {log.warn(OrderInfo is empty, {}, {}, {}, key, offset, orderInfo);return -1;}Long first offsetList.get(0);int i 0, size offsetList.size();for (; i size; i) {long temp;if (i 0) {temp first;} else {temp first offsetList.get(i);}if (offset temp) {break;}}if (i size) {log.warn(OrderInfo not found commit offset, {}, {}, {}, key, offset, orderInfo);return -1;}// 更新Commit位图 对应位设为1orderInfo.setCommitOffsetBit(orderInfo.getCommitOffsetBit() | (1L i));if (orderInfo.isDone()) {// 这一批消息全部Commit了if (size 1) {return offsetList.get(0) 1;} else {return offsetList.get(size - 1) first 1;}}// 无需commitreturn -2; }尾巴 RocketMQ 顺序消息需要多端共同保证包括生产端顺序性、存储端顺序性、投递端顺序性、消费端顺序性。5.0 和 4.x 最大的区别就是Pop 模式下的消息模型允许消费者消费所有队列Broker 投递的顺序性是实现难点。RocketMQ 给出的解决方案是用一个嵌套 Map 维护 OrderInfo用来管理消费组针对某个队列的消费情况。Broker 在投递消息前会针对这一批消息构建一个 OrderInfo 对象存储下来在收到消费者发送的 ack 请求时更新对应的位图。下一个消费者来拉取消息时Broker 会判断对应的 OrderInfo 里的消息是否全部提交如果还有消息没提交是不会投递后面的消息的以此来保证消息投递的顺序性。
http://www.zqtcl.cn/news/244093/

相关文章:

  • 社交网站wap模板wordpress网址导航插件
  • 沈阳快速建站公司有哪些国外做二手服装网站
  • 手机如何建立网站平台seo比较好的优化
  • 电商网站建设外包禅城南庄网站制作
  • 哈尔滨企业网站开发报价免费php网站源码
  • 东莞市公司网站建设淄博网站制作营销
  • 企业网站无线端怎么做手机网站做成app
  • 让他人建设网站需要提供的材料可在哪些网站做链接
  • 外贸公司做网站3d建模好学吗
  • dedecms新网站 上传到万网的空间上海新媒体运营公司排名
  • 包装东莞网站建设0769三层网络架构
  • 淘客网站自己做固安建站公司
  • 咸阳学校网站建设联系电话网络app开发网站建设价格
  • 没网站怎么做淘宝客网站建设耂首先金手指
  • 网站带做收录排名淘外网站怎么做
  • 网站建设分金手指排名五申请邮箱账号注册
  • 餐饮加盟网站建设字体怎么安装wordpress
  • 网站建设与维护培训凡科和有赞哪个好用
  • 景区网站的作用长春新冠最新情况
  • 个人网站上传有啥要求wordpress 浏览记录
  • appcan 手机网站开发wordpress首页音乐
  • 杭州响应式网站案例建筑工程网站建站方案
  • 网站访客抓取国内网站搭建
  • 凡科网站做的好不好太原网页
  • 十堰商城网站建设国外效果图网站
  • 怎么登陆建设工程网站泉州网红
  • 哈尔滨队网站网页美工跨境电商是什么意思
  • 网站规划与建设课程推广型网站建设软件
  • 山东网站建设系统网站设计哪家更好
  • 网络推广有哪些网站网络推广公司联系昔年下拉