网站外包,附近临时工500元一天,网站安全防护,软件商店安装下载1 模块入口代码的功能 本节介绍入口代码的功能#xff0c;阅读源码的时候#xff0c;很多人喜欢根据执行逻辑#xff0c;先从入口代码看起。NameServer部分入口代码主要完成命令行参数解析#xff0c;初始化Controller的功能。
1.1 入口函数
首先看一下NameServer的源码目…1 模块入口代码的功能 本节介绍入口代码的功能阅读源码的时候很多人喜欢根据执行逻辑先从入口代码看起。NameServer部分入口代码主要完成命令行参数解析初始化Controller的功能。
1.1 入口函数
首先看一下NameServer的源码目录见图10-1。
NamesrvStartup是模块的启动入口NamesrvController是用来协块各个调模功能的代码。
我们从启动代码开始分析找到NamesrvStartup.java里的main函数public static void mainString[]args{main0args}发现它又把逻辑转到main0这个函数里。 图10-1 NameServer源码目录
1.2 解析命令行参数
main0函数主要完成两个功能第一个功能是解析命令行参数我们通过源码来看一看重点是解析-c和-p参数如代码清单10-1所示。
代码清单10-1 解析NameServer命令行参数 Options options ServerUtil.buildCommandlineOptions(new Options()); commandLine ServerUtil.parseCmdLine(mqnamesrv, args, buildCommandlineOptions(options), new PosixParser()); if (null commandLine) { System.exit(-1); return null; } final NamesrvConfig namesrvConfig new NamesrvConfig(); final NettyServerConfig nettyServerConfig new NettyServerConfig(); nettyServerConfig.setListenPort(9876); if (commandLine.hasOption(c)) { String file commandLine.getOptionValue(c); if (file ! null) { InputStream in new BufferedInputStream(new FileInputStream(file)); properties new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf(load config properties file OK, file %n); in.close(); } } if (commandLine.hasOption(p)) { MixAll.printObjectProperties(null, namesrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); System.exit(0); } -c命令行参数用来指定配置文件的位置-p命令行参数用来打印所有配置项的值。注意用-p参数打印配置项的值之后程序就退出了这是一个帮助调试的选项。
1.3 初始化NameServer的Controller
main0函数的另外一个功能是初始化Controller如代码清单10-2所示。
代码清单10-2 初始化并启动Controller // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); boolean initResult controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new CallableVoid() { Override public Void call() throws Exception { controller.shutdown(); return null; } })); controller.start(); 根据解析出的配置参数调用controller.initialize来初始化然后调用controller.start让NameServer开始服务。
还有一个逻辑是注册ShutdownHookThread当程序退出的时候会调用controller.shutdown来做退出前的清理工作。
2 NameServer的总控逻辑
NameServer的总控逻辑在NamesrvController.java代码中。NameServer是集群的协调者它只是简单地接收其他角色报上来的状态然后根据请求返回相应的状态。首先NameserverController把执行线程池初始化好如代码清单10-3所示。
代码清单10-3 线程池初始化 this.remotingExecutor Executors.newFixedThreadPool(nettyServerConfig .getServerWorkerThreads(), new ThreadFactoryImpl (RemotingExecutorThread_)); this.registerProcessor(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); 启动了一个默认是8个线程的线程池private int serverWorkerThreads8还有两个定时执行的线程一个用来扫描失效的BrokerscanNotActiveBroker另一个用来打印配置信息printAllPeriodically。
然后启动负责通信的服务remotingServerremotingServer监听一些端口收到Broker、Client等发过来的请求后根据请求的命令调用不同的Processor来处理。这些不同的处理逻辑被放到上面初始化的线程池中执行如代码清单10-4所示。
代码清单10-4 启动通信服务关联初始化的线程池 this.remotingServer new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); …… if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig .getProductEnvName()), this.remotingExecutor); } else { this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } remotingServer是基于Netty封装的一个网络通信服务要了解remoting-Server需要先对Netty有个基本的认知后面会单独介绍。
3 核心业务逻辑处理
NameServer的核心业务逻辑在DefaultRequestProcessor.java中可以一目了然地看出。网络通信服务模块收到请求后就调用这个Processor来处理如代码清单10-5所示。
代码清单10-5 根据请求码调用相应的处理逻辑 switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion MQVersion.value2Version(request .getVersion()); if (brokerVersion.ordinal() MQVersion.Version .V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } 逻辑主体是个switch语句根据RequestCode调用不同的函数来处理从RequestCode可以了解到NameServer的主要功能比如REGISTER_BROKER是在集群中新加入一个Broker机器GET_ROUTEINTO_BY_TOPIC是请求获取一个Topic的路由信息WIPE_WRITE_PERM_OF_BROKER是删除一个Broker的写权限。
4 集群状态存储
NameServer作为集群的协调者需要保存和维护集群的各种元数据这是通过RouteInfoManager类来实现的如代码清单10-6所示。
代码清单10-6 RouteInfoManager的存储结构 private final HashMapString/* topic */, ListQueueData topicQueue-Table; private final HashMapString/* brokerName */, BrokerData brokerAddr-Table; private final HashMapString/* clusterName */, SetString/* brokerName */ clusterAddrTable; private final HashMapString/* brokerAddr */, BrokerLiveInfo brokerLiveTable; private final HashMapString/* brokerAddr */, ListString/* Filter Server */ filterServerTable; public RouteInfoManager() { this.topicQueueTable new HashMapString, ListQueueData(1024); this.brokerAddrTable new HashMapString, BrokerData(128); this.clusterAddrTable new HashMapString, SetString(32); this.brokerLiveTable new HashMapString, BrokerLiveInfo(256); this.filterServerTable new HashMapString, ListString(256); } 每个结构存储着一类集群信息具体含义在第5章有介绍。了解RocketMQ各个角色的功能后对每个结构的处理逻辑就好理解了。下面重点看一下控制访问这些结构的锁机制。
锁分为互斥锁、读写锁也可分为可重入锁、不可重入锁。在NameServer的场景中读取操作多更改操作少所以选择读写锁能大大提高效率。对于如何选择可重入和不可重入锁重点看函数间的调用关系比如多次获取锁的示例代码如果这个lock是不可重入的代码无法正常执行如代码清单10-7所示。
代码清单10-7 多次获取锁示例 Lock lock new Lock(); public void outer() { lock.lock(); inner(); lock.unlock(); } public void inner() { lock.lock(); //do something lock.unlock(); } } RouteInfoManager中使用的是可重入的读写锁private final ReadWriteLock locknew ReentrantReadWriteLock我们以deleteTopic函数为例看一下锁的使用方式如代码清单10-8所示。
代码清单10-8 锁的使用方式 public void deleteTopic(final String topic) { try { try { this.lock.writeLock().lockInterruptibly(); this.topicQueueTable.remove(topic); } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error(deleteTopic Exception, e); } } 首先锁的获取和执行逻辑要放到一个try{}里然后在finally{}中释放。这是一种典型的使用方式我们可以参考这种方式实现自己的代码。