当前位置: 首页 > news >正文

口碑好网站建设报价网站建设 主机托管

口碑好网站建设报价,网站建设 主机托管,汕头网站建设,品牌网站建设的作用简介 Name server 负责broker注册、心跳#xff0c;路由等功能#xff0c;类似Kafka的ZKname server节点之间不互相通信#xff0c;broker需要和所有name server进行通信。扩容name server需要重启broker#xff0c;不然broker不会和name server建立连接producer和consum… 简介 Name server 负责broker注册、心跳路由等功能类似Kafka的ZKname server节点之间不互相通信broker需要和所有name server进行通信。扩容name server需要重启broker不然broker不会和name server建立连接producer和consumer也都是和name server建立长连接获取路由信息拿到对应的broker信息与broker建立长连接然后发送/消费消息 路由发现 Pull的模式。当topic路由信息发生变化时name server不回主动推送给客户端而是客户端定时拉取。默认客户端每30秒会拉取一次最新的路由信息 扩展 1push模型实时性好但是需要维护一个长链接消耗服务端资源。client数量不多实时性要求高server数据变化比较频繁的场景适合此种模式 2pull模型实时性差 3long polling模型长轮询模式。客户端定时发送拉取请求服务端会hold住连接一段时间在此期间的数据变动通过此连接推送。超过hold时间后才断开连接。兼顾以上两种方式 Broker broker每30s给name server发送一次心跳name server每120s检查一次所有的broker心跳时间超过阈值踢出brokerbroker节点集群是主从集群master负责处理读写请求slave负责对master中的数据进行备份。master和slave有相同的broker name但broker id不同broker id为0的是master非0的是slave。每个broker与name server集群中的所有节点建立长连接定时注册topic信息到所有name server 源码分析 NameServer NameServer的启动过程分析 NameServer服务器相关的源码在namesrv模块下目录结构如下 NamesrvStartup类就是Name Server服务器启动的启动类NamesrvStartup类中有一个main启动类main方法调用main0main0主要流程代码 main0 方法的主要作用就是创建Name Server服务器的控制器并且启动Name Server服务器的控制器。NamesrvController类的作用就是为Name Server服务的启动提供具体的逻辑实现主要包括配置信息的加载、远程通信服务器的创建和加载、默认处理器的注册以及心跳检测机器监控Broker的健康状态等。Name Server服务器的控制器的创建方法为createNamesrvController方法createNamesrvController方法的主要流程代码如下 //代码位置org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController public static NamesrvController createNamesrvController(String[] args){//设置rocketMQ的版本信息REMOTING_VERSION_KEY的值为rocketmq.remoting.versionCURRENT_VERSION的值为V4_7_0System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//构建命令行添加帮助命令和Name Server的提示命令将createNamesrvController方法的args参数进行解析//代码省略//nameServer 服务器配置类和netty 服务器配置类final NamesrvConfig namesrvConfig new NamesrvConfig();final NettyServerConfig nettyServerConfig new NettyServerConfig();//设置netty服务器的监听端口nettyServerConfig.setListenPort(9876);// 判断上述构建的命令行是否有configFile缩写为C配置文件,如果有的话则读取configFile配置文件的配置信息// 并将转为NamesrvConfig和NettyServerConfig的配置信息// 代码省略// 如果构建的命令行存在字符p就打印所有的配置信息病区退出方法// 代码省略//首先将构建的命令行转换为Properties然后将通过反射的方式将Properties的属性转换为namesrvConfig的配置项和配置值。MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);//打印nameServer 服务器配置类和 netty 服务器配置类的配置信息MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);//将namesrvConfig和nettyServerConfig作为参数创建nameServer 服务器的控制器final NamesrvController controller new NamesrvController(namesrvConfig, nettyServerConfig);//将所有的配置保存在内存中Propertiescontroller.getConfiguration().registerConfig(properties);return controller; }createNamesrvController方法主要做了几件事读取和解析配置信息包括Name Server服务的配置信息、Netty 服务器的配置信息、打印读取或者解析的配置信息、保存配置信息到本地文件中以及根据namesrvConfig配置和nettyServerConfig配置作为参数创建nameServer 服务器的控制器。创建好Name server控制器以后就可以启动它了。启动Name Server的方法的主流程如下 //代码位置org.apache.rocketmq.namesrv.NamesrvStartup#start public static NamesrvController start(final NamesrvController controller){//初始化nameserver 服务器如果初始化失败则退出boolean initResult controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}//添加关闭的钩子进行内存清理、对象销毁等惭怍Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new CallableVoid() {Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));//启动controller.start(); }start方法没什么逻辑主要作用就是进行初始化工作然后进行启动Name Server控制器接下来看看进行了哪些初始化工作以及如何启动Name Server的初始化initialize方法的主要流程如下 //代码位置org.apache.rocketmq.namesrv.NamesrvStartup#initialize public boolean initialize() {// key-value 配置加载this.kvConfigManager.load();// //创建netty远程服务器用来进行网络传输以及通信this.remotingServer new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);//远程服务器线程池this.remotingExecutor Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl(RemotingExecutorThread_));//注册处理器this.registerProcessor();//每10秒扫描不活跃的brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);//每10秒打印配置信息key-valuethis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);//省略部分代码return true;}initialize方法的主要逻辑如下 加载配置文件。读取文件名为user.home/namesrv/kvConfig.json(其中user.home为用户的目录)然后将读取的文件内容转为KVConfigSerializeWrapper类最后将所有的key-value保存在如下map中 //用来保存不同命名空间的key-value private final HashMapString/* Namespace /, HashMapString/ Key /, String/ Value */ configTable new HashMapString, HashMapString, String(); 创建Netty服务器。Name Server 用netty与生产者、消费者以及Boker进行通信。注册处理器。这里主要注册的是默认的处理器DefaultRequestProcessor注册的逻辑主要是初始化DefaultRequestProcessor并保存着待需要使用的时候直接使用。处理器的作用就是处理生产者、消费者以及Broker服务器的不同请求比如获取生产者和消费者获取所有的路由信息Broker服务器注册路由信息等。处理器DefaultRequestProcessor处理不同的请求将会在下面进行讲述。执行定时任务。主要有两个定时任务一个是每十秒扫描不活跃的Broker。并且将过期的Broker清理掉。另外一个是每十秒打印key-valu的配置信息。 上面就是initialize方法的主要逻辑特别需要注意每10秒扫描不活跃的broker的定时任务 //NamesrvController.this.routeInfoManager.scanNotActiveBroker(); //代码位置org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBrokerpublic void scanNotActiveBroker() {//所有存活的BrokerIteratorEntryString, BrokerLiveInfo it this.brokerLiveTable.entrySet().iterator();//遍历Brokerwhile (it.hasNext()) {EntryString, BrokerLiveInfo next it.next();long last next.getValue().getLastUpdateTimestamp();//最后更新时间加上broker过期时间120秒小于当前时间则关闭与broker的远程连接。并且将缓存在map中的broker信息删除if ((last BROKER_CHANNEL_EXPIRED_TIME) System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();//将过期的Channel连接清理掉。以及删除缓存的Brokerthis.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}}scanNotActiveBroker方法的逻辑主要是遍历缓存在brokerLiveTable的Broker将Broker最后更新时间加上120秒的结果是否小于当前时间如果小于当前时间说明Broker已经过期可能是已经下线了所以可以清除Broker信息并且关闭Name Server 服务器与Broker服务器连接这样被清除的Broker就不会与Name Server服务器进行远程通信了。brokerLiveTable的结果如下 //保存broker地址与broker存活信息的对应关系 private final HashMapString/* brokerAddr */, BrokerLiveInfo brokerLiveTable;brokerLiveTable缓存着以brokerAddr为keyBroker 地址,以BrokerLiveInfo为value的结果BrokerLiveInfo是Broker存活对象主要有如下几个属性 class BrokerLiveInfo {//最后更新时间private long lastUpdateTimestamp;//版本信息private DataVersion dataVersion;//连接private Channel channel;//高可用服务器地址private String haServerAddr;//省略代码 }从BrokerLiveInfo中删除了过期的Broker后还需要做清理Name Server服务器与Broker服务器的连接onChannelDestroy方法主要是清理缓存在如下map的信息 ////代码位置org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager//保存broker地址与broker存活信息的对应关系 private final HashMapString/* brokerAddr */, BrokerLiveInfo brokerLiveTable; //保存broker地址与过滤服务器的对应关系Filter Server 与消息过滤有关系 private final HashMapString/* brokerAddr */, ListString/* Filter Server */ filterServerTable; //保存broker 名字与 broker元数据的关系 private final HashMapString/* brokerName */, BrokerData brokerAddrTable; //保存集群名字与集群下所有broker名字对应的关系 private final HashMapString/* clusterName */, SetString/* brokerName */ clusterAddrTable; //保存topic与topic下所有队列元数据的对应关系private final HashMapString/* topic */, ListQueueData topicQueueTable;在扫描过期的broker时首先找到不活跃的broker然后onChannelDestroy方法清理与该不活跃broker有关的缓存清理的主要流程如下 清理不活跃的broker存活信息。首先遍历brokerLiveTable找到不活跃的broker然后删除brokerLiveTable中的与该不活跃的broker有关的缓存信息。清理与消息过滤有关的缓存。找到不活跃的broker存活信息删除filterServerTable中的与该broker地址有关的消息过滤的服务信息。清理与不活跃broker的元素居。brokerAddrTable保存着broker名字与broker元素居对应的信息BrokerData类保存着cluster、brokerName、brokerId与broker name。遍历brokerAddrTable找到与该不活跃broker的名字相等的broker元素进行删除。清理集群下对应的不活跃broker名字。clusterAddrTable保存集群名字与集群下所有broker名字对应的关系遍历clusterAddrTable的所有key从clusterAddrTable中找到与不活跃broker名字相等的元素然后删除。清理与该不活跃broker的topic对应队列数据。topicQueueTable保存topic与topic下所有队列元数据的对应关系QueueData保存着brokerName、readQueueNums可读队列数量、writeQueueNums可写队列数量等。遍历topicQueueTable的key找到与不活跃broker名字相同的QueueData进行删除。 初始化nameserver 服务器以后接下来就可以启动nameserver 服务器 //代码位置org.apache.rocketmq.namesrv.NamesrvController#start public void start() throws Exception {//启动远程服务器netty 服务器this.remotingServer.start();//启动文件监听线程if (this.fileWatchService ! null) {this.fileWatchService.start();} }start方法做了两件事第一件就是启动netty服务器netty服务器主要负责与Broker、生产者与消费者之间的通信处理Broker、生产者与消费者的不同请求。根据nettyConfig配置设置启动的配置和各种处理器然后采用netty服务器启动的模板启动服务器具体的代码就不分析了有兴趣的可以看看netty启动代码模板是怎么样的。第二件事就是启动文件监听线程监听tts相关文件是否发生变化。 Name Server 服务器启动流程的源代码分析到此为止了在这里总结下Name Server 服务器启动流程主要做了什么事 加载和读取配置。设置Name Server 服务器启动的配置NamesrvConfig和启动Netty服务器启动的配置NettyServerConfig。初始化相关的组件。netty服务类、远程服务线程池、处理器以及定时任务的初始化。启动Netty服务器。Netty服务器用来与broker、生产者、消费者进行通信、处理与它们之间的各种请求并且对请求的响应结果进行处理。 Broker管理和路由信息的管理 Name Server 服务器的作用主要有两个Broker管理和路由信息管理。 Broker管理 在上面分析的Name Server 服务器的启动过程中也有一个与Broker管理相关的分析那就是启动一个定时线程池每十秒去扫描不活跃的Broker。将不活跃的Broker清理掉。除了在Name Server 服务器启动时启动定时任务去扫描不活跃的Broker外Name Server 服务器启动以后通过netty服务器接收Broker、生产者、消费者的不同请求将接收到请求会交给在Name Server服务器启动时注册的处理器DefaultRequestProcessor类的processRequest方法处理。processRequest方法根据请求的不同类型将请求交给不同的方法进行处理。有关Broker管理的请求主要有注册Broker、注销BrokerprocessRequest方法处理注册Broker、注销Broker请求的代吗如下 //代码位置org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequestpublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) {switch (request.getCode()) {//省略无关代码//注册Brokercase RequestCode.REGISTER_BROKER:Version brokerVersion MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);} else {return this.registerBroker(ctx, request);}//注销Brokercase RequestCode.UNREGISTER_BROKER:return this.unregisterBroker(ctx, request);//省略无关代码}}Broker注册 Broker 服务器启动时会向Name Server 服务器发送Broker 相关的信息如集群的名字、Broker地址、Broker名字、topic相关信息等注册Broker主要的代码比较长接下来会分成好几部分进行讲解。如下 //代码位置org.apache.rocketmq.namesrv.processor.RouteInfoManager#registerBroker public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final ListString filterServerList,final Channel channel) {RegisterBrokerResult result new RegisterBrokerResult();this.lock.writeLock().lockInterruptibly();//根据集群的名字获取所有的broker名字SetString brokerNames this.clusterAddrTable.get(clusterName);if (null brokerNames) {brokerNames new HashSetString();this.clusterAddrTable.put(clusterName, brokerNames);}//名字保存在broker名字中brokerNames.add(brokerName);//省略代码 }registerBroker方法根据集群的名字获取该集群下所有的Broker名字的Set如果不存在就创建并添加进clusterAddrTable中clusterAddrTable保存着集群名字与该集群下所有的Broker名字对应关系最后将broker名字保存在set中。 public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final ListString filterServerList,final Channel channel) {//省略代码boolean registerFirst false;//获取broker 元数据BrokerData brokerData this.brokerAddrTable.get(brokerName);if (null brokerData) {registerFirst true;brokerData new BrokerData(clusterName, brokerName, new HashMapLong, String());this.brokerAddrTable.put(brokerName, brokerData);}//获取所有的broker地址MapLong, String brokerAddrsMap brokerData.getBrokerAddrs();IteratorEntryLong, String it brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {EntryLong, String item it.next();if (null ! brokerAddr brokerAddr.equals(item.getValue()) brokerId ! item.getKey()) {it.remove();}}String oldAddr brokerData.getBrokerAddrs().put(brokerId, brokerAddr);registerFirst registerFirst || (null oldAddr);//省略代码}上述代码主要做了两件事 缓存broker元数据信息。首先根据broker名字从brokerAddrTable中获取Broker元数据brokerData如果brokerData不存在说明是第一次注册创建Broker元数据并添加进brokerAddrTable中brokerAddrTable保存着Broker名字与Broker元数据对应的信息。从Broker元数据brokerData中获取该元数据中的所有Broker地址信息brokerAddrsMap。brokerAddrsMap保存着brokerId与所有Broker名字对应信息。遍历brokerAddrsMap中的所有broker地址查找与参数brokerAddr相同但是与参数borkerId不同的进行删除保证一个broker名字对应着BrokerId最后将参数brokerId与参数brokerAddr保存到brokerData元数据的brokerAddrsMap中进行缓存。 public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final ListString filterServerList,final Channel channel) {//省略代码//如果topic的配置不空并且是broker masterif (null ! topicConfigWrapper MixAll.MASTER_ID brokerId) {//如果topic配置改变或者是第一次注册if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {//获取所有的topic配置ConcurrentMapString, TopicConfig tcTable topicConfigWrapper.getTopicConfigTable();if (tcTable ! null) {//遍历topic配置创建并更新队列元素for (Map.EntryString, TopicConfig entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//省略代码}如果参数topicConfigWrapper不等于空并且brokerId等于0时判断topic是否改变如果topic改变或者是第一次注册获取所有的topic配置并创建和更新队列元数据。QueueData保存着队列元数据如Broker名字、写队列数量、读队列数量如果队列缓存中不存在该队列元数据则添加否则遍历缓存map找到该队列元数据进行删除如果是新添加的则添加进队列缓存中。 public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final ListString filterServerList,final Channel channel) {//省略代码//创建broker存活对象并进行保存BrokerLiveInfo prevBrokerLiveInfo this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null prevBrokerLiveInfo) {log.info(new broker registered, {} HAServer: {}, brokerAddr, haServerAddr);}//如果过滤服务地址不为空则缓存到filterServerTableif (filterServerList ! null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}//如果不是broker master获取高可用服务器地址以及master地址if (MixAll.MASTER_ID ! brokerId) {String masterAddr brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr ! null) {BrokerLiveInfo brokerLiveInfo this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo ! null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}return result;}最后代码片段主要做了三件事首先创建了Broker存活对象BrokerLiveInfo添加到brokerLiveTable中缓存在Name Server 启动时供定时线程任务每十秒进行扫描。以确保非正常的Broker被清理掉。然后是判断参数filterServerList是否为空如果不为空则添加到filterServerTable缓存filterServerTable保存着与消息过滤相关的过滤服务。最后判断该注册的Broker不是Broker master则设置高可用服务器地址以及master地址。到此为止Broker注册的代码就分析完成了总而言之Broker注册就是Broker将相关的元数据信息如Broker名字Broker地址、topic信息发送给Name Server服务器Name Server接收到以后将这些元数据缓存起来以供后续能够快速找到这些元数据生产者和消费者也可以通过Name Server服务器获取到Broke相关的信息这样生产者和消费者就可以和Broker服务器进行通信了生产者发送消息给Broker服务器消费者从Broker服务器消费消息。 Broker注销 Broker注销的过程刚好跟Broker注册的过程相反Broker注册是将Broker相关信息和Topic配置信息缓存起来以供生产者和消费者使用。而Broker注销则是将Broker注销缓存的Broker信息从缓存中删除Broker注销unregisterBroker方法主要代码流程如下 //代码位置org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker public void unregisterBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) {this.lock.writeLock().lockInterruptibly();//将缓存的broker存活对象删除BrokerLiveInfo brokerLiveInfo this.brokerLiveTable.remove(brokerAddr);//将所有的过滤服务删除this.filterServerTable.remove(brokerAddr);boolean removeBrokerName false;//删除broker元数据if (null ! brokerData) {String addr brokerData.getBrokerAddrs().remove(brokerId);if (brokerData.getBrokerAddrs().isEmpty()) {this.brokerAddrTable.remove(brokerName);removeBrokerName true;}}//如果删除broker元数据成功if (removeBrokerName) {SetString nameSet this.clusterAddrTable.get(clusterName);if (nameSet ! null) {boolean removed nameSet.remove(brokerName);if (nameSet.isEmpty()) {this.clusterAddrTable.remove(clusterName);}}//根据brokerName删除topic配置信息this.removeTopicByBrokerName(brokerName);}this.lock.writeLock().unlock(); }unregisterBroker方法的参数有集群名字、broker地址、broker名字、brokerId主要逻辑为 根据broker地址删除broker存活对象。根据broker地址删除所有消息过滤服务。删除broker元数据。如果删除元数据成功则根据集群名字删除该集群的所有broker名字以及根据根据- brokerName删除topic配置信息。 路由信息的管理 处理器DefaultRequestProcessor类的processRequest方法除了处理Broker注册和Broker注销的请求外还处路由信息管理有关的请求接收到生产者和消费者的路由信息相关的请求会交给处理器DefaultRequestProcessor类的processRequest方法处理processRequest方法则会根据不同的请求类型将请求交给RouteInfoManager类的不同方法处理。RouteInfoManager类用map进行缓存路由相关信息map如下 //topic与队列数据对应映射关系 private final HashMapString/* topic */, ListQueueData topicQueueTable; //broker 名字与broker 元数据对应映射关系 private final HashMapString/* brokerName */, BrokerData brokerAddrTable; //保存cluster的所有broker name private final HashMapString/* clusterName */, SetString/* brokerName */ clusterAddrTable; //broker 地址 与 BrokerLiveInfo存活对象的对应映射关系 private final HashMapString/* brokerAddr */, BrokerLiveInfo brokerLiveTable; //broker 地址 的所有过滤服务 private final HashMapString/* brokerAddr */, ListString/* Filter Server */ filterServerTable;RouteInfoManager类利用上面几个map缓存了Broker信息topic相关信息、集群信息、消息过滤服务信息等如果这些缓存的信息有变化就是网这些map新增或删除缓存。这就是Name Server服务的路由信息管理。processRequest方法是如何处理路由信息管理的具体实现可以阅读具体的代码无非就是将不同的请求委托给RouteInfoManager的不同方法RouteInfoManager的不同实现了上面缓存信息的管理。 Broker Broker 主要负责消息的存储投递和查询以及保证服务的高可用。Broker负责接收生产者发送的消息并存储、同时为消费者消费消息提供支持。为了实现这些功能Broker包含几个重要的子模块 通信模块负责处理来自客户端生产者、消费者的请求。 客户端管理模块:负责管理客户端生产者、消费者和维护消费者的Topic订阅信息。 存储模块提供存储消息和查询消息的能力方便Broker将消息存储到硬盘。 高可用服务HA Service提供数据冗余的能力保证数据存储到多个服务器上将Master Broker的数据同步到Slavew Broker上。 索引服务Index service对投递到Broker的消息建立索引提供快速查询消息的能力。 broker启动过程分析 在Name Server启动以后Broker就可以开始启动了启动过程将所有路由信息都注册到Name server服务器上生产者就可以发送消息到Broker消费者也可以从Broker消费消息。接下来就来看看Broker的具体启动过程。 //源代码位置org.apache.rocketmq.broker.BrokerStartup#main public static void main(String[] args) {start(createBrokerController(args)); }BrokerStartup类是Broker的启动类在BrokerStartup类的main方法中首先创建用createBrokerController方法创建Broker控制器BrokerController类Broker控制器主要负责Broker启动过程的具体的相关逻辑实现。创建好Broker 控制器以后就可以启动Broker 控制器了所以下面将从两个部分分析Broker的启动过程 创建Broker控制器初始化配置信息创建并初始化Broker控制注册Broker关闭的钩子启动Broker控制器 创建Broker控制器 Broker在启动的时候会初始化一些配置如Broker配置、netty服务端配置、netty客户端配置、消息存储配置为Broker启动提供配置准备。 //源代码位置org.apache.rocketmq.broker.BrokerStartup#createBrokerControllerpublic static BrokerController createBrokerController(String[] args) {/**省略代码注释1、设置RocketMQ的版本2、设置netty接收和发送请求的buffer大小3、构建命令行将命令行进行解析封装**///broker配置、netty服务端配置、netty客户端配置final BrokerConfig brokerConfig new BrokerConfig();final NettyServerConfig nettyServerConfig new NettyServerConfig();final NettyClientConfig nettyClientConfig new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode TlsMode.ENFORCING))));//设置netty监听接口nettyServerConfig.setListenPort(10911);//消息存储配置final MessageStoreConfig messageStoreConfig new MessageStoreConfig();//如果broker的角色是slave设置命中消息在内存的最大比例if (BrokerRole.SLAVE messageStoreConfig.getBrokerRole()) {int ratio messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}//省略代码}createBrokerController方法创建BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类BrokerConfig类是Broker配置类。 BrokerConfig属性主要包括Broker相关的配置属性如Broker名字、Broker Id、Broker连接的Name server地址、集群名字等。NettyServerConfigBroker netty服务端配置类Broker netty服务端主要用来接收客户端的请求NettyServerConfig类主要属性包括监听接口、服务工作线程数、接收和发送请求的buffer大小等。NettyClientConfignetty客户端配置类用于生产者、消费者这些客户端与Broker进行通信相关配置配置属性主要包括客户端工作线程数、客户端回调线程数、连接超时时间、连接不活跃时间间隔、连接最大闲置时间等。MessageStoreConfig消息存储配置类配置属性包括存储路径、commitlog文件存储目录、CommitLog文件的大小、CommitLog刷盘的时间间隔等。 初始化配置信息 创建完这些配置类以后接下来会为这些配置类的一些配置属性设置值先看看如下代码 //源代码位置org.apache.rocketmq.broker.BrokerStartup#createBrokerControllerpublic static BrokerController createBrokerController(String[] args) {//省略代码//如果命令中包含字母c则读取配置文件将配置文件的内容设置到配置类中if (commandLine.hasOption(c)) {String file commandLine.getOptionValue(c);if (file ! null) {configFile file;InputStream in new BufferedInputStream(new FileInputStream(file));properties new Properties();properties.load(in);//读取配置文件的中namesrv地址properties2SystemEnv(properties);//将配置文件中的配置项映射到配置类中去MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);//设置配置broker配置文件BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}//设置broker配置类MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);//省略代码}上述主要的代码逻辑为如果命令行中存在命令参数为‘c’c是configFile的缩写那么就读取configFile文件的内容将configFile配置文件的配置项映射到BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类中。接下来createBrokerController方法做一些判断必要配置的合法性如下代码所示 //源代码位置org.apache.rocketmq.broker.BrokerStartup#createBrokerController public static BrokerController createBrokerController(String[] args) {//省略代码//如果broker配置文件的rocketmqHome属性值为null直接结束程序if (null brokerConfig.getRocketmqHome()) {System.out.printf(Please set the %s variable in your environment to match the location of the RocketMQ installation, MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}//如果name server服务器的地址不为nullString namesrvAddr brokerConfig.getNamesrvAddr();if (null ! namesrvAddr) {try {//namesrvAddr是以;分割的多个地址String[] addrArray namesrvAddr.split(;);//每个地址是ip:port的形式检测下是否形如ip:port的形式for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf(The Name Server Address[%s] illegal, please set it as follows, \127.0.0.1:9876;192.168.0.1:9876\%n,namesrvAddr);System.exit(-3);}}//设置BrokerIdbroker master 的BrokerId设置为0,broker slave 设置为大于0的值switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE://如果小于等于0退出程序if (brokerConfig.getBrokerId() 0) {System.out.printf(Slaves brokerId must be 0);System.exit(-3);}break;default:break;}//省略代码}首先会判断下RocketmqHome的值是否为空RocketmqHome是Borker相关配置保存的文件目录如果为空则直接退出程序启动Broker失败然后判断下Name server 地址是否为空如果不为空则解析以“”分割的name server地址检测下地址的合法性如果不合法则直接退出程序最后判断下Broker的角色如果是masterBrokerId设置为0如果是SLAVE则BrokerId设置为大于0的数否则直接退出程序Broker启动失败。 createBrokerController方法进行必要配置参数的判断以后将进行日志的设置、以及打印配置信息主要代码如下 //源代码位置org.apache.rocketmq.broker.BrokerStartup#createBrokerController public static BrokerController createBrokerController(String[] args) {//省略代码//注释日志设置//printConfigItem 打印配置信息if (commandLine.hasOption(p)) {InternalLogger console InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption(m)) {//printImportantConfig 打印重要配置信息InternalLogger console InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}//打印配置信息log InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);//代码省略}createBrokerController方法的以上代码逻辑打印配置信息先判断命令行参数是否包含字母‘p’printConfigItem的缩写如果包含字母‘p’则打印配置信息否则判断下命令行是否包含字母‘m’则打印被ImportantField注解的配置属性也就是重要的配置属性。最后不管命令行中是否存在字母‘p’或者字母‘m’都打印配置信息。 以上就是初始化配置信息的全部代码初始化配置信息主要是创建BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类并为这些配置类设置配置的值同时根据命令行参数判断打印配置信息。 初始化Broker控制器 //源代码位置org.apache.rocketmq.broker.BrokerStartup#createBrokerController public static BrokerController createBrokerController(String[] args) {//省略代码//创建BrokerControllerbroker 控制器final BrokerController controller new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discard//将所有的配置信息保存在内存controller.getConfiguration().registerConfig(properties);//初始化broker控制器boolean initResult controller.initialize();//如果初始化失败则退出if (!initResult) {controller.shutdown();System.exit(-3);}//省略代码 }创建并初始化Broker控制的代码比较简单创建以配置类作为参数的BrokerController对象并将所有的配置信息保存在内容中方便在其他地方使用创建完Broker控制器对象以后对控制器进行初始化当初始化失败以后则直接退出程序。 initialize方法主要是加载一些保存在本地的一些配置数据总结起来做了如下几方面的事情 加载topic配置、消费者位移数据、订阅组数据、消费者过滤配置创建消息相关的组件并加载消息数据创建netty服务器创建一系列线程注册处理器启动一系列定时任务初始化事务组件初始化acl组件注册RpcHook 加载topic配置、消费者位移数据、订阅组数据、消费者过滤配置 //源代码位置org.apache.rocketmq.broker.BrokerController#initialize public boolean initialize() throws CloneNotSupportedException {//加载topic配置 topics.jsonboolean result this.topicConfigManager.load();//加载消费者位移数据 consumerOffset.jsonresult result this.consumerOffsetManager.load();//加载订阅组数据 subscriptionGroup.jsonresult result this.subscriptionGroupManager.load();//加载消费者过滤 consumerFilter.jsonresult result this.consumerFilterManager.load();//省略代码 }load方法是抽象类ConfigManager的方法该方法读取文件的内容解码成对应的配置对象如果文件中的内容为空就读取备份文件中的内容进行解码。读取的文件都是保存在user.home/store/config/下user.home是用户目录不同人的电脑user.home一般不同。topicConfigManager.load()读取topics.json文件如果该文件的内容为空那么就读取topics.json.bak文件内容topics.json保存的是topic数据同理consumerOffsetManager.load()方法读取consumerOffset.json和consumerOffset.json.bak文件保存的是消费者位移数据subscriptionGroupManager.load()方法读取subscriptionGroup.json和subscriptionGroup.json.bak文件保存订阅组数据消费者分组数据、consumerFilterManager.load()方法读取的是consumerFilter.json和consumerFilter.json.bak的内容保存的是消费者过滤数据。 创建消息相关的组件并加载消息数据 //源代码位置org.apache.rocketmq.broker.BrokerController#initialize public boolean initialize() throws CloneNotSupportedException {//省略代码//如果上述都加载成功if (result) {try {//创建消息存储器this.messageStore new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);//如果开启了容灾、主从自动切换添加DLedger角色改变处理器if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}//broker 相关统计this.brokerStats new BrokerStats((DefaultMessageStore) this.messageStore);//load plugin//加载消息存储插件MessageStorePluginContext context new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result false;log.error(Failed to initialize, e);}}//加载消息文件result result this.messageStore.load();//省略代码 }如果加载topic配置、消费者位移数据、订阅组数据、消费者过滤配置成功以后就创建消息相关的组件并加载消息数据这个过程创建了消息存储器、DLedger角色改变处理器、Broker统计相关组件以及消息存储插件然后加载消息文件中的数据。接下来具体看看加载消息文件中的messageStore.load()方法 //代码位置org.apache.rocketmq.store.DefaultMessageStore#load public boolean load() {boolean result true;try {//判断abort是否存在boolean lastExitOK !this.isTempFileExist();log.info(last shutdown {}, lastExitOK ? normally : abnormally);//加载定时消费服务器if (null ! scheduleMessageService) {//读取delayOffset.json文件result result this.scheduleMessageService.load();}// load Commit Log//加载 Commit log 文件result result this.commitLog.load();// load Consume Queue//加载消费者队列 文件consumequeueresult result this.loadConsumeQueue();if (result) {//加载检查点文件checkpointthis.storeCheckpoint new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));//加载索引文件this.indexService.load(lastExitOK);//数据恢复this.recover(lastExitOK);log.info(load over, and the max phy offset {}, this.getMaxPhyOffset());}} catch (Exception e) {log.error(load exception, e);result false;}if (!result) {this.allocateMappedFileService.shutdown();}return result; }load方法主要逻辑就是加载各种数据文件主要有以下几方面进行加载数据 isTempFileExist方法判断abort是否存在如果不存在说明Broker是正常关闭的否则就是异常关闭。scheduleMessageService.load()方法读取user.home/store/config/下的delayOffset.json文件的内容该文件内容保存延迟消息的位移数据。commitLog.load()加载 CommitLog 文件, CommitLog 文件保存的是消息内容loadConsumeQueue()方法加载consumequeue目录下的内容ConsumeQueue消息消费队列是消费消息的索引消费者通过ConsumeQueue可以快速找到查找待消费的消息consumequeue目录下的文件组织方式是topic/queueId/fileName所以就可以快速找待消费的消息在哪一个Commit log 文件中。indexService.load(lastExitOK)加载索引文件加载的是user.home/store/index/目录下文件文件名fileName是以创建时的时间戳命名的所以可以通过时间区间来快速查询消息IndexFile的底层存储设计为在文件系统中实现HashMap结构故底层实现为hash索引。recover(lastExitOK)方法将CommitLog 文件的内容加载到内存中以及topic队列。 创建netty服务器 //源代码位置org.apache.rocketmq.broker.BrokerController#initialize public boolean initialize() throws CloneNotSupportedException {//省略代码if (result) {//创建netty远程服务器this.remotingServer new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer new NettyRemotingServer(fastConfig, this.clientHousekeepingService);//省略代码}//省略代码 }创建netty服务器的时候创建了两个一个是普通的一个是快速的remotingServer用来与生产者、消费者进行通信。当isSendMessageWithVIPChanneltrue的时候会选择port-2的fastRemotingServer进行的消息的处理为了防止某些很重要的业务阻塞就再开启了一个remotingServer进行处理但是现在默认是不开启的fastRemotingServer主要是为了兼容老版本的RocketMQ.。 创建一系列线程池 //源代码位置org.apache.rocketmq.broker.BrokerController#initialize public boolean initialize() throws CloneNotSupportedException {//代码省略//发送消息线程池this.sendMessageExecutor new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl(SendMessageThread_));//拉取消息线程池//this.pullMessageExecutor //回复消息线程池//this.replyMessageExecutor //查询消息线程池//this.queryMessageExecutor //broker 管理线程池//this.adminBrokerExecutor//客户端管理线程池//this.clientManageExecutor //心跳线程池//this.heartbeatExecutor //事务线程池// this.endTransactionExecutor //消费者管理线程池this.consumerManageExecutor Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(ConsumerManageThread_));//代码省略 }创建的线程池对象有发送消息线程池、拉取消息线程池、回复消息线程池、查询消息线程池、broker 管理线程池、客户端管理线程池、心跳线程池、事务线程池、消费者管理线程池。 注册请求处理器 //源代码位置org.apache.rocketmq.broker.BrokerController#initialize public boolean initialize() throws CloneNotSupportedException {//省略代码//注册处理器this.registerProcessor();//省略代码 }registerProcessor()方法如下 //源代码位置org.apache.rocketmq.broker.BrokerController#registerProcessor public void registerProcessor() {//发送消息处理器SendMessageProcessor sendProcessor new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);//远程服务注册发送消息处理器this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);//注册拉消息处理器//注册回复消息处理器//注册查询消息处理器//注册客户端管理处理器//注册消费者管理处理器//注册事务处理器//注册broker处理器AdminBrokerProcessor adminProcessor new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); }registerProcessor方法注册了发送消息处理器、远程服务注册发送消息处理器、拉消息处理器、回复消息处理器、查询消息处理器、客户端管理处理器、消费者管理处理器、事务处理器、broker处理器。registerProcessor注册方法也很简单就是以RequestCode作为key以Pair处理器线程池作为Value保存在名字为processorTable的HashMap中。每个请求都是在线程池中处理的这样可以提高处理请求的性能。对于每个传入的请求根据RequestCode就可以在processorTable查找处理器来处理请求。每个处理器都有有一个processRequest方法进行处理请求。 启动一系列定时任务 Broker初始化方法initialize中会启动一系列的后台定时线程任务这些后台任务包括都是由scheduledExecutorService线程池执行的scheduledExecutorService是单线程线程池 Executors.newSingleThreadScheduledExecutor()只用单线程线程池执行后台定时任务有一个好处就是减少线程过多反而导致线程为了抢占CPU加剧了竞争。这一些后台定时线程任务如下 每24小时打印昨天产生了多少消息消费了多少消息每五秒保存消费者位移到文件中每10秒保存消费者过滤到文件中每3分钟定时检测消费的进度每秒打印队列的大小以及队列头部元素存在的时间每分钟打印已存储在CommitLog中但尚未分派到消费队列的字节数每两分钟定时获取获取name server 地址每分钟定时打印slave 数据同步落后多少 //源代码位置org.apache.rocketmq.broker.BrokerController#initialize public boolean initialize() throws CloneNotSupportedException {//省略代码final long period 1000 * 60 * 60 * 24;//每24小时打印昨天产生了多少消息消费了多少消息this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error(schedule record error., e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);//省略代码 }每24小时打印昨天产生了多少消息消费了多少消息的定时任务比较简单就是将昨天消息的生产和消费的数量统计出来然后把这两个指标打印出来。 //源代码位置org.apache.rocketmq.broker.BrokerController#initialize public boolean initialize() throws CloneNotSupportedException {//省略代码//每五秒保存消费者位移this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error(schedule persist consumerOffset error., e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);//每10秒保存消费者过滤this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error(schedule persist consumer filter error., e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);//省略代码 }每五秒保存消费者位移和每10秒保存消费者过滤定时任务都是保存在文件中每五秒保存消费者位移定时任务将消费者位移保存在consumerOffset.json文件中每10秒保存消费者过滤定时任务将消费者过滤保存在consumerFilter.json文件中。 //源代码位置org.apache.rocketmq.broker.BrokerController#initialize public boolean initialize() throws CloneNotSupportedException {//省略代码//每3分钟定时检测消费的进度this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error(protectBroker error., e);}}}, 3, 3, TimeUnit.MINUTES);//省略代码 }每3分钟定时检测消费进度的定时任务的作用是检测消费者的消费进度当消费者消费消息的进度落后大于配置的最大落后阈值时就停止消费者消费具体的实现看protectBroker的源码 //源代码位置org.apache.rocketmq.broker.BrokerController#protectBroker public void protectBroker() { //是否开启慢消费检测开关默认未开启 if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) { //遍历统计项 final IteratorMap.EntryString, MomentStatsItem it this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator(); while (it.hasNext()) { final Map.EntryString, MomentStatsItem next it.next(); final long fallBehindBytes next.getValue().getValue().get(); //消费者消费消息的进度落后消费者落后阈值 if (fallBehindBytes this.brokerConfig.getConsumerFallbehindThreshold()) { final String[] split next.getValue().getStatsKey().split(“”); final String group split[2]; LOG_PROTECTION.info(“[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it”, group, fallBehindBytes); //设置消费者消费的标志关闭消费 this.subscriptionGroupManager.disableConsume(group); } } } } protectBroker方法首先判别是否开启慢消费检测开关如果开启了就进行遍历统计项判断消费者消费消息的进度落后消费者落后阈值的时候就停止该消费者停止消费来保护broker如果消费者消费比较慢那么在Broker的消费会越来越多积压在Broker上所以停止慢消费者消费消息让其他消费者消费减少消息的积压。 //源代码位置org.apache.rocketmq.broker.BrokerController#initialize public boolean initialize() throws CloneNotSupportedException {//代码省略//每秒打印队列的大小以及队列头部元素存在的时间this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error(printWaterMark error., e);}}}, 10, 1, TimeUnit.SECONDS);//代码省略 }每秒打印队列的大小以及队列头部元素存在的时间定时任务会打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小以及打印队列头部元素存在的时间这个时间等于当前时间减去头部元素创建的时间就是该元素创建到现在已经花费了多长时间。具体的代码如下 //源代码位置org.apache.rocketmq.broker.BrokerController#headSlowTimeMills public long headSlowTimeMills(BlockingQueueRunnable q) {long slowTimeMills 0;//队列的头final Runnable peek q.peek();if (peek ! null) {RequestTask rt BrokerFastFailure.castRunnable(peek);//当前时间减去创建时间slowTimeMills rt null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();}if (slowTimeMills 0) {slowTimeMills 0;}return slowTimeMills; }初始化事务消息 //源码位置org.apache.rocketmq.broker.BrokerController#initialTransaction private void initialTransaction() {//加载transactionalMessageService利用spithis.transactionalMessageService ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);if (null this.transactionalMessageService) {this.transactionalMessageService new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));log.warn(Load default transaction message hook service: {}, TransactionalMessageServiceImpl.class.getSimpleName());}//创建transactionalMessage检查监听器this.transactionalMessageCheckListener ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);if (null this.transactionalMessageCheckListener) {this.transactionalMessageCheckListener new DefaultTransactionalMessageCheckListener();log.warn(Load default discard message hook service: {}, DefaultTransactionalMessageCheckListener.class.getSimpleName());}this.transactionalMessageCheckListener.setBrokerController(this);//创建事务消息检查服务this.transactionalMessageCheckService new TransactionalMessageCheckService(this);}initialTransaction方法主要创建与事务消息相关的类创建transactionalMessageService事务消息服务、transactionalMessageCheckListener事务消息检查监听器、transactionalMessageCheckService事务消息检查服务。transactionalMessageService用于处理事务消息transactionalMessageCheckListener主要用来回查消息监听transactionalMessageCheckService用于检查超时的 Half 消息是否需要回查。RocketMQ发送事务消息是将消费先写入到事务相关的topic的中这个消息就称为半消息当本地事务成功执行那么半消息会还原为原来的消息然后再进行保存。initialTransaction在创建transactionalMessageService和transactionalMessageCheckListener都使用了ServiceProvider.loadClass方法这个方法就是采用SPI原理SPI原理就是利用反射加载META-INF/service目录下的某个接口的所有实现只要实现接口然后META-INF/service目录下添加文件名为全类名的文件这样SPI就可以加载具体的实现类具有可拓展性。 初始化acl组件 //源码位置org.apache.rocketmq.broker.BrokerController#initialAcl private void initialAcl() {if (!this.brokerConfig.isAclEnable()) {log.info(The broker dose not enable acl);return;}//利用SPI加载权限相关的校验器ListAccessValidator accessValidators ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);if (accessValidators null || accessValidators.isEmpty()) {log.info(The broker dose not load the AccessValidator);return;}//将所有的权限校验器进行缓存以及注册for (AccessValidator accessValidator: accessValidators) {final AccessValidator validator accessValidator;accessValidatorMap.put(validator.getClass(),validator);this.registerServerRPCHook(new RPCHook() {Overridepublic void doBeforeRequest(String remoteAddr, RemotingCommand request) {//Do not catch the exceptionvalidator.validate(validator.parse(request, remoteAddr));}Overridepublic void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {}});}}initialAcl方法主要是加载权限相关校验器RocketMQ的相关的管理的权限验证和安全就交给这里的加载的校验器了。initialAcl方法也利用SPI原理加载接口的具体实现类将所有加载的校验器缓存在map中然后再注册RPC钩子在请求之前调用校验器的validate的方法。 注册RpcHook //源码位置org.apache.rocketmq.broker.BrokerController#initialRpcHooks private void initialRpcHooks() {//利用SPI加载钩子ListRPCHook rpcHooks ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);if (rpcHooks null || rpcHooks.isEmpty()) {return;}//注册钩子for (RPCHook rpcHook: rpcHooks) {this.registerServerRPCHook(rpcHook);} }initialRpcHooks方法加RPC钩子利用SPI原理加载具体的钩子实现然后将所有的钩子进行注册钩子的注册是将钩子保存在List中。 以上分析就是创建Broker控制器的全过程这个过程首先进行一些必要的初始化配置如Broker配置、网络通信Neety配置以及存储相关配置等。然后在创建并初始化Broker控制器创建并初始化Broker控制器的过程中又进行了多个步骤如加载topic配置、消费者位移数据、启动一系列后台定时任务、创建事务消息相关组件等。 Broker控制器的启动 //源码位置org.apache.rocketmq.broker.BrokerController#start public static BrokerController start(BrokerController controller) {try {//Broker控制器启动controller.start();//打印Broker成功的消息String tip The broker[ controller.getBrokerConfig().getBrokerName() , controller.getBrokerAddr() ] boot success. serializeType RemotingCommand.getSerializeTypeConfigInThisServer();if (null ! controller.getBrokerConfig().getNamesrvAddr()) {tip and name server is controller.getBrokerConfig().getNamesrvAddr();}log.info(tip);System.out.printf(%s%n, tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null; }controller.start()方法主要是启动各种组件 启动消息消息存储器netty服务的启动文件监听器启动broker 对外api启动长轮询拉取消息服务启动客户端长连接服务启动过滤服务管理启动broker 相关统计启动broker 快速失败启动 //源码位置org.apache.rocketmq.broker.BrokerController#start public void start() throws Exception {if (this.messageStore ! null) {//启动消息消息存储this.messageStore.start();}if (this.remotingServer ! null) {//netty服务的启动this.remotingServer.start();}if (this.fastRemotingServer ! null) {this.fastRemotingServer.start();}//文件改变监听启动if (this.fileWatchService ! null) {this.fileWatchService.start();}//broker 对外api启动if (this.brokerOuterAPI ! null) {this.brokerOuterAPI.start();}//保持长轮询请求的服务启动if (this.pullRequestHoldService ! null) {this.pullRequestHoldService.start();}//客户端长连接服务启动if (this.clientHousekeepingService ! null) {this.clientHousekeepingService.start();}//过滤服务管理启动if (this.filterServerManager ! null) {this.filterServerManager.start();}//如果没有采用主从切换多副本if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}//定时注册brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error(registerBrokerAll Exception, e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);//broker 相关统计启动if (this.brokerStatsManager ! null) {this.brokerStatsManager.start();}//broker 快速失败启动if (this.brokerFastFailure ! null) {this.brokerFastFailure.start();}}启动过程还有很多细节没有分析放到下个文章吧吧吧吧吧 文章大量参考https://www.zhihu.com/column/c_1437729921845690368
http://www.zqtcl.cn/news/729591/

