当前位置: 首页 > news >正文

西安东郊网站建设公司php网站开发教程 pdf

西安东郊网站建设公司,php网站开发教程 pdf,网站文明专栏建设,织梦wordpress帝国对比前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器#xff0c;接着分析Producer进行消息的发送#xff0c;当Producer发送完消息后就得到Broker去接收Producer发送的消息了。 Producer发送给Broker消息时候#xff0c;发送的请求code为SEND_MESSAGE(这…前面分析完Broker启动会启动RemotingServer服务同时会注册Processor处理器接着分析Producer进行消息的发送当Producer发送完消息后就得到Broker去接收Producer发送的消息了。 Producer发送给Broker消息时候发送的请求code为SEND_MESSAGE(这里在上一章节有过分析)根据消息发送过来的Code这时会调用NettyRemotingAbstract的processRequestCommand方法该方法里面会根据消息传输的Code来取出对应的Processor进入Processor系列类的SendMessageProcessor的asyncProcessRequest方法前面这一部分之前都有过分析接下来我们一起看看后面的操作正好也将之前的知识串在一起更有利于理解和记忆 public CompletableFutureRemotingCommand asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消息重回队列case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析消息头SendMessageRequestHeader requestHeader parseRequestHeader(request);if (requestHeader null) {return CompletableFuture.completedFuture(null);}// 构建上下文并调用处理前钩子函数mqtraceContext buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// 判断批量消息还是单条消息if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}} }首先解析消息头构建上下文处理消息发送前钩子函数最后异步处理消息请求如果是批量消息调用asyncSendBatchMessage方法如果是单条消息调用asyncSendMessage方法。 处理单条消息 private CompletableFutureRemotingCommand asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 准备响应命令对象final RemotingCommand response preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() ! -1) {return CompletableFuture.completedFuture(response);}final byte[] body request.getBody();int queueIdInt requestHeader.getQueueId();TopicConfig topicConfig this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt 0) {queueIdInt randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MapString, String origProps MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);// 时间msgInner.setBornTimestamp(requestHeader.getBornTimestamp());// 远程地址msgInner.setBornHost(ctx.channel().remoteAddress());// 主机msgInner.setStoreHost(this.getStoreHost());// 重试次数msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() null ? 0 : requestHeader.getReconsumeTimes());String clusterName this.brokerController.getBrokerConfig().getBrokerClusterName();// ...省略CompletableFuturePutMessageResult putMessageResult null;String transFlag origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// 事务消息if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(the broker[ this.brokerController.getBrokerConfig().getBrokerIP1() ] sending transaction message is forbidden);return CompletableFuture.completedFuture(response);}// 事务消息的状态后面再分析putMessageResult this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 消息存储putMessageResult this.brokerController.getMessageStore().asyncPutMessage(msgInner);}// 生成结果返回return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}构建MessageExtBrokerInner对象设置相关属性执行asyncPutMessage方法存储消息并将结果返回客户端。 创建响应验证以及自动创建topic // 准备响应验证以及自动创建topic private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {// 准备响应final RemotingCommand response RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);// 设置唯一idresponse.setOpaque(request.getOpaque());response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug(Receive SendMessage request command {}, request);// 获取broker处理请求服务的起始时间final long startTimestamp this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();if (this.brokerController.getMessageStore().now() startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format(broker unable to service, until %s, UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}response.setCode(-1);// 验证topic以及自动创建逻辑super.msgCheck(ctx, requestHeader, response);return response; }this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()用来判断是否支持自动创建topic根据权限来判断如果是不支持自动创建就将权限设置为可读可写不可继承后面我们去判断是否可以去继承如果能继承就说明支持自动创建这是就会new一个TopicConfig这样就通过autoCreateTopicEnable自动来控制是否能够自动创建topic同时也会调用registerBrokerAll方法注册到Broker路由信息里面当然官方建议我们还是不要开启这个配置因为它没有做到压力的分摊。 存盘 asyncPutMessage方法 根据topic查询对应的路由信息即broker。 public CompletableFuturePutMessageResult asyncPutMessage(final MessageExtBrokerInner msg) { msg.setStoreTimestamp(System.currentTimeMillis());msg.setBodyCRC(UtilAll.crc32(msg.getBody()));AppendMessageResult result null;StoreStatsService storeStatsService this.defaultMessageStore.getStoreStatsService();String topic msg.getTopic();final int tranType MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延迟消息转到系统Topic(后面在分析)if (msg.getDelayTimeLevel() 0) {// ...省略}}// 发送消息地址InetSocketAddress bornSocketAddress (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}// 存储消息地址InetSocketAddress storeSocketAddress (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}// 更新消息大小PutMessageThreadLocal putMessageThreadLocal this.putMessageThreadLocal.get();updateMaxMessageSize(putMessageThreadLocal);if (!multiDispatch.isMultiDispatchMsg(msg)) {PutMessageResult encodeResult putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult ! null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());}PutMessageContext putMessageContext new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock 0;MappedFile unlockMappedFile null;// 写入CommitLog文件前加锁,保证文件操作并发安全putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {// 获取最后一个mapperFileMappedFile mappedFile this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock beginLockTimestamp;msg.setStoreTimestamp(beginLockTimestamp);// 如果不存在或者满了就创建一个if (null mappedFile || mappedFile.isFull()) {mappedFile this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null mappedFile) {log.error(create mapped file1 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 实际写入CommitLog在后面追加result mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {// 添加成功直接breakcase PUT_OK:break;// 表示当前文件存放不下只保存了一部分case END_OF_FILE:unlockMappedFile mappedFile;// 创建一个新的文件mappedFile this.mappedFileQueue.getLastMappedFile(0);if (null mappedFile) {// XXX: warn and notify melog.error(create mapped file2 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 继续追加进去result mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}// 锁的时间elapsedTimeInLock this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {beginTimeInLock 0;putMessageLock.unlock();}if (elapsedTimeInLock 500) {log.warn([NOTIFYME]putMessage in lock cost time(ms){}, bodyLength{} AppendMessageResult{}, elapsedTimeInLock, msg.getBody().length, result);}if (null ! unlockMappedFile this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 提交刷盘申请CompletableFuturePutMessageStatus flushResultFuture submitFlushRequest(result, msg);// 提交主从复制申请CompletableFuturePutMessageStatus replicaResultFuture submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) - {if (flushStatus ! PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus ! PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});}首先它会去处理延时消息这里我不做过细的分析后面针对各种消息在来具体分析接着就将消息进行编码然后加锁并写入消息以获取最后文件进行追加的方式来将消息内存文件里面最后进行刷盘以及通知主从同步的操作。
http://www.zqtcl.cn/news/528437/

