当前位置: 首页 > news >正文

域名购买哪个网站好维护网站

域名购买哪个网站好,维护网站,西安做网站哪里好,天元建设集团有限公司财务分析RocketMQ源码阅读-Producer发消息 1. 从单元测试入手2. 启动过程3. 同步消息发送过程4. 异步消息发送过程5. 小结 Producer是消息的生产者。 Producer和Consummer对Rocket来说都是Client#xff0c;Server是NameServer。 客户端在源码中是一个单独的Model#xff0c;目录为ro… RocketMQ源码阅读-Producer发消息 1. 从单元测试入手2. 启动过程3. 同步消息发送过程4. 异步消息发送过程5. 小结 Producer是消息的生产者。 Producer和Consummer对Rocket来说都是ClientServer是NameServer。 客户端在源码中是一个单独的Model目录为rocketmq/client。 类DefaultMQProducer是Producer的默认入口实现类。继承了类ClientConfig客户端配置类存储上下文配置信息。实现接口MQProducer定义了Producer对外提供的接口也就是所有的发送消息的方法同时MQProducer接口继承了MQAdmin接口。如上图中MQAdmin是元数据管理接口定义了对Message操作的一些方法。 DefaultMQProducer中有一个重要的成员变量 protected final transient DefaultMQProducerImpl defaultMQProducerImpl;DefaultMQProducer的所有操作基本没什么业务逻辑都是调用DefaultMQProducerImpl类中的方法。 DefaultMQProducerImpl类是Producer操作的具体实现类。 1. 从单元测试入手 看源码的流程都是从单元测试入手 Producer的单元测试在类org.apache.rocketmq.client.producer.DefaultMQProducerTest中。单元测试的所有方法就对应着这个类的全部功能。DefaultMQProducerTest的方法列表如下其中 init 和 terminate 是测试开始初始化和测试结束销毁时需要执行的代码。其他的方法是测试不同功能的测试用例也就是测试不同的发消息的方式。 2. 启动过程 从单元测试中的init方法入手 Before public void init() throws Exception {String producerGroupTemp producerGroupPrefix System.currentTimeMillis();// 创建一个Producer并赋予名字producer new DefaultMQProducer(producerGroupTemp);// 设置NameServer的地址producer.setNamesrvAddr(127.0.0.1:9876);// 消息长度大于16开启压缩producer.setCompressMsgBodyOverHowmuch(16);// 创建不同的Messagemessage new Message(topic, new byte[] {a});zeroMsg new Message(topic, new byte[] {});bigMessage new Message(topic, This is a very huge message!.getBytes());// 启动Producerproducer.start();// 设置mQClientFactory和mQClientAPIImpl后面再讲Field field DefaultMQProducerImpl.class.getDeclaredField(mQClientFactory);field.setAccessible(true);field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);field MQClientInstance.class.getDeclaredField(mQClientAPIImpl);field.setAccessible(true);field.set(mQClientFactory, mQClientAPIImpl);// 注册Producerproducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenReturn(createSendResult(SendStatus.SEND_OK)); }这段代码就是创建了一个DefaultMQProducer设置参数并调用start()方法启动之后注册到NameServer中。 首先看下DefaultMQProducer#start()方法 Override public void start() throws MQClientException {this.setProducerGroup(withNamespace(this.producerGroup));// 直接调用defaultMQProducerImpl的start方法this.defaultMQProducerImpl.start();if (null ! traceDispatcher) {try {traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn(trace dispatcher start failed , e);}} }其中直接调用了DefaultMQProducerImpl的start方法 public void start() throws MQClientException {this.start(true); }public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState ServiceState.START_FAILED;this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 获取MQClientInstance实例this.mQClientFactory MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 注册boolean registerOK mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState ServiceState.CREATE_JUST;throw new MQClientException(The producer group[ this.defaultMQProducer.getProducerGroup() ] has been created before, specify another name please. FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 启动mQClientFactorymQClientFactory.start();}log.info(the producer [{}] start OK. sendMessageWithVIPChannel{}, this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException(The producer service state not OK, maybe started once, this.serviceState FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}// 给所有的broker发心跳this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); }RocketMQ 使用一个成员变量 serviceState 来记录和管理自身的服务状态这实际上是状态模式 (State Pattern) 这种设计模式的变种实现。 对于启动过程状态serviceState为CREATE_JUST。CREATE_JUST分支中会获取一个MQClientManager单例模式通过方法getAndCreateMQClientInstance()获取一个MQClientInstance实例赋值给成员变量 private MQClientInstance mQClientFactory;然后调用MQClientInstance的registerProducer()方法将自己注册到MQClientInstance中。随后调用MQClientInstance的start()启动mQClientFactory。最后给所有的broker发心跳。 进一步看MQClientInstance的start()方法 public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState ServiceState.START_FAILED;// If not specified,looking address from name serverif (null this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info(the client factory [{}] start OK, this.clientId);this.serviceState ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException(The Factory object[ this.getClientId() ] has been created before, and failed., null);default:break;}} }这一部分代码的注释比较清楚流程是这样的 启动实例 mQClientAPIImpl其中 mQClientAPIImpl 是类 MQClientAPIImpl 的实例封装了客户端与 Broker 通信的方法启动各种定时任务包括与 Broker 之间的定时心跳定时与 NameServer 同步数据等任务启动拉取消息服务启动 Rebalance 服务启动默认的 Producer 服务。 以上就是 Producer 的启动流程。 3. 同步消息发送过程 分析 Producer 发送消息的流程。接口 MQProducer 中定义了 19 个不同参数的发消息的方法。这19个接口可以分为3类 单向发送Oneway发送消息后立即返回不处理响应不关心是否发送成功同步发送Sync发送消息后等待响应异步发送Async发送消息后立即返回在提供的回调方法中处理响应。 先看下同步发送消息的方法异步发送消息只是将同步发送方法提交给线程池。DefaultMQProducer中对同步发送方法的实现为 Override public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {Validators.checkMessage(msg, this);msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg); }实际调用的为DefaultMQProducerImpl的send()方法 public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }最终调用的方法为DefaultMQProducerImpl的sendDefaultImpl()方法源码位置为517行源码偏长在下方进行解读 private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID random.nextLong();long beginTimestampFirst System.currentTimeMillis();long beginTimestampPrev beginTimestampFirst;long endTimestamp beginTimestampFirst;// 获取Topic信息TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo ! null topicPublishInfo.ok()) {boolean callTimeout false;MessageQueue mq null;Exception exception null;SendResult sendResult null;int timesTotal communicationMode CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times 0;String[] brokersSent new String[timesTotal];for (; times timesTotal; times) {String lastBrokerName null mq ? null : mq.getBrokerName();MessageQueue mqSelected this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected ! null) {mq mqSelected;brokersSent[times] mq.getBrokerName();try {beginTimestampPrev System.currentTimeMillis();if (times 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime beginTimestampPrev - beginTimestampFirst;if (timeout costTime) {callTimeout true;break;}// 发消息sendResult this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() ! SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;continue;} catch (MQClientException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;continue;} catch (MQBrokerException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult ! null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format(sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn(sendKernelImpl exception, e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult ! null) {return sendResult;}String info String.format(Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s,times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException(sendDefaultImpl call timeout);}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}ListString nsList this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null nsList || nsList.isEmpty()) {throw new MQClientException(No name server address, please set it. FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);}throw new MQClientException(No route info of this topic, msg.getTopic() FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }首先是获取Topic信息然后调用sendKernelImpl()方法发送消息sendKernelImpl()源码如下 private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime System.currentTimeMillis();String brokerAddr this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context null;if (brokerAddr ! null) {brokerAddr MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody msg.getBody();try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace false;if (null ! this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace true;}int sysFlag 0;boolean msgBodyCompressed false;if (this.tryToCompressMessage(msg)) {sysFlag | MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed true;}final String tranMsg msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg ! null Boolean.parseBoolean(tranMsg)) {sysFlag | MessageSysFlag.TRANSACTION_PREPARED_TYPE;}if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}if (this.hasSendMessageHook()) {context new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans ! null isTrans.equals(true)) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty(__STARTDELIVERTIME) ! null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) ! null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}SendMessageRequestHeader requestHeader new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes ! null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes ! null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult null;switch (communicationMode) {case ASYNC:Message tmpMessage msg;boolean messageCloned false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage MessageAccessor.cloneMessage(msg);messageCloned true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage MessageAccessor.cloneMessage(msg);messageCloned true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync System.currentTimeMillis() - beginStartTime;if (timeout costTimeAsync) {throw new RemotingTooMuchRequestException(sendKernelImpl call timeout);}sendResult this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync System.currentTimeMillis() - beginStartTime;if (timeout costTimeSync) {throw new RemotingTooMuchRequestException(sendKernelImpl call timeout);}// 同步发送消息sendResult this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException(The broker[ mq.getBrokerName() ] not exist, null); }也很长但是逻辑简单主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext然后调用方法 MQClientAPIImpl#sendMessage()上述代码第150行将消息发送给队列所在的 Broker。 至此消息被发送给远程调用的封装类 MQClientAPIImpl完成后续序列化和网络传输等步骤。 这一篇主要看发送代码后续序列化和网络传输等步骤的代码后面再去探究。 ps推荐IEDA插件SequenceDiagram能一键生成时序图 整体的时序图为 4. 异步消息发送过程 上一节讲到异步发送消息只是将同步发送方法提交给线程池。对于MQProducer接口中方法为 void send(final Message msg, final SendCallback sendCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException;也就是带有回调方法DefaultMQProducer中的实现为 Override public void send(Message msg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));this.defaultMQProducerImpl.send(msg, sendCallback, timeout); }调用了DefaultMQProducerImpl的send方法 Deprecated public void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException {final long beginStartTime System.currentTimeMillis();ExecutorService executor this.getAsyncSenderExecutor();try {executor.submit(new Runnable() {Overridepublic void run() {long costTime System.currentTimeMillis() - beginStartTime;if (timeout costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);} catch (Exception e) {sendCallback.onException(e);}} else {sendCallback.onException(new RemotingTooMuchRequestException(DEFAULT ASYNC send call timeout));}}});} catch (RejectedExecutionException e) {throw new MQClientException(executor rejected , e);}}可以看到实际上是将DefaultMQProducerImpl的sendDefaultImpl()方法对应源码517行提交到线程池执行后面的流程都和同步发送一致参看上一节分析。 异步发送消息时序图为至此异步消息发送也完成了。 5. 小结 MQProducer定义了19种发送消息的方法其默认实现为DefaultMQProducer。DefaultMQProducer中的方法没有业务逻辑最终会调用DefaultMQProducerImpl类中的具体逻辑。 DefaultMQProducerImpl中发消息的方法最终都会调用其sendKernelImpl()方法其主要功能就是构建发送消息的头 RequestHeader 和上下文 SendMessageContext然后调用方法 MQClientAPIImpl#sendMessage()将消息发送给队列所在的 Broker。 最终消息被发送给远程调用的封装类 MQClientAPIImpl完成后续序列化和网络传输等步骤。 类之间的关系为
http://www.zqtcl.cn/news/759585/