相关文章:

  • 铁威马 Nas 做网站百度广告代运营
  • 有没有帮别人做网站小说关键词生成器
  • 那些开店的网站是自己做的吗装修平台排行榜前十名
  • 重庆智能网站建设价格毕业设计做系统跟做网站哪个容易
  • 淘宝美工做兼职的网站多多返利网站建设
  • 如何承接设计网站建设电商平台开发流程
  • 安康做网站简洁高端的wordpress个人博客
  • 酒店网站建设协议手机怎么做销售网站
  • 屏蔽网站接口js广告seminar
  • 谁有手机网站啊介绍一下wordpress 流量插件
  • 杭州网站公司google网站建设
  • 莱芜住房和城乡建设厅网站网站头部设计
  • 织梦响应式茶叶网站模板邯郸最新通告今天
  • 深圳公司网站改版通知做网站分类链接
  • 电子商务网站建设答案网络运营与维护
  • 网站登陆怎么做网站app的区别
  • 获取网站缩略图工信部2017网站备案
  • 有哪些网站可以做ps挣钱自己制作游戏
  • 旅游网站开发团队四川住房和城乡建设网站
  • 网站框架设计商城网站制作需要多少费用
  • 网站建设哪个公司个人网站做哪种能赚钱
  • 福建建设人才与科技发展中心seo导航站
  • 修文县生态文明建设局网站郑州制作网站哪家好
  • 泉州网站优化排名东莞长安做网站公司
  • 网站制作公司 顺的有口碑的赣州网站建设
  • 成都网站设计制作苏州新闻
  • 黑色网站设计iis 网站 红
  • 专业做家居的网站佛山做网站永网
  • 医疗网站建设讯息企业门户网站建设思路
  • 四川建设安全监督管理局网站网站传送门怎么做