当前位置: 首页 > news >正文

网站建设和网络优化请示卫辉网站建设

网站建设和网络优化请示,卫辉网站建设,有人利用婚恋网站做微商,网站做的好的医院RocketMQ一个消费者组中可以有多个消费者, 在集群模式下他们共同消费topic下的所有消息, RocketMQ规定一个消息队列仅能被一个消费者消费, 但是一个消费者可以同时消费多个消息队列。需要负载均衡服务RebalanceService来进行消息队列分配的重平衡。使用负载均衡服务RebalanceSe…RocketMQ一个消费者组中可以有多个消费者, 在集群模式下他们共同消费topic下的所有消息, RocketMQ规定一个消息队列仅能被一个消费者消费, 但是一个消费者可以同时消费多个消息队列。需要负载均衡服务RebalanceService来进行消息队列分配的重平衡。使用负载均衡服务RebalanceService来专门处理多个消息队列和消费者的对应关系, 如何分配消息队列给这些消费者。 RocketMQ消费者负载均衡服务RebalanceService的入口代码 文章目录 1.负载均衡或者重平衡1.1 RebalanceService自动重平衡1.2 Consumer启动重平衡1.3 Broker请求重平衡1.4 Broker处理心跳请求1.4.1 registerConsumer注册消费者1.4.2 客户端处理重平衡请求 1.负载均衡或者重平衡 有三种情况会触发Consumer进行负载均衡或者说重平衡。 RebalanceService服务是一个线程任务, 由MQClientInstance启动, 每隔20s自动进行一次自动负载均衡。Broker触发的重平衡: Broker收到心跳请求之后如果发现消息中有新的consumer连接或者consumer订阅了新的topic或者移除了topic的订阅, Broker发送Code为NOTIFY_CONSUMER_IDS_CHANGED的请求给该group下面的所有Consumer, 要求进行一次负载均衡。如果某个客户端连接出现连接异常事件EXCEPTION、连接断开事件CLOSE、或者连接闲置事件IDLE, 则Broker同样会发送重平衡请求给消费者组下面的所有消费者。 新的Consumer服务启动的时候, 主动调用rebalanceImmediately唤醒负载均衡服务rebalanceService。 1.1 RebalanceService自动重平衡 RebalanceService#run方法, 最多每隔20s执行一次重平衡。 /*** RebalanceServicede 方法*/ Override public void run() {log.info(this.getServiceName() service started);/** 运行时逻辑* 如果服务没有停止则在死循环中执行负载均衡*/while (!this.isStopped()) {//等待运行默认最多等待20s可以被唤醒this.waitForRunning(waitInterval);//执行重平衡操作this.mqClientFactory.doRebalance();}log.info(this.getServiceName() service end); }1.2 Consumer启动重平衡 新的Consumer服务启动的时候, 主动调用rebalanceImmediately唤醒负载均衡服务rebalanceService。 /*** MQClientInstance的方法* 立即重平衡*/ public void rebalanceImmediately() {//唤醒重平衡服务立即重平衡this.rebalanceService.wakeup(); }1.3 Broker请求重平衡 Broker收到心跳请求之后如果发现消息中有新的consumer连接或者consumer订阅了新的topic或者移除了topic的订阅, Broker发送Code为NOTIFY_CONSUMER_IDS_CHANGED的请求给该group下面的所有Consumer, 要求进行一次负载均衡。如果某个客户端连接出现连接异常事件EXCEPTION、连接断开事件CLOSE、或者连接闲置事件IDLE, 则Broker同样会发送重平衡请求给消费者组下面的所有消费者。处理入口方法为ClientHousekeepingService# doChannelCloseEvent方法。 新的Consumer和Producer启动的时候, 发送心跳给Broker, MQClientInstance的内部的服务也会定时30s发送心跳信息给Broker。 心跳请求的Code为HEART_BEAT, 该请求最终被Broker的ClientManageProcessor处理器处理。 /*** ClientManageProcessor的方法, Broker处理心跳方法*/ Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {//客户端心跳请求case RequestCode.HEART_BEAT://客户端心跳请求return this.heartBeat(ctx, request);case RequestCode.UNREGISTER_CLIENT:return this.unregisterClient(ctx, request);case RequestCode.CHECK_CLIENT_CONFIG:return this.checkClientConfig(ctx, request);default:break;}return null; }1.4 Broker处理心跳请求 Broker的ClientManageProcessor#heartBeat该方法用于Broker处理来自客户端, consumer和producer的请求。 解码消息中的信息成为HeartbeatData对象。循环遍历处理consumerDataSet集合, 对ConsumerData信息进行注册或者更改, 如果consumer信息修改的话, Broker会发送NOTIFY_CONSUMER_IDS_CHANGED请求给同组的所有consumer客户端, 进行重平衡操作。循环遍历处理consumerDataSet集合, 对ProducerData信息注册或者更改。 /*** ClientManageProcessor的方法* p* 处理客户端心跳请求*/ public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {//构建响应命令对象RemotingCommand response RemotingCommand.createResponseCommand(null);//解码HeartbeatData heartbeatData HeartbeatData.decode(request.getBody(), HeartbeatData.class);//构建客户端连接信息对象ClientChannelInfo clientChannelInfo new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion());/** 1 循环遍历处理consumerDataSet即处理consumer的心跳信息*/for (ConsumerData data : heartbeatData.getConsumerDataSet()) {//查找broker缓存的当前消费者组的订阅组配置SubscriptionGroupConfig subscriptionGroupConfig this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());boolean isNotifyConsumerIdsChangedEnable true;//如果已存在订阅组if (null ! subscriptionGroupConfig) {//当consumer发生改变的时候是否支持通知同组的所有consumer默认true即支持isNotifyConsumerIdsChangedEnable subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();int topicSysFlag 0;if (data.isUnitMode()) {topicSysFlag TopicSysFlag.buildSysFlag(false, true);}//尝试创建重试topicString newTopic MixAll.getRetryTopic(data.getGroupName());this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);}/** 注册consumer返回consumer信息是否已发生改变* 如果发生了改变Broker会发送NOTIFY_CONSUMER_IDS_CHANGED请求给同组的所有consumer客户端要求进行重平衡操作*/boolean changed this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);if (changed) {//如果consumer信息发生了改变打印日志log.info(registerConsumer info changed {} {},data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}}/** 2 循环遍历处理producerDataSet即处理producer的心跳信息*/for (ProducerData data : heartbeatData.getProducerDataSet()) {/** 注册producer*/this.brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo);}//返回响应response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }1.4.1 registerConsumer注册消费者 循环遍历处理consumerDataSet集合, 对ConsumerData信息进行注册或者更改, 如果consumer信息修改的话, Broker会发送NOTIFY_CONSUMER_IDS_CHANGED请求给同组的所有consumer客户端, 进行重平衡操作。 /*** ConsumerManager的方法* p* 注册consumer返回consumer信息是否已发生改变* 如果发生了改变Broker会发送NOTIFY_CONSUMER_IDS_CHANGED请求给同组的所有consumer客户端要求进行重平衡操作** param group 消费者组* param clientChannelInfo 客户端连接信息* param consumeType 消费类型PULL or PUSH* param messageModel 消息模式集群 or 广播* param consumeFromWhere 启动消费位置* param subList 订阅信息数据* param isNotifyConsumerIdsChangedEnable 一个consumer改变时是否通知该consumergroup中的所有consumer进行重平衡* return 是否重平衡*/ public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final SetSubscriptionData subList, boolean isNotifyConsumerIdsChangedEnable) {//获取当前group对应的ConsumerGroupInfoConsumerGroupInfo consumerGroupInfo this.consumerTable.get(group);//如果为null那么新建一个ConsumerGroupInfo并存入consumerTableif (null consumerGroupInfo) {ConsumerGroupInfo tmp new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);ConsumerGroupInfo prev this.consumerTable.putIfAbsent(group, tmp);consumerGroupInfo prev ! null ? prev : tmp;}/** 1 更新连接*/boolean r1 consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);/** 2 更新订阅信息*/boolean r2 consumerGroupInfo.updateSubscription(subList);/** 2 如果连接或者订阅信息有更新并且允许通知那么通知该consumergroup中的所有consumer进行重平衡*/if (r1 || r2) {if (isNotifyConsumerIdsChangedEnable) {//CHANGE事件this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}// 3.注册订阅信息到ConsumerFilterManagerthis.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);return r1 || r2; }1.updateChannel更新连接 更新ConsumerGroup组对应的ConsumerGroupInfo的一些属性, 还会判断当前连接是否是新连接, 如果Broker此前没有该连接的信息, 那么表示有新的consumer连接到此broker, 需要通知当前ConsumerGroup的所有consumer进行重平衡。 /*** ConsumerGroupInfo的方法* p* 更新连接** param infoNew 新连接信息* param consumeType 消费类型PULL or PUSH* param messageModel 消息模式集群 or 广播* param consumeFromWhere 启动消费位置* return 是否通知*/ public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {boolean updated false;//更新信息this.consumeType consumeType;this.messageModel messageModel;this.consumeFromWhere consumeFromWhere;//根据当前连接获取channelInfoTable缓存中的连接信息ClientChannelInfo infoOld this.channelInfoTable.get(infoNew.getChannel());//如果缓存中的连接信息为null说明当前连接是一个新连接if (null infoOld) {//存入缓存ClientChannelInfo prev this.channelInfoTable.put(infoNew.getChannel(), infoNew);//长期按没有该连接信息那么表示有新的consumer连接到此broekr那么需要通知if (null prev) {log.info(new consumer connected, group: {} {} {} channel: {}, this.groupName, consumeType,messageModel, infoNew.toString());updated true;}infoOld infoNew;} else {//异常情况if (!infoOld.getClientId().equals(infoNew.getClientId())) {log.error([BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ,this.groupName,infoOld.toString(),infoNew.toString());this.channelInfoTable.put(infoNew.getChannel(), infoNew);}}//更新更新时间this.lastUpdateTimestamp System.currentTimeMillis();infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);return updated; }2.updateSubscription更新订阅信息 更新此ConsumerGroup组对应的订阅信息集合, 如果存在新增订阅的topic, 或者移除某个topic的订阅, 那么需要通知当前ConsumerGroup的所有consumer进行重平衡。 RocketMQ需要保证组内的所有消费者订阅的topic都必须一致。 方法首先遍历当前请求传递的订阅信息集合, 然后对于每个订阅的topic从subscriptionTable缓存中尝试获取, 获取不到则表示新增了topic订阅信息, 存入subscriptionTable。遍历subscriptionTable集合, 判断每一个topic是否存在于当前请求传递的订阅信息集合中, 如果不存在, 表示consumer移除了topic的订阅, 那么当前topic的订阅信息会从subscriptionTable集合中被移除。 /*** ConsumerGroupInfo的方法* 更新订阅信息** param subList 订阅信息集合*/ public boolean updateSubscription(final SetSubscriptionData subList) {boolean updated false;//遍历订阅信息集合for (SubscriptionData sub : subList) {//根据订阅的topic在ConsumerGroup的subscriptionTable缓存中此前的订阅信息SubscriptionData old this.subscriptionTable.get(sub.getTopic());//如果此前没有关于该topic的订阅信息那么表示此topic为新增订阅if (old null) {//存入subscriptionTableSubscriptionData prev this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);//此前没有关于该topic的订阅信息那么表示此topic为新增订阅那么需要通知if (null prev) {updated true;log.info(subscription changed, add new topic, group: {} {},this.groupName,sub.toString());}} else if (sub.getSubVersion() old.getSubVersion()) {//更新数据if (this.consumeType ConsumeType.CONSUME_PASSIVELY) {log.info(subscription changed, group: {} OLD: {} NEW: {},this.groupName,old.toString(),sub.toString());}this.subscriptionTable.put(sub.getTopic(), sub);}}/** 遍历ConsumerGroup的subscriptionTable缓存*/IteratorEntryString, SubscriptionData it this.subscriptionTable.entrySet().iterator();while (it.hasNext()) {EntryString, SubscriptionData next it.next();//获取此前订阅的topicString oldTopic next.getKey();boolean exist false;//判断当前的subList是否存在该topic的订阅信息for (SubscriptionData sub : subList) {//如果存在则退出循环if (sub.getTopic().equals(oldTopic)) {exist true;break;}}//当前的subList不存在该topic的订阅信息说明consumer移除了对于该topic的订阅if (!exist) {log.warn(subscription changed, group: {} remove topic {} {},this.groupName,oldTopic,next.getValue().toString());//移除数据it.remove();//那么需要通知updated true;}}this.lastUpdateTimestamp System.currentTimeMillis();return updated; }3.consumerIdsChangeListener.handle监听器通知 通知监听器处理对应的事件, 事件为ConsumerGroupEvent.CHANGE。 如果运行通知, 则遍历该ConsumerGroup的连接集合, 然后对每个连接调用notifyConsumerIdsChanged方法通知对应的客户端消费者执行负载均衡。 notifyConsumerIdsChanged: broker发送客户端一个重平衡请求, Code为NOTIFY_CONSUMER_IDS_CHANGED。 /*** DefaultConsumerIdsChangeListener的方法* * 处理监听到的事件* param event 事件* param group 消费者组* param args 参数*/ Override public void handle(ConsumerGroupEvent event, String group, Object... args) {if (event null) {return;}switch (event) {//改变事件需要通知该消费者组的每一个消费者case CHANGE:if (args null || args.length 1) {return;}//获取参数ListChannel channels (ListChannel) args[0];//如果允许通知if (channels ! null brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {//遍历连接集合for (Channel chl : channels) {//通知该消费者客户端执行负载均衡this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);}}break;case UNREGISTER:this.brokerController.getConsumerFilterManager().unRegister(group);break;case REGISTER:if (args null || args.length 1) {return;}CollectionSubscriptionData subscriptionDataList (CollectionSubscriptionData) args[0];this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);break;default:throw new RuntimeException(Unknown event event);} }/*** Broker2Client的方法* p* 通知消费者变更** param channel 连接* param consumerGroup 消费者组*/ public void notifyConsumerIdsChanged(final Channel channel,final String consumerGroup) {if (null consumerGroup) {log.error(notifyConsumerIdsChanged consumerGroup is null);return;}//构建请求头NotifyConsumerIdsChangedRequestHeader requestHeader new NotifyConsumerIdsChangedRequestHeader();requestHeader.setConsumerGroup(consumerGroup);//构建远程命令对象请求code为NOTIFY_CONSUMER_IDS_CHANGEDRemotingCommand request RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);try {//发送单向请求无需等待客户端回应this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);} catch (Exception e) {log.error(notifyConsumerIdsChanged exception. group{}, error{}, consumerGroup, e.toString());} } 1.4.2 客户端处理重平衡请求 broker的请求在客户端是通过ClientRemotingProcessor#processRequest处理的。 NOTIFY_CONSUMER_IDS_CHANGED请求通过客户端的ClientRemotingProcessor#notifyConsumerIdsChanged方法处理。 /*** ClientRemotingProcessor的方法* p* 处理来自远程服务端的请求*/ Override public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {switch (request.getCode()) {case RequestCode.CHECK_TRANSACTION_STATE:return this.checkTransactionState(ctx, request);case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED://处理NOTIFY_CONSUMER_IDS_CHANGED请求return this.notifyConsumerIdsChanged(ctx, request);case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:return this.resetOffset(ctx, request);case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:return this.getConsumeStatus(ctx, request);case RequestCode.GET_CONSUMER_RUNNING_INFO:return this.getConsumerRunningInfo(ctx, request);case RequestCode.CONSUME_MESSAGE_DIRECTLY:return this.consumeMessageDirectly(ctx, request);case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:return this.receiveReplyMessage(ctx, request);default:break;}return null; }notifyConsumerIdsChanged客户端重平衡 客户端接口到broker的重平衡请求之后, 调用这个方法。内部仅仅是调用我们之前讲的rebalanceImmediately 方法唤醒负载均衡服务rebalanceService, 进行重平衡。 public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {try {//解析请求头final NotifyConsumerIdsChangedRequestHeader requestHeader (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);//打印日志log.info(receive brokers notification[{}], the consumer group: {} changed, rebalance immediately,RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getConsumerGroup());//熟悉的方法立即进行重平衡this.mqClientFactory.rebalanceImmediately();} catch (Exception e) {log.error(notifyConsumerIdsChanged exception, RemotingHelper.exceptionSimpleDesc(e));}return null; }
http://www.zqtcl.cn/news/307791/

