当前位置: 首页 > news >正文

山东省建设部官方网站南京建行网站

山东省建设部官方网站,南京建行网站,建设网站前期准备工作,昆明网络推广服务二.小试牛刀阶段 ​ 开始理解一些比较简单的业务逻辑 3、Netty服务注册框架 1、关注重点 ​ 网络通信服务是构建分布式应用的基础#xff0c;也是我们去理解RocketMQ底层业务的基础。这里就重点梳理RocketMQ的这个服务注册框架#xff0c;理解各个业务进程之间是如何进行…二.小试牛刀阶段 ​ 开始理解一些比较简单的业务逻辑 3、Netty服务注册框架 1、关注重点 ​ 网络通信服务是构建分布式应用的基础也是我们去理解RocketMQ底层业务的基础。这里就重点梳理RocketMQ的这个服务注册框架理解各个业务进程之间是如何进行RPC远程通信的。 ​ Netty的所有远程通信功能都由remoting模块实现。RemotingServer模块里包含了RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中涉及到的远程服务非常多在RocketMQ中NameServer主要是RPC的服务端RemotingServerBroker对于客户端来说是RPC的服务端RemotingServer而对于NameServer来说又是RPC的客户端。各种Client是RPC的客户端RemotingClient。 ​ 需要理解的是RocketMQ基于Netty保持客户端与服务端的长连接Channel。只要Channel是稳定的那么即可以从客户端发请求到服务端同样服务端也可以发请求到客户端。例如在事务消息场景中就需要Broker多次主动向Producer发送请求确认事务的状态。所以RemotingServer和RemotingClient都需要注册自己的服务。 2、源码重点 ​ 1、哪些组件需要Netty服务端哪些组件需要Netty客户端 比较好理解的NameServer需要NettyServer。客户端Producer和Consuer需要NettyClient。Broker需要NettyServer响应客户端请求需要NettyClient向NameServer注册心跳。但是有个问题 事务消息的Producer也需要响应Broker的事务状态回查他需要NettyServer吗 NameServer不需要NettyClient这也验证了之前介绍的NameServer之间不需要进行数据同步的说法。 ​ 2、所有的RPC请求数据都封账成RemotingCommand对象。而每个处理消息的服务逻辑都会封装成一个NettyRequestProcessor对象。 ​ 3、服务端和客户端都维护一个processorTable这是个HashMap。key是服务码requestCodevalue是对应的运行单元 PairNettyRequestProcessor,ExecutorService类型包含了处理Processor和执行线程的线程池。具体的Processor由业务系统自行注册。Broker服务注册见BrokerController.registerProcessor()客户端的服务注册见MQClientAPIImpl。NameServer则会注册一个大的DefaultRequestProcessor统一处理所有服务。 ​ 4、请求类型分为REQUEST和RESPONSE。这是为了支持异步的RPC调用。NettyServer处理完请求后可以先缓存到responseTable中等NettyClient下次来获取这样就不用阻塞Channel了可以提升请求吞吐量。猜一猜Producer的同步请求的流程是什么样的 ​ 5、重点理解remoting包中是如何实现全流程异步化。 整体RPC框架流程如下图 ​ RocketMQ使用Netty框架提供了一套基于服务码的服务注册机制让各种不同的组件都可以按照自己的需求注册自己的服务方法。RocketMQ的这一套服务注册机制是非常简洁使用的。在使用Netty进行其他相关应用开发时都可以借鉴他的这一套服务注册机制。例如要开发一个大型的IM项目要加减好友、发送文本图片甚至红包、维护群聊信息等等各种各样的请求这些请求如何封装就可以很好的参考这个框架。 3、关于RocketMQ的同步结果推送与异步结果推送 ​ RocketMQ的RemotingServer服务端会维护一个responseTable这是一个线程同步的Map结构。 key为请求的IDvalue是异步的消息结果。ConcurrentMapInteger /* opaque */, ResponseFuture 。 ​ 处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时处理的结果会存入responseTable通过ResponseFuture提供一定的服务端异步处理支持提升服务端的吞吐量。 请求返回后立即从responseTable中移除请求记录。 ​ 实际上同步也是通过异步实现的。 //org.apache.rocketmq.remoting.netty.ResponseFuture//发送消息后通过countDownLatch阻塞当前线程造成同步等待的效果。public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);return this.responseCommand;}//等待异步获取到消息后再通过countDownLatch释放当前线程。public void putResponse(final RemotingCommand responseCommand) {this.responseCommand responseCommand;this.countDownLatch.countDown();}​ 处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时处理的结果依然会存入responsTable等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture也就是在客户端请求结果时再去获取真正的结果。 另外在RemotingServer启动时会启动一个定时的线程任务不断扫描responseTable将其中过期的response清除掉。 //org.apache.rocketmq.remoting.netty.NettyRemotingServer this.timer.scheduleAtFixedRate(new TimerTask() {Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error(scanResponseTable exception, e);}}}, 1000 * 3, 1000);4、Broker心跳注册管理 1、关注重点 ​ 之前介绍过Broker会在启动时向所有NameServer注册自己的服务信息并且会定时往NameServer发送心跳信息。而NameServer会维护Broker的路由列表并对路由表进行实时更新。这一轮就重点梳理这个过程。 2、源码重点 ​ Broker启动后会立即发起向NameServer注册心跳。方法入口BrokerController.this.registerBrokerAll。 然后启动一个定时任务以10秒延迟默认30秒的间隔持续向NameServer发送心跳。 ​ NameServer内部会通过RouteInfoManager组件及时维护Broker信息。同时在NameServer启动时会启动定时任务扫描不活动的Broker。方法入口NamesrvController.initialize方法。 3、极简化的服务注册发现流程 ​ 为什么RocketMQ要自己实现一个NameServer而不用Zookeeper、Nacos这样现成的注册中心 ​ 首先依赖外部组件会对产品的独立性形成侵入不利于自己的版本演进。Kafka要抛弃Zookeeper就是一个先例。 ​ 另外其实更重要的还是对业务的合理设计。NameServer之间不进行信息同步而是依赖Broker端向所有NameServer同时发起注册。这让NameServer的服务可以非常轻量。如果可能你可以与Nacos或Zookeeper的核心流程做下对比。 ​ 但是要知道这种极简的设计其实是以牺牲数据一致性为代价的。Broker往多个NameServer同时发起注册有可能部分NameServer注册成功而部分NameServer注册失败了。这样多个NameServer之间的数据是不一致的。作为注册中心这是不可接受的。但是对于RocketMQ这又变得可以接受了。因为客户端从NameServer上获得的只要有一个正常运行的Broker就可以了并不需要完整的Broker列表。 5、Producer发送消息过程 1、关注重点 首先回顾下我们之前的Producer使用案例。 Producer有两种 一种是普通发送者DefaultMQProducer。只负责发送消息发送完消息就可以停止了。另一种是事务消息发送者 TransactionMQProducer。支持事务消息机制。需要在事务消息过程中提供事务状态确认的服务这就要求事务消息发送者虽然是一个客户端但是也要完成整个事务消息的确认机制后才能退出。 ​ 事务消息机制后面将结合Broker进行整理分析。这一步暂不关注。我们只关注DefaultMQProducer的消息发送过程。 然后整个Producer的使用流程大致分为两个步骤一是调用start方法进行一大堆的准备工作。 二是各种send方法进行消息发送。 ​ 那我们重点关注以下几个问题 1、Producer启动过程中启动了哪些服务 2、Producer如何管理broker路由信息。 可以设想一下如果Producer启动了之后NameServer挂了那么Producer还能不能发送消息希望你先从源码中进行猜想然后自己设计实验进行验证。 3、关于Producer的负载均衡。也就是Producer到底将消息发到哪个MessageQueue中。这里可以结合顺序消息机制来理解一下。消息中那个莫名奇妙的MessageSelector到底是如何工作的。 2、源码重点 1、Producer的核心启动流程 ​ 所有Producer的启动过程最终都会调用到DefaultMQProducerImpl#start方法。在start方法中的通过一个mQClientFactory对象启动生产者的一大堆重要服务。 ​ 这里其实就是一种设计模式虽然有很多种不同的客户端但是这些客户端的启动流程最终都是统一的全是交由mQClientFactory对象来启动。而不同之处在于这些客户端在启动过程中按照服务端的要求注册不同的信息。例如生产者注册到producerTable消费者注册到consumerTable管理控制端注册到adminExtTable 2、发送消息的核心流程 ​ 核心流程如下 ​ 1、发送消息时会维护一个本地的topicPublishInfoTable缓存DefaultMQProducer会尽量保证这个缓存数据是最新的。但是如果NameServer挂了那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker还是可以正常发送消息到Broker的。 --可以在生产者示例中start后打一个断点然后把NameServer停掉这时Producer还是可以发送消息的。 ​ 2、生产者如何找MessageQueue 默认情况下生产者是按照轮训的方式依次轮训各个MessageQueue。但是如果某一次往一个Broker发送请求失败后下一次就会跳过这个Broker。 //org.apache.rocketmq.client.impl.producer.TopicPublishInfo//如果进到这里lastBrokerName不为空那么表示上一次向这个Broker发送消息是失败的这时就尽量不要再往这个Broker发送消息了。public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName  null) {return selectOneMessageQueue();} else {for (int i  0; i  this.messageQueueList.size(); i) {int index  this.sendWhichQueue.incrementAndGet();int pos  Math.abs(index) % this.messageQueueList.size();if (pos  0)pos  0;MessageQueue mq  this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}} ​ 3、如果在发送消息时传了Selector那么Producer就不会走这个负载均衡的逻辑而是会使用Selector去寻找一个队列。 具体参见org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl 方法。 6、Consumer拉取消息过程 1、关注重点 结合我们之前的示例回顾下消费者这一块的几个重点问题 消费者也是有两种推模式消费者和拉模式消费者。优秀的MQ产品都会有一个高级的目标就是要提升整个消息处理的性能。而要提升性能服务端的优化手段往往不够直接最为直接的优化手段就是对消费者进行优化。所以在RocketMQ中整个消费者的业务逻辑是非常复杂的甚至某种程度上来说比服务端更复杂所以在这里我们重点关注用得最多的推模式的消费者。 消费者组之间有集群模式和广播模式两种消费模式。我们就要了解下这两种集群模式是如何做的逻辑封装。 然后我们关注下消费者端的负载均衡的原理。即消费者是如何绑定消费队列的哪些消费策略到底是如何落地的。 最后我们来关注下在推模式的消费者中MessageListenerConcurrently 和MessageListenerOrderly这两种消息监听器的处理逻辑到底有什么不同为什么后者能保持消息顺序。 2、源码重点 ​ Consumer的核心启动过程和Producer是一样的 最终都是通过mQClientFactory对象启动。不过之间添加了一些注册信息。整体的启动过程如下 3、广播模式与集群模式的Offset处理 在DefaultMQPushConsumerImpl的start方法中启动了非常多的核心服务。 比如对于广播模式与集群模式的Offset处理 if (this.defaultMQPushConsumer.getOffsetStore() ! null) {this.offsetStore  this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore  new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore  new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();​ 可以看到广播模式是使用LocalFileOffsetStore在Consumer本地保存Offset而集群模式是使用RemoteBrokerOffsetStore在Broker端远程保存offset。而这两种Offset的存储方式最终都是通过维护本地的offsetTable缓存来管理Offset。 4、Consumer与MessageQueue建立绑定关系 ​ start方法中还一个比较重要的东西是给rebalanceImpl设定了一个AllocateMessageQueueStrategy用来给Consumer分配MessageQueue的。 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); //Consumer负载均衡策略 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());​ 这个AllocateMessageQueueStrategy就是用来给Consumer和MessageQueue之间建立一种对应关系的。也就是说只要Topic当中的MessageQueue以及同一个ConsumerGroup中的Consumer实例都没有变动那么某一个Consumer实例只是消费固定的一个或多个MessageQueue上的消息其他Consumer不会来抢这个Consumer对应的MessageQueue。 ​ 关于负载均衡机制会在后面结合Producer的发送消息策略一起总结。不过这里你可以想一下为什么要让一个MessageQueue只能由同一个ConsumerGroup中的一个Consumer实例来消费。 ​ 其实原因很简单因为Broker需要按照ConsumerGroup管理每个MessageQueue上的Offset如果一个MessageQueue上有多个同属一个ConsumerGroup的Consumer实例他们的处理进度就会不一样。这样的话Offset就乱套了。 5、顺序消费与并发消费 ​ 同样在start方法中启动了consumerMessageService线程进行消息拉取。 //Consumer中自行指定的回调函数。if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly  true;this.consumeMessageService new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly  false;this.consumeMessageService new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}​ 可以看到 Consumer通过registerMessageListener方法指定的回调函数都被封装成了ConsumerMessageService的子实现类。 ​ 而对于这两个服务实现类的调用会延续到DefaultMQPushConsumerImpl的pullCallback对象中。也就是Consumer每拉过来一批消息后就向Broker提交下一个拉取消息的的请求。 这里也可以印证一个点就是顺序消息只对异步消费也就是推模式有效。同步消费的拉模式是无法进行顺序消费的。因为这个pullCallback对象在拉模式的同步消费时根本就没有往下传。 当然这并不是说拉模式不能锁定队列进行顺序消费拉模式在Consumer端应用就可以指定从哪个队列上拿消息。 PullCallback pullCallback  new PullCallback() {Overridepublic void onSuccess(PullResult pullResult) {if (pullResult ! null) {//...switch (pullResult.getPullStatus()) {case FOUND://...DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//... break;//...}}}​ 而这里提交的实际上是一个ConsumeRequest线程。而提交的这个ConsumeRequest线程在两个不同的ConsumerService中有不同的实现。 ​ 这其中两者最为核心的区别在于ConsumerMessageOrderlyService是锁定了一个队列处理完了之后再消费下一个队列。 public void run() {// ....final Object objLock  messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {//....}}​ 为什么给队列加个锁就能保证顺序消费呢结合顺序消息的实现机制理解一下。 ​ 从源码中可以看到Consumer提交请求时都是往线程池里异步提交的请求。如果不加队列锁那么就算Consumer提交针对同一个MessageQueue的拉取消息请求这些请求都是异步执行他们的返回顺序是乱的无法进行控制。给队列加个锁之后就保证了针对同一个队列的第二个请求必须等第一个请求处理完了之后释放了锁才可以提交。这也是在异步情况下保证顺序的基础思路。 6、实际拉取消息还是通过PullMessageService完成的。 ​ start方法中相当于对很多消费者的服务进行初始化包括指定一些服务的实现类以及启动一些定时的任务线程比如清理过期的请求缓存等。最后会随着mQClientFactory组件的启动启动一个PullMessageService。实际的消息拉取都交由PullMesasgeService进行。 ​ 所谓消息推模式其实还是通过Consumer拉消息实现的。 //org.apache.rocketmq.client.impl.consumer.PullMessageServiceprivate void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer  this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer ! null) {DefaultMQPushConsumerImpl impl  (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn(No matched consumer for the PullRequest {}, drop it, pullRequest);}}7、客户端负载均衡管理总结 ​ 从之前Producer发送消息的过程以及Conmer拉取消息的过程我们可以抽象出RocketMQ中一个消息分配的管理模型。这个模型是我们在使用RocketMQ时很重要的进行性能优化的依据。 1 Producer负载均衡 ​ Producer发送消息时默认会轮询目标Topic下的所有MessageQueue并采用递增取模的方式往不同的MessageQueue上发送消息以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的所以消息也会发送到不同的broker上。 ​ 在之前源码中看到过Producer轮训时如果发现往某一个Broker上发送消息失败了那么下一次会尽量避免再往同一个Broker上发送消息。但是如果你的应用场景允许发送消息长延迟也可以给Producer设定setSendLatencyFaultEnable(true)。这样对于某些Broker集群的网络不是很好的环境可以提高消息发送成功的几率。 ​ 同时生产者在发送消息时可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。 2 Consumer负载均衡 ​ Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。 1、集群模式 ​ 在集群消费模式下每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息在拉取的时候需要明确指定拉取哪一条message queue。 ​ 而每当实例的数量有变更都会触发一次所有实例的负载均衡这时候会按照queue的数量和实例的数量平均分配queue给每个实例。 ​ 每次分配时都会将MessageQueue和消费者ID进行排序后再用不同的分配算法进行分配。内置的分配的算法共有六种分别对应AllocateMessageQueueStrategy下的六种实现类可以在consumer中直接set来指定。默认情况下使用的是最简单的平均分配策略。 AllocateMachineRoomNearby 将同机房的Consumer和Broker优先分配在一起。 ​ 这个策略可以通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配。一般也就用简单的平均分配策略或者轮询分配策略。 感觉这东西挺鸡肋的直接给个属性指定机房不是挺好的吗。 ​ 源码中有测试代码AllocateMachineRoomNearByTest。 ​ 在示例中Broker的机房指定方式 messageQueue.getBrokerName().split(-)[0]而Consumer的机房指定方式clientID.split(-)[0] ​ clinetID的构建方式见ClientConfig.buildMQClientId方法。按他的测试代码应该是要把clientIP指定为IDC1-CID-0这样的形式。 AllocateMessageQueueAveragely平均分配。将所有MessageQueue平均分给每一个消费者 AllocateMessageQueueAveragelyByCircle 轮询分配。轮流的给一个消费者分配一个MessageQueue。 AllocateMessageQueueByConfig 不分配直接指定一个messageQueue列表。类似于广播模式直接指定所有队列。 AllocateMessageQueueByMachineRoom按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。 AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数是用的一个哈希环的算法虚拟节点是为了让Hash数据在换上分布更为均匀。 最常用的就是平均分配和轮训分配了。例如平均分配时的分配情况是这样的 ​ 而轮训分配就不计算了每次把一个队列分给下一个Consumer实例。 2、广播模式 ​ 广播模式下每一条消息都会投递给订阅了Topic的所有消费者实例所以也就没有消息分配这一说。而在实现上就是在Consumer分配Queue时所有Consumer都分到所有的Queue。 ​ 广播模式实现的关键是将消费者的消费偏移量不再保存到broker当中而是保存到客户端当中由客户端自行维护自己的消费偏移量。
http://www.zqtcl.cn/news/302100/

