和平县做网站,wordpress清除缓存,广州专业网站制作设计,wordpress 离线发布消费者启动时#xff0c;订阅相应的topic并加入到消费者组。消费者将消费进度信息存储到Broker中#xff0c;包括当前消费到的消息的offset、队列信息等。消费者定时从Broker中获取topic的路由信息#xff08;包括消息队列、broker信息等#xff09;#xff0c;并更新本地…消费者启动时订阅相应的topic并加入到消费者组。消费者将消费进度信息存储到Broker中包括当前消费到的消息的offset、队列信息等。消费者定时从Broker中获取topic的路由信息包括消息队列、broker信息等并更新本地缓存。当消费者组内新增或删除消费者时触发重平衡事件即重新分配消息队列给消费者。重平衡事件由消费组内最先触发的消费者发起通知Broker开始重平衡。Broker接到重平衡请求后向消费者组内的其他消费者发送通知。消费者接收到重平衡通知后开始重新计算自己所分配到的队列根据拉取到的消息队列的分配策略重新分配队列。消费者重新分配完队列后将分配结果发送给Broker并更新本地缓存的路由信息。Broker接收到所有消费者发送的分配结果后按照分配结果更新自己记录的消费组的消费进度信息包括消费进度、offset等信息。当消费者消费完消息队列中的所有消息后将消费进度信息更新到Broker中以便下次重平衡时可以根据消费进度信息进行分配。
以下是重平衡代码
private boolean rebalanceByTopic(final String topic, final boolean isOrder) {boolean balanced true;switch (messageModel) {//广播模式不需要重平衡case BROADCASTING: {SetMessageQueue mqSet this.topicSubscribeInfoTable.get(topic);if (mqSet ! null) {boolean changed this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info(messageQueueChanged {} {} {} {}, consumerGroup, topic, mqSet, mqSet);}balanced mqSet.equals(getWorkingMessageQueue(topic));} else {this.messageQueueChanged(topic, Collections.MessageQueueemptySet(), Collections.MessageQueueemptySet());log.warn(doRebalance, {}, but the topic[{}] not exist., consumerGroup, topic);}break;}case CLUSTERING: {//内存中拿出所有queueSetMessageQueue mqSet this.topicSubscribeInfoTable.get(topic);//GET_CONSUMER_LIST_BY_GROUP这个请求去ns拿出所有的客户端ListString cidAll this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {this.messageQueueChanged(topic, Collections.MessageQueueemptySet(), Collections.MessageQueueemptySet());log.warn(doRebalance, {}, but the topic[{}] not exist., consumerGroup, topic);}}if (null cidAll) {log.warn(doRebalance, {} {}, get consumer id list failed, consumerGroup, topic);}if (mqSet ! null cidAll ! null) {ListMessageQueue mqAll new ArrayListMessageQueue();mqAll.addAll(mqSet);//将两个list进行排序Collections.sort(mqAll);Collections.sort(cidAll);//重平衡策略AllocateMessageQueueStrategy strategy this.allocateMessageQueueStrategy;ListMessageQueue allocateResult null;try {//策略进行重平衡allocateResult strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error(allocate message queue exception. strategy name: {}, ex: {}, strategy.getName(), e);return false;}SetMessageQueue allocateResultSet new HashSetMessageQueue();if (allocateResult ! null) {allocateResultSet.addAll(allocateResult);}//对比queue是否有变化并且更新到processQueueTable中boolean changed this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info(client rebalanced result changed. allocateMessageQueueStrategyName{}, group{}, topic{}, clientId{}, mqAllSize{}, cidAllSize{}, rebalanceResultSize{}, rebalanceResultSet{},strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}balanced allocateResultSet.equals(getWorkingMessageQueue(topic));}break;}default:break;}return balanced;}