相关文章:

  • 自己做网站卖什么给个网站好人有好报2020免费
  • 网站源码安装步骤网站开发用c 语言
  • 网站首页是什么产品网络推广方案
  • 网站首页制作方案南通市规划建设局网站
  • 网站建设费用兴田德润团队西宁网站策划公司
  • 手机价格网站建设用别人备案域名做违法网站
  • 成都武侯区建设厅官方网站石家庄住房和城乡建设部网站
  • 前端做网站的步骤酉阳网站建设
  • 湖北省住房与建设厅网站php做网站访问记录
  • 做网站的公司没有技术吉林北京网站建设
  • 产品设计培训机构哪家好贵州整站优化seo平台
  • 天津网站制作推广wordpress 果酱
  • 写给初学网站开发们的一封信企业网站建设 ppt
  • 做装修网站多少钱做网站百度一下
  • 用asp做网站的可行性分析9免费建网站
  • 网站域名注册商查询徐州集团网站建设报价
  • 句容网站设计公司做网站充值犯法吗
  • 网站建设所用系统网站备案目的
  • 苏州做网站优化公司哪家好网站的大小
  • 四川省住房和城乡建设厅官方网站网站建设图标图片
  • 做影视网站侵权吗评论凡科网站建设怎么样
  • 建设个人网站流程建设游戏网站需要哪些设备
  • 四字母net做网站怎么样河南做网站优化
  • 怎样做网站快照网站当前位置怎么做
  • 网站模板移植现在c 做网站用什么框架
  • 国内专业的室内设计网站盐城网站开发代理商
  • 外贸网站建设 评价wordpress 函数调用
  • 广告支持模式的网站二级域名做网站域名
  • 空间 两个网站购物网站建设图标大全
  • 17.zwd一起做网站广州网站制作费用