当前位置: 首页 > news >正文

如何让百度搜到我的网站杭州做网站建设

如何让百度搜到我的网站,杭州做网站建设,手机做任务的网站,天津网站建设维护大家好#xff0c;我是烤鸭#xff1a; 上一篇介绍了注册中心#xff0c;这一篇看下broker。基于 rocketmq 4.9 版本。 BrokerStartup#BrokerController 按照代码的先后顺序撸源码#xff1a; BrokerController.createBrokerController public static BrokerController…大家好我是烤鸭 上一篇介绍了注册中心这一篇看下broker。基于 rocketmq 4.9 版本。 BrokerStartup#BrokerController 按照代码的先后顺序撸源码 BrokerController.createBrokerController public static BrokerController createBrokerController(String[] args) {// ...try {// ...final MessageStoreConfig messageStoreConfig new MessageStoreConfig();// salve节点的messageMmeory 比例由40% 降低至 30%if (BrokerRole.SLAVE messageStoreConfig.getBrokerRole()) {int ratio messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}// ...switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:// 0 是masterbrokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:if (brokerConfig.getBrokerId() 0) {System.out.printf(Slaves brokerId must be 0);System.exit(-3);}break;default:break;}// DLeger模式,brokerId 为 -1if (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}// ...final BrokerController controller new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);// 初始化boolean initResult controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 注册 shutdown 事件Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown false;private AtomicInteger shutdownTimes new AtomicInteger(0);Overridepublic void run() {synchronized (this) {log.info(Shutdown hook was invoked, {}, this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown true;long beginTime System.currentTimeMillis();controller.shutdown();long consumingTimeTotal System.currentTimeMillis() - beginTime;log.info(Shutdown hook over, consuming total time(ms): {}, consumingTimeTotal);}}}}, ShutdownHook));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null; }BrokerController.initialize() public boolean initialize() throws CloneNotSupportedException {// 加载本地配置文件(topic,consumerOffset等信息),加载不到加载 .bak文件boolean result this.topicConfigManager.load();result result this.consumerOffsetManager.load();result result this.subscriptionGroupManager.load();result result this.consumerFilterManager.load();// 初始化 messageStore(持久化)if (result) {try {this.messageStore new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result false;log.error(Failed to initialize, e);}}// commitlog、consumer和topic关系、索引恢复(临时文件存在的话)result result this.messageStore.load();if (result) {this.remotingServer new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);// netty 配置NettyServerConfig fastConfig (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer new NettyRemotingServer(fastConfig, this.clientHousekeepingService);this.sendMessageExecutor new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl(SendMessageThread_));// 不同的线程池注册到对应的processorthis.pullMessageExecutor new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl(PullMessageThread_));// ...// 线程池注册到processor,后续仔细说下this.registerProcessor();final long initialDelay UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period 1000 * 60 * 60 * 24;// 延迟1天,每天记录昨天存取的消息数量this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error(schedule record error., e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);// 每隔5s检查是否更新consumer和offset数据this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error(schedule persist consumerOffset error., e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);// 每隔10s检查是否更新consumer过滤规则数据this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error(schedule persist consumer filter error., e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);// 每隔3分钟检查,开启consumer消费过慢后移除该consumer(默认关闭)this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error(protectBroker error., e);}}}, 3, 3, TimeUnit.MINUTES);// 每秒打印send\pull\query\transaction队列大小和的最慢的消费耗时this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error(printWaterMark error., e);}}}, 10, 1, TimeUnit.SECONDS);// 主broker同步从broker的时候重试this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {log.info(dispatch behind commit log {} bytes, BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error(schedule dispatchBehindBytes error., e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);// 没配置注册中心的话,会每隔2分钟去拉取(url为默认读取系统变量 rocketmq.namesrv.domain:8080/rocketmq)if (this.brokerConfig.getNamesrvAddr() ! null) {this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info(Set user specified name server address: {}, this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error(ScheduledTask fetchNameServerAddr exception, e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}// DLeger模式,从节点不需要定期更新 HA主节点地址if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() ! null this.messageStoreConfig.getHaMasterAddress().length() 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically false;} else {this.updateMasterHAServerAddrPeriodically true;}} else {// 每分钟打印主节点和从节点的offset的不同this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error(schedule printMasterAndSlaveDiff error., e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}// tls模式需要证书建立sslif (TlsSystemConfig.tlsMode ! TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged false;Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info(The trust certificate changed, reload the ssl context);reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged true;}if (certChanged keyChanged) {log.info(The certificate and private key changed, reload the ssl context);certChanged keyChanged false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();((NettyRemotingServer) fastRemotingServer).loadSslContext();}});} catch (Exception e) {log.warn(FileWatchService created error, cant load the certificate dynamically);}}// 事务初始化initialTransaction();// acl鉴权初始化initialAcl();// rpc钩子初始化(acl和匿名的)initialRpcHooks();}return result; }BrokerStartup.start() public void start() throws Exception {// 消息存储,包含 commitLog 和 集群模式下消息同步if (this.messageStore ! null) {this.messageStore.start();}// nettyServer 的初始化,初探(二)有详细的if (this.remotingServer ! null) {this.remotingServer.start();}// 同上if (this.fastRemotingServer ! null) {this.fastRemotingServer.start();}// 每500ms监测 tls的3个证书,如果变了就重新加载(tls.server.certPath...)if (this.fileWatchService ! null) {this.fileWatchService.start();}// nettyRemotingClient.satrt,一会单独看下if (this.brokerOuterAPI ! null) {this.brokerOuterAPI.start();}// 需要hold的拉取请求统一处理,每隔5s或1s检测消息是否到达if (this.pullRequestHoldService ! null) {this.pullRequestHoldService.start();}// 每10s扫描 producer、consumer、broker 的在线情况 if (this.clientHousekeepingService ! null) {this.clientHousekeepingService.start();}// 过滤服务器,在broker机器启动多个filter进程用来进程consumer消息过滤if (this.filterServerManager ! null) {this.filterServerManager.start();}// 启用DLedger,slave 每隔10s更新配置broker注册到nameserverif (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}// 随机 30-60s,定时 broker注册到nameserverthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error(registerBrokerAll Exception, e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager ! null) {this.brokerStatsManager.start();}// 快速失败,如果是刷盘过慢,返回 system busy清理队列(发送、拉取、心跳、请求)if (this.brokerFastFailure ! null) {this.brokerFastFailure.start();}}小结 这篇主要是介绍了broker的init和start。 DLegerCommit开启模式(替代原有的commitLog可以直接读取CommitLog的API) 初始化 消费持久化 messageStore commitlog、consumer和topic关系、索引恢复(临时文件存在的话) netty配置(remotingServer 和 fastRemotingServer) processor(PullMessage、SendMessage 等) 定时器(记录消费总量、更新consumer和offset数据 等) acl鉴权、事务、rpchook 启动 消息持久化 messageStorenetty server启动(remotingServer 和 fastRemotingServer)tls模式检测证书变化pullRequestHold模式下启动监听消息每10s扫描 producer、consumer、broker 的在线情况启用DLedger,slave 每隔10s更新配置broker注册到nameserver随机 30-60s,定时 broker注册到nameserver快速失败,如果是刷盘过慢,返回 system busy清理队列(发送、拉取、心跳、请求)
http://www.zqtcl.cn/news/755320/

