如何承接设计网站建设,网站网页设计心得,wordpress 访客统计插件,做一个平台app需要多少钱基于Tars高并发IM系统的设计与实现-实战篇5
群聊服务 GroupChatServer
群聊服务既可以接受来自BrokerServer的用户请求#xff0c;也需要接收来自其他服务的RPC请求;所以本服务提供两套RPC接口#xff1a;通用RPC接口和专用RPC接口。
通用RPC接口
通用RPC接口主要处理如下…基于Tars高并发IM系统的设计与实现-实战篇5
群聊服务 GroupChatServer
群聊服务既可以接受来自BrokerServer的用户请求也需要接收来自其他服务的RPC请求;所以本服务提供两套RPC接口通用RPC接口和专用RPC接口。
通用RPC接口
通用RPC接口主要处理如下请求
创建群聊群聊加成员群聊减成员修改群资料发群消息换群主解散群聊同步用户群聊获取群成员解散群聊判断一个人是否为群成员
针对以上每个业务根据用户请求的类型进行不同的业务逻辑处理处理代码如下 switch(req.header.type){case otim::PT_MSG_GROUP_CHAT:this-sendMsg(clientContext, req, resp);break;case otim::PT_GROUPCHAT_SYNC:this-syncGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_CREATE:this-createGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_JION:this-joinGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_QUIT:this-quitGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_DISMISS:this-dismissGroup(clientContext, req, resp);break;case otim::PT_GROUPCHAT_UPDATE_CREATOR:this-updateGroupCreator(clientContext, req, resp);break;case otim::PT_GROUPCHAT_INFO_UPDATE:this-updateGroupInfo(clientContext, req, resp);break;case otim::PT_GROUPCHAT_MEMBERS_GET:this-getGroupMember(clientContext, req, resp);break;default:MLOG_DEBUG(the type is invalid:otim::etos((otim::PACK_TYPE)req.header.type));return otim::EC_PROTOCOL;}群聊相关请求实现方法 int syncGroup(const otim::ClientContext clientContext, const otim::OTIMPack req, otim::OTIMPack resp);int createGroup(const otim::ClientContext clientContext, const otim::OTIMPack req, otim::OTIMPack resp);int dismissGroup(const otim::ClientContext clientContext, const otim::OTIMPack req, otim::OTIMPack resp);int updateGroupCreator(const otim::ClientContext clientContext, const otim::OTIMPack req, otim::OTIMPack resp);int updateGroupInfo(const otim::ClientContext clientContext, const otim::OTIMPack req, otim::OTIMPack resp);int joinGroup(const otim::ClientContext clientContext, const otim::OTIMPack req, otim::OTIMPack resp);int quitGroup(const otim::ClientContext clientContext, const otim::OTIMPack req, otim::OTIMPack resp);int getGroupMember(const otim::ClientContext clientContext, const otim::OTIMPack req, otim::OTIMPack resp);
专用RPC接口
主要提供两个接口
interface GroupChatRPCServant
{int getGroupMember(string groupId, out vectorstring memberIds);bool isGroupMember(string groupId, string memberId);};
历史消息服务 HistoryMsgServer
该服务主要处理用户历史消息即相关的业务
热会话同步历史消息存取高优先级消息存取
该服务提供通用RPC服务主要服务对象为接入服务BrokerServer 用户所有消息都通过该服务进行存取为高效存取历史消息主要存储在redis,存储量及时长可以根据需求进一步来做配置开发。
业务逻辑处理接口
该服务采用通用接口来处理客户端请求 tars::Int32 processHotsessionReq(const otim::ClientContext clientContext,const otim::OTIMPack req,otim::OTIMPack resp);tars::Int32 processPullHistoryReq(const otim::ClientContext clientContext,const otim::OTIMPack reqPack,otim::OTIMPack respPack);tars::Int32 processHighPriorMsgSyncReq(const otim::ClientContext clientContext,const otim::OTIMPack reqPack,otim::OTIMPack respPack);冷存储服务器 OlapServer
该服务主要将IM数据存储到mysql中永久保存专用RPC服务
业务逻辑处理接口
interface OlapServant
{int saveMsg(otim::ClientContext clientContext, OTIMPack pack, string sessionId, long seqId);
};消息操作服务 MsgOperatorServer
该服务主要有如下功能逻辑处理
消息控制请求包含撤回删除覆写消息已读处理
业务逻辑处理接口 tars::Int32 processMsgUnreadReq(const otim::ClientContext clientContext,const otim::OTIMPack reqPack,otim::OTIMPack respPack);tars::Int32 processMsgCTRLReq(const otim::ClientContext clientContext,const otim::OTIMPack reqPack,otim::OTIMPack respPack);
消息撤回删除覆写逻辑处理
tars::Int32 MsgOperatorServantImp::processMsgCTRLReq(const otim::ClientContext clientContext,const otim::OTIMPack reqPack,otim::OTIMPack respPack)
{SCOPELOGGER(scopelogger);scopeloggerreqPack:reqPack.header.writeToJsonString();otim::MsgControl req;otim::unpackTarsotim::MsgControl(reqPack.payload, req);MLOG_DEBUG(clientContext:clientContext.writeToJsonString() req:req.writeToJsonString());respPack reqPack;respPack.header.flags | otim::PF_ISACK;otim::CommonErrorCode respData;respData.code otim::EC_SUCCESS;if (req.sessionId.empty() || req.seqId 0 || req.packId.empty()){respData.code otim::EC_PARAM;otim::packTarsotim::CommonErrorCode(respData, respPack.payload);MLOG_DEBUG(sessionId,packId or seqId is is empty req:req.writeToJsonString());return respData.code;}otim::RedisConnPtr redis(otim::RedisPool::instance());//get old msgstd::vectorstd::string msgs;std::vectorstd::string scores;EMRStatus ret redis-ZRangeByScoreAndLimit(otim::RKEY_MSG req.sessionId, req.seqId, 5, msgs);if (EMRStatus::EM_KVDB_ERROR ret){MLOG_ERROR(get msg fail!, sessionId: req.sessionId msgId:req.seqId);respData.code otim::EC_DB_ERROR;otim::packTarsotim::CommonErrorCode(respData, respPack.payload);return otim::EC_DB_ERROR;}MLOG_DEBUG(get old msg size:msgs.size());otim::OTIMPack packOrg;for (auto item : msgs){std::vectorchar vctItem(item.begin(), item.end());otim::OTIMPack packItem;otim::unpackTarsotim::OTIMPack(vctItem, packItem);MLOG_DEBUG(msgs :packItem.header.writeToJsonString());if (packItem.header.packId req.packId){packOrg packItem;}}if (packOrg.header.packId.empty()){MLOG_WARN(The org msg is not exist:req.sessionId packId:req.packId seqId:req.seqId);respData.code otim::EC_MSG_NOT_EXIST;otim::packTarsotim::CommonErrorCode(respData, respPack.payload);return respData.code;}MLOG_DEBUG(org msg:packOrg.header.writeToJsonString());std::string to;if (req.command otim::MC_REVOKE){packOrg.header.flags | otim::PF_REVOKE;MLOG_DEBUG(revoke msg:req.packId);}else if (req.command otim::MC_OVERRIDE){packOrg.header.flags | otim::PF_OVERRIDE;otim::MsgReq msgReq;otim::unpackTarsotim::MsgReq(packOrg.payload, msgReq);msgReq.content req.content;otim::packTarsotim::MsgReq(msgReq, packOrg.payload);to msgReq.to;MLOG_DEBUG(override msg:req.packId);}else if (req.command otim::MC_DELETE){
// packOrg.header.flags | otim::PF_REVOKE;MLOG_DEBUG(delete msg:req.packId);}else{MLOG_WARN(The command is error:req.command packId:req.packId);respData.code otim::EC_MSG_OP_CMD;otim::packTarsotim::CommonErrorCode(respData, respPack.payload);return otim::EC_MSG_OP_CMD;}ret redis-ZSetRemoveByScore(otim::RKEY_MSG req.sessionId, req.seqId, req.seqId);if (EMRStatus::EM_KVDB_SUCCESS ! ret ){MLOG_ERROR(delete original msg fail:(int)ret);}//增加新的消息if (req.command ! otim::MC_DELETE){std::string msgSave;otim::packTarsotim::OTIMPack(packOrg, msgSave);ret redis-ZSetAdd(otim::RKEY_MSG req.sessionId, req.seqId, msgSave);if ( EMRStatus::EM_KVDB_SUCCESS ! ret ){MLOG_ERROR(save cancel msg fail!);}}//通知在线接收者其他端otim::sendPackToMySelf(clientContext, reqPack);// send to userstd::vectorstd::string vctUserId;if (packOrg.header.type otim::PT_MSG_SINGLE_CHAT || packOrg.header.type otim::PT_MSG_BIZ_NOTIFY){if (to.empty()){otim::MsgReq msgReq;otim::unpackTarsotim::MsgReq(packOrg.payload, msgReq);to msgReq.to;}vctUserId.push_back(to);MLOG_DEBUG(single or notify chat packId:packOrg.header.packId to:to);}else if (packOrg.header.type otim::PT_MSG_GROUP_CHAT){//get groupMemberotim::GroupChatRPCServantPrx groupChatRPCServantPrx otim::getServantPrxotim::GroupChatRPCServantPrx(PRXSTR_GROUP_CHAT_RPC);groupChatRPCServantPrx-getGroupMember(req.sessionId, vctUserId);MLOG_DEBUG(group chat packId:packOrg.header.packId to:req.sessionId member Size:vctUserId.size());}int64_t seqId otim::genSeqId();for (auto userId : vctUserId){otim::savePriorityMsg(redis.get(), reqPack, userId, seqId);otim::dispatchMsg(clientContext, reqPack, userId);}respData.code otim::EC_SUCCESS;otim::packTarsotim::CommonErrorCode(respData, respPack.payload);return otim::EC_SUCCESS;
}
Http接口服务
该服务针对第三方提供消息能力主要提供如下接口
发送消息(简单文本消息复杂消息添加好友删除好友查看好友
功能实现函数 std::string doSendSimpleMsgCmd(TC_HttpRequest cRequest);std::string doSendMsgCmd(TC_HttpRequest cRequest);std::string doAddFriend(TC_HttpRequest request);std::string doDelFriend(TC_HttpRequest request);std::string doGetFriends(TC_HttpRequest request);
Push推送服务
该服务主要实现IM消息的离线推送能力 APP客户端不在线的场景下将消息通过离线通道push用户的手机上以提高消息的触达率。
主要实现iOS APNSAndroid FCM华为小米oppovivo等厂商的离线消息推送功能 需要根据各个厂商开放平台提供的API进行开发集成。
Android 厂商开发平台地址
华为: https://developer.huawei.com/consumer/cn/hms/huawei-pushkit小米: https://dev.mi.com/console/appservice/push.html魅族: http://open-wiki.flyme.cn/doc-wiki/indexvivo: https://push.vivo.com.cn/#/oppo: https://push.oppo.com/
RPC接口
enum PushServiceType
{PS_TYPE_NONE 0, //无 Push服务提供商PS_TYPE_IOS 1, //IOS Push服务提供商PS_YPE_HUAWEI 2, //华为 Push服务提供商PS_TYPE_XIAOMI 3, //小米 Push服务提供商PS_TYPE_MEIZU 4, //魅族 Push服务提供商PS_TYPE_VIVO 5, //vivi服务PS_TYPE_OPPO 6, //oppo服务PS_TYPE_FCM 7, //FCM服务
};struct RegInfo {0 require string packId ; //消息的id1 require PushServiceType serviceType 0; //push服务提供商2 require string packageName ; //包名3 require string userId ; //用户id4 optional string appVersion ; //app version
};struct PushInfo {0 require string packId ; //消息的id1 require string userId ; //用户id2 require int unReadCount 0; //未读消息数3 require string title ; /push标题4 require string content ; //push内容5 optional string uri ; //跳转uri6 optional string extraData; //业务自定义字段
};interface PushServant
{int register(RegInfo regInfo);int pushMessage(PushInfo pushInfo);
};服务端部署
编译打包
所有服务开发完成后执行如下命令进行编译打包
make release
make clean all
make tar程序包部署
根据前期部署好的Tars框架环境、web管理系统将程序包逐个发布发布后的系统如图