怎么做网站访问量,上海网站排名提升,养老保险网站,电子商务怎样建立网站的fdbus中的消息如何发送出去#xff0c;前面的文章有的讲的很详细了#xff0c;但是对于如何接收消息涉及的较少#xff0c;本篇重点讲述fdbus是如何接收消息及消息在传递过程、传递方式#xff08;零拷贝#xff1f;#xff09;。
还是从通过源码来讲述吧#xff0c;更…fdbus中的消息如何发送出去前面的文章有的讲的很详细了但是对于如何接收消息涉及的较少本篇重点讲述fdbus是如何接收消息及消息在传递过程、传递方式零拷贝。
还是从通过源码来讲述吧更简单一些消息的接收肯定是通过socket来实现的但是我并没有找到循环读取socket的地方只能从中间部分的函数讲起但是这一部分已经能够将消息接收的处理讲明白了。话不多少上代码
void CFdbSession::onInput()
{if (fatalError()){goto _quit;}uint8_t prefix_buffer[CFdbMessage::mPrefixSize];if (receiveData(prefix_buffer, sizeof(prefix_buffer))){parsePrefix(prefix_buffer);}if (fatalError()){goto _quit;}if (receiveData(mPayloadBuffer CFdbMessage::mPrefixSize,mMsgPrefix.mTotalLength - CFdbMessage::mPrefixSize)){processPayload();}_quit:if (mPayloadBuffer){delete[] mPayloadBuffer;mPayloadBuffer 0;}
}
通过上面的代码可以看到通过调用receiveData方法获取消息结构中的prefix然后通过调用parsePrefix解析消息中的prefix部分。然后调用receiveData获取消息结构中的消息内容最后调用processPayload进行消息处理processPayload方法的源码如下
void CBaseSession::processPayload()
{const uint8_t *data mPayloadBuffer CFdbMessage::mPrefixSize;int32_t size mMsgPrefix.mTotalLength - CFdbMessage::mPrefixSize;if (size 0){fatalError(true);return;}CFdbMessageHeader head;CFdbParcelableParser parser(head);if (!parser.parse(data, mMsgPrefix.mHeadLength)){LOG_E(CBaseSession: Session %d: Unable to deserialize message head!\n, sid());fatalError(true);return;}auto type head.type();switch (type){case FDB_MT_REQUEST:case FDB_MT_SIDEBAND_REQUEST:case FDB_MT_GET_EVENT:case FDB_MT_PUBLISH:case FDB_MT_SET_EVENT:doStatistics(type, head.flag(), mStatistics.mRx);doRequest(head);break;case FDB_MT_RETURN_EVENT:case FDB_MT_REPLY:case FDB_MT_SIDEBAND_REPLY:case FDB_MT_STATUS:doResponse(head);break;case FDB_MT_BROADCAST:doStatistics(type, head.flag(), mStatistics.mRx);doBroadcast(head);break;case FDB_MT_SUBSCRIBE_REQ:if (head.code() FDB_CODE_SUBSCRIBE){doSubscribeReq(head, true);}else if (head.code() FDB_CODE_UNSUBSCRIBE){doSubscribeReq(head, false);}break;default:LOG_E(CBaseSession: Message %d: Unknown type!\n, (int32_t)head.serial_number());fatalError(true);break;}
}首先获取消息内容从消息头部开始首地址data和消息内容长度调用CFdbParcelableParser类解析消息头部的内容获取消息头部后就获取了消息的基本信息其中就包括消息类型mType。根据消息类型的不同进入不同的分支进行处理这里着重介绍FDB_MT_REPLY类型因为这个是消息响应类型会涉及到刚才提到的如何将受到的回复的消息内容替换到调用线程中函数的输入输出参数中。详细实现请看如下
void CBaseSession::doResponse(CFdbMessageHeader head)
{bool found;PendingMsgTable_t::EntryContainer_t::iterator it;CBaseJob::Ptr msg_ref mPendingMsgTable.retrieveEntry(head.serial_number(), it, found);if (found){auto msg castToMessageCFdbMessage *(msg_ref);auto object_id head.object_id();if (msg-objectId() ! object_id){LOG_E(CFdbSession: object id of response %d does not match that in request: %d\n,object_id, msg-objectId());terminateMessage(msg_ref, FDB_ST_OBJECT_NOT_FOUND, Object ID does not match.);mPendingMsgTable.deleteEntry(it);delete[] mPayloadBuffer;mPayloadBuffer 0;return;}auto object mContainer-owner()-getObject(msg, false);if (object){msg-update(head, mMsgPrefix);msg-decodeDebugInfo(head);msg-replaceBuffer(mPayloadBuffer, head.payload_size(), mMsgPrefix.mHeadLength);mPayloadBuffer 0;auto type msg-type();doStatistics(type, head.flag(), mStatistics.mRx);if (!msg-sync()){switch (type){case FDB_MT_REQUEST:object-doReply(msg_ref);break;case FDB_MT_SIDEBAND_REQUEST:object-onSidebandReply(msg_ref);break;case FDB_MT_GET_EVENT:object-doReturnEvent(msg_ref);break;case FDB_MT_SET_EVENT:default:if (head.type() FDB_MT_STATUS){object-doStatus(msg_ref);}else{LOG_E(CFdbSession: request type %d doesnt match response type %d!\n,type, head.type());}break;}}}msg_ref-terminate(msg_ref);mPendingMsgTable.deleteEntry(it);}
}
上面的代码可以看到首先获取消息头部的序列号mSn每个发出的消息中的序列号是唯一的在网络端点处理完成之后进行回复时也要将消息的序列号返回以便原始请求消息发送方根据该序列号判断是否为自己发出消息匹配。
上面代码
CBaseJob::Ptr msg_ref mPendingMsgTable.retrieveEntry(head.serial_number(), it, found);
就是完成该逻辑。若找到匹配的消息则进行处理通过该序列号找到原始消息请求方的消息对象指针最关键的部分来了每个session对象都包含一个指向收到消息的指针mPayloadBuffer该指针指向的内存每收到一条消息都会重新申请然后通过下面的语句
msg-replaceBuffer(mPayloadBuffer, head.payload_size(), mMsgPrefix.mHeadLength);
完成消息体内容的替换。
最后通过调用下面的语句如果存在同步调用的情况则通过唤醒条件变量来唤醒调用线程解除阻塞调用线程。同时也完成了回复消息的内容替换到了原始消息对象指针指向的mBuffer等成员更直白的说就是收到一条消息session对象首先申请一片内存然后消息收到的消息读取到该内存中然后解析消息头解析消息头后获取消息的序列号通过序列号再本地找到发送的消息对象该消息对象包含mBuffer mPayloadSize mHeadSize mOffset等成员找到该对象后首先释放掉mBuffer指向的内存然后将session对象申请的内存赋值给mBuffer这样就完成了消息的传递过程这样在同步调用的时候CBaseJob::Ptr msg_ref参数指向的对象就完成了从原始请求消息变成了回复消息用户就可以从这个参数获取完整的回复消息。
msg_ref-terminate(msg_ref);
这样大家就明白了一个消息的完整的过程了吧。我觉的还是挺清晰的。