相关文章:

  • 企业宣传网站建设图示《高性能网站建设》
  • 福州志愿者官方网站怎么做erp管理系统介绍
  • 高端网站建设费用情况广州开发区控股集团有限公司
  • 精湛的网站设计云南网招聘
  • 南昌网站建设公司行情Wordpress添加分页按钮
  • 论坛网站建设流程wordpress速度优化插件
  • PHP套模板做网站建设银行保定分行网站
  • 怎样免费注册网站域名wordpress网站回调域
  • 东莞个人免费建网站乐清网约车事件
  • 备案查询网站网站的登录弹窗怎么做
  • 网站开发 mvc北京建设工程主管部门网站
  • 淮安建设机械网站制作代理公司注册需要多少钱
  • 站长收录茌平建设局网站
  • 如何进行网站开发开发区人才
  • 网站制作 视频网站维护的主要内容包括
  • 快速企业建站深圳网站关键词优化推广
  • 如何开网店详细步骤东莞市网络seo推广
  • 个人可以做哪些有意思的网站网站和网站的app
  • 北京高端网站开发公司网站建设后台实训体会
  • 青岛海川建设集团有限公司网站wordpress 变私有云
  • 网站备案人可以改吗石大网页设计与网站建设客观题
  • 宁波网站优化方案免费关键词挖掘工具
  • 网站制作想法免费做效果图网站
  • 晓风彩票网站建设软件微信上发的链接网站怎么做的
  • 关键词有哪几种台州优秀关键词优化
  • 盐田区住房和建设局网站软件开发文档怎么编写
  • 网站响应式建设seo排名优化怎样
  • 山东 网站备案德清县建设局网站
  • 中英语双语网站咋做提供网站建设设计外包
  • 云网站功能江门网站seo关键词排名优化