相关文章:

  • 网站策划报告公司简介模板范文高大上
  • 做信息图的免费网站如何获取网站是哪个公司制作
  • 乐清建设网站哪家好seo一个月赚多少钱
  • 哈尔滨专业官网建站企业h5公众号开发
  • 商城网站建设精英wordpress实例配置
  • 国内网站开发语言模板兔自用主题WordPress
  • 天津营销网站建设公司哪家好市场营销平台
  • 上海企业响应式网站建设推荐网站建设类织梦模板
  • 洛阳最好的做网站的公司哪家好信誉好的邢台做网站
  • 织梦 旅游网站模板seo百家外链网站
  • 做网站提升公司形象摄影网站建设任务书
  • wordpress建站不好用wordpress共用用户多站点
  • 企业网站设计请示杭州做企业网站的公司
  • 苏宁易购网站建设的不足之处wordpress myisam
  • 互联网站建设维护是做什么的网站建设模板成功案例
  • 制作网站需要什么语言wordpress 免签约支付宝
  • 西安网站开发的未来发展易企网络网站建设
  • 贵州做网站怎么推广vs2012 做网站教程
  • 完全菜鸟七天学会建网站网络营销的四大基础理论
  • 东莞网站优化案例网站职业技术培训学校
  • 银川网站建设公司电话公司在百度做网站找谁
  • 交换链接适用于哪些网站网络规划与设计的目的
  • 网站做标签寺院网站模板
  • 高端h5网站柳州建站
  • 百度商桥网站郑州有做网站的公司没
  • 做专业网站济南品牌网站建设低价
  • 网站制作客户寻找数据中台厂商
  • 免费找图片素材的网站西安企业seo
  • 网站建设 名词解释国内网站建设建设
  • 文山州建设局网站域名查询seo