相关文章:

  • 西安专题门户响应式网站建设系统网站有哪些
  • 山东省建设局网站监理员考试asp.net mvc6电商网站开发实践
  • 做网站需要提供什么资料网站备案是什么意思
  • 河南网站建设及推广东莞百度代做网站联系方式
  • 大型企业网站制作浦东新区做网站
  • 简单大气网站源码织梦怎么用框架实现在浏览器的地址栏只显示网站的域名而不显示出文件名
  • 电子商务型网站建设线上推广营销策划
  • 网站建设管理工作情况的通报网站开发vs设计报告
  • 嘉定网站网站建设公司官网制作
  • 做旅游广告在哪个网站做效果好财经网站建设
  • 网站样式下载网站地图定位用什么技术做
  • 自己做网站怎么做的百中搜优化软件
  • 南宁建站平台与网站建设相关的论文题目
  • 足球网站建设意义做股权众筹的网站
  • 北京网站建设设计一流的锦州网站建设
  • 专业手机移动网站建设什么网站可以做期刊封面
  • cms建站系统哪个好网站建设 柳州
  • 安徽省住房与城乡建设部网站八戒电影在线观看免费7
  • 江苏省建设考试网站准考证打印佛山网站建设锐艺a068
  • 展示型网站功能如何设计网站风格
  • wordpress图床网站网站什么时候做等保
  • 怎么创办网站浅谈博物馆网站建设的意义
  • 如何做擦边球网站网站seo规划
  • 建站知乎做网站销售工资
  • 仙居住房和城乡建设局网站用手机看网站源代码
  • 网架加工厂家seo关键词优化推广报价表
  • 开发新闻类网站门户网站搭建方案
  • 东莞网站搭建建站公司wordpress+链接跳转
  • 福州网站设计软件公司学校网站源码wordpress
  • 网站seo推广优化报价表广州哪个区封了