相关文章:

  • 电商网站后台报价公司如何建站
  • 查网站有没有做推广企业网站建设的目标
  • 北京网站维护公司专业外贸网站建设_诚信_青岛
  • 网站自己做还是用程序制作网站一般使用的软件有哪些
  • 晨雷文化传媒网站建设济南互联网品牌设计
  • 怎样给自己的网站做防红连接梵客装饰公司官网
  • 甘肃省城乡与住房建设厅网站纪检网站建设动态主题
  • 关于做好全国网站建设网站建设哪个好
  • 灵犀科技网站建设企业建设网站作用
  • 做网站架构图无版权图片网站
  • 赌场需要网站维护吗通过服务推广网站的案例
  • 阿里云网站空间网站建设犭金手指六六壹柒
  • 网站排名软件包年农业网站开发
  • 建设信用卡网银网站crm客户关系管理论文
  • 阿里巴巴网站的搜索引擎优化案例软件开发收费价目表
  • 企业网站建设之域名篇wordpress 文章居中
  • 萍乡网站建设行吗南康建设局官方网站
  • 一键部署wordpress爱站seo工具
  • 大连网站建设服务做进料加工在哪个网站上做
  • 南昌行业网站建设网站版权信息修改
  • 百度网站关键词排名助手低成本做网站 白之家
  • 怎么查询网站是谁做的部队网站建设报告
  • 租房网站开发专业网站建设品牌策划方案
  • 电子商务网站建设方案书软件开发工具图片
  • 案例建网站宿松网站建设公司
  • 秦皇岛网站开发wordpress免费国内主题
  • seo网站推广推荐阳江房管局查询房产信息网
  • php服装商城网站建设个人网站免费空间
  • 做内贸注册什么网站广州市建设交易中心网站
  • 点样用外网访问自己做的网站北京市网站设计公司网址