当前位置: 首页 > news >正文

在网站中动态效果怎么做怎么用建站系统建网站

在网站中动态效果怎么做,怎么用建站系统建网站,课程网站开发合同,电子商务平台名词解释一、介绍RPC RPC#xff08;Remote Procedure Call#xff09;远程过程调用#xff0c;一种通过网络从远程计算器上请求服务#xff0c;而不需要了解底层网络通信细节#xff0c;RPC可以使用多种网络协议进行通信#xff0c;并且在TCP/IP网络四层模型中跨越了传输层和应… 一、介绍RPC RPCRemote Procedure Call远程过程调用一种通过网络从远程计算器上请求服务而不需要了解底层网络通信细节RPC可以使用多种网络协议进行通信并且在TCP/IP网络四层模型中跨越了传输层和应用层。RPC就是像调用本地方法一样调用远程方法。 项目技术选取 1.实现一个远程调用接口call然后通过传入函数名参数来调用RPC接口 2.选取JSON类型来进行网络传输的参数和返回值映射到RPC接口上 3.网络传输选取muduo库 4.序列化和反序列化JSON 主要环境为LinuxUbuntu 二、 介绍JSON Json是一种数据交换格式它采用完全独立于变成语言的文本格式来存储和表示数据 JsonCpp库主要是用于实现Json格式数据的序列化和反序列化他实现了将多个数据对象组织成为Json格式字符串以及将json格式字符串解析得到多个数据对象的功能 class JSON {public:static bool serialize(const Json::Value val, std::string body) {std::stringstream ss;//先实例化一个工厂类对象Json::StreamWriterBuilder swb;//通过工厂类对象来生产派生类对象std::unique_ptrJson::StreamWriter sw(swb.newStreamWriter());int ret sw-write(val, ss);if (ret ! 0) {ELOG(json serialize failed!);return false;}body ss.str();return true;}static bool unserialize(const std::string body, Json::Value val) {Json::CharReaderBuilder crb;std::string errs;std::unique_ptrJson::CharReader cr(crb.newCharReader());bool ret cr-parse(body.c_str(), body.c_str() body.size(), val, errs);if (ret false) {ELOG(json unserialize failed : %s, errs.c_str());return false;}return true;}};三、介绍Muduo库 Muduo库是一个基于非阻塞IO和事件驱动的C高并发TCP网络编程库是一款基于主从Reactor模型的网络库主要使用的线程模型是 one loop per thread即一个线程只能有一个事件循环用于响应计时器和IO事件 一个文件描述符只能由一个线程进行读写这里只做简单叙述后面具体项目代码中会使用 四、C11 异步操作 std::future 是C11标准库中的一个模版类他表示一个异步操作的结果当我们在多线程编程中使用异步任务时std::future可以帮助我们在需要的时候获取任务的执行结果他能够阻塞当前线程直到异步操作完成从而确保我们在获取结果时不会遇到未完成的操作。 五、项目设计 理解项目功能 本质上来讲我们要实现远程调用思想并不复杂其实就是客户端想要完成某个任务的处理但是这个处理过程并不是自己来完成而是将请求发送到服务器上让服务器来帮助其完成处理过程并返回结果客户端拿到结果后返回 但是一对多火一对一的关系中一但服务端掉线则客户端无法进行远端调用且服务器的负载也会较高我们选择设计为分布式的架构由多个节点组成一个系统这些节点通常指的是服务器将不同的业务或者同一个业务拆分在不同节点上通过协同工作解决高并发的问题提高系统扩展性和可用性主要是增加一个注册中心基于注册中心不同的服务向注册中心进行注册客户端在进行调用时先通过注册中心进行服务发现找到能够提供服务的服务器 项目还提供了发布订阅功能依托客户端围绕服务端进行消息的转发 项目主要三个功能 1.RPC调用 2.服务的注册与发现以及服务的下线上线通知 3.消息的发布订阅 六、框架设计 主要将整个项目分为三层来进行实现 1.抽象层将底层的网络通信以及应用层协议以及请求响应进行抽象使项目更具扩展性和灵活性 2.具象层针对抽象的功能进行具体的实现 3.业务层基于抽象的框架在上层实现项目所需功能 整体框架设计 服务端模块划分 首先我们要清楚服务端的功能需求 1.基于网络通信接收客户端的请求提供rpc服务 2.基于网络通信接收客户端的请求提供服务注册与发现上线下线通知 3.基于网络通信接收客户端的请求提供主题操作创建删除订阅取消订阅消息发布功能 按照功能需求可划分模块 1.Network 网络通信模块 网络通信模块实现底层的网络通信功能这个模块的本质上也是一个比较复杂庞大的模块上面我们也说了这里使用muduo库来进行搭建 2.Protocol 应用层通信协议模块 应用层通信协议模块存在的意义就是解析数据解决通信中有可能存在的粘包问题能够获取到一条完整的消息 我们选取的方式为LV格式 Length:固定为4字节长度用于表示后续的本条消息数据长度 MType固定4字节长度用于表示该条消息的类型 1.rpc调用请求相应类型消息 2.发布订阅取消订阅消息推送类型消息 3.主题创建删除类型消息 4.服务注册发现上线下线消息类型 IDLength固定4字节长度用于描述后续ID字段的实际长度 MID用于唯一标识消息ID字段长度不固定 Body消息主题正文数据字段是请求或响应的实际内容字段 3.Dispatcher 消息分发处理模块 区分消息类型根据不同的类型调用不同的业务处理函数进行消息处理 当在网络通信收到数据后在onMessage回调函数中对数据进行应用层协议解析得到一条实际消息载荷后我们就该决定这条消息代表着客户端的什么请求以及应该如何处理本模块主要哈希表来表示消息类型和回调函数的映射关系即收到消息后在模块中找到对应的处理回调函数进行调用即可 消息类型1.rpc请求与响应 2.服务注册。发现上线下线 请求与响应 3. 主题创建删除订阅取消订阅 请求与响应 消息发布的请求与响应 4.RpcRouter 远端调用路由功能模块 提供rpc请求的处理回调函数内部所要实现的功能分辨出客户端请求的服务进行处理得到结果进行响应 1.具备一个rpc路由管理其中包含对于每个服务的参数校验功能 2.具备一个方法名称和方法业务回调的映射 3.必须向外提供rpc请求的业务处理函数 5.Publish—Subscribe 发布订阅功能模块 针对发布订阅请求进行处理提供一个回调函数设置给Dispatch模块 主题的创建主题的删除主题的订阅主题的取消订阅主题消息的发布 该模块必须具备一个主题管理且主题中需要保存订阅了该主题的客户端连接当主题收到一条消息需要将这条消息推送给订阅了该主题的所有客户端 必须具备一个订阅者管理且每个订阅者描述中都必须保存自己所订阅的主题名称当一个订阅客户端断开连接时能够找到订阅信息的关联关系进行删除 必须向外提供 主题创建销毁主题订阅取消订阅能够找到订阅信息的关联关系进行删除 6.Registry—Discovery 服务注册发现上线下线功能模块 针对服务注册与发现请求的处理 1.服务注册服务provider告诉中转中心自己能提供哪些服务 2.服务发现服务caller询问中转中心谁能提供指定服务 3.服务上线在一个provider上线了指定服务后通知发现过该服务的客户端有provider可以提供该服务 4.服务下线在一个provider断开连接通知发现过该服务的caller谁下线了哪个服务 必须具备一个服务发现者的管理 方法与发现者当一个客户端进行服务发现的时候进行记录谁发现过该服务当有一个新的提供者上线的时 候可以通知该发现者 连接与发现者当一个发现者断开连接了删除关联关系往后就不需要通知了 必须具备一个服务提供者的管理 连接与提供者当一个提供者断开连接的时候能够通知该提供者提供的服务对应的发现者该主机的该服务下线了 方法与提供者能够知道谁的哪些方法下线了然后通知发现过该方法的客户端 7.Server 基于以上模块整合而出的服务端模块 RpcServerrpc功能模块与网络通信部分结合 RegistryServer服务发现注册功能与网络通信部分结合 TopicServer发布订阅功能模块与网络通信部分结合 客户端模块划分 1.Protocol应用层通信协议模块 2.Network网络通信模块 3.Dispatch消息分发处理模块 4.Requestor请求管理模块 5.RpcCaller远端调用功能模块 6.Publish-Subscribe发布订阅功能模块 7.Registry-Discovery服务注册发现上线下线功能模块 8.Client基于以上模块整合而出的客户端模块 重复模块不再过多赘述 Requestor模块 针对客户端的每一条请求进行管理以便于对请求对应的响应做出合适的操作 首先我们要思考一个问题对于客户端来说更多的是请求方是主动发起请求服务的一方而在多线程网络通信中可能会存在时序的问题其次muduo库这种异步IO网络通信库通常是IO操作都是异步操作无法直接在发送请求后去等待该条请求的响应 解决办法给每一个请求设置请求ID将数据存入hash_map中以请求ID作为映射并向外提供获取指定请求ID响应的阻塞接口这样只要知道的请求ID即可获取到自己想要的响应 RpcCall模块 向用户提供进行rpc调用模块 1.同步调用发起调用后等收到响应结果后返回 2.异步调用发起调用后立即返回在想获取结果的时候进行获取 3.回调调用发起调用的同事设置结果的处理回调收到响应后自动对结果进行回调处理 Registry-Discovery模块 注册者作为Rpc服务的提供者需要向注册中心提供服务因为需要实现向服务器注册服务的功能 发现者作为Rpc服务的调用者需要先进行服务发现获取地址后需要管理起来留用且作为发现者需要关注注册中心发过来的服务上线/下线消息及时对已经下线的服务和主机进行管理 Client整合模块: RegistryClient服务注册功能模块与网络通信客户端结合 DiscoveryClient服务发现功能模块与网络通信客户端结合 RpcClientDiscovery 与RPC功能模块与网络通信客户端结合 TopicClient发布订阅功能模块与网络通信客户端结合 七、项目实现 常用接口实现 日志 一个项目的实现日志是必不可少的主要用来开酥定位程序运行逻辑出错的位置 出现问题不是最可怕的可怕的是找不到问题出现在哪 #define LDBG 0#define LINF 1#define LERR 2#define LDEFAULT LDBG#define LOG(level, format, ...) {\if (level LDEFAULT){\time_t t time(NULL);\struct tm *lt localtime(t);\char time_tmp[32] {0};\strftime(time_tmp, 31, %m-%d %T, lt);\fprintf(stdout, [%s][%s:%d] format \n, time_tmp, __FILE__, __LINE__, ##__VA_ARGS__);\}\}#define DLOG(format, ...) LOG(LDBG, format, ##__VA_ARGS__);#define ILOG(format, ...) LOG(LINF, format, ##__VA_ARGS__);#define ELOG(format, ...) LOG(LERR, format, ##__VA_ARGS__);Json序列化/反序列化 class JSON {public:static bool serialize(const Json::Value val, std::string body) {std::stringstream ss;//先实例化一个工厂类对象Json::StreamWriterBuilder swb;//通过工厂类对象来生产派生类对象std::unique_ptrJson::StreamWriter sw(swb.newStreamWriter());int ret sw-write(val, ss);if (ret ! 0) {ELOG(json serialize failed!);return false;}body ss.str();return true;}static bool unserialize(const std::string body, Json::Value val) {Json::CharReaderBuilder crb;std::string errs;std::unique_ptrJson::CharReader cr(crb.newCharReader());bool ret cr-parse(body.c_str(), body.c_str() body.size(), val, errs);if (ret false) {ELOG(json unserialize failed : %s, errs.c_str());return false;}return true;}};UUID生成 什么是UUIDUUID也叫通用唯一识别码通常由32位16进制数字字符组成 8-4-4-4-12 我们采用生成8个随机数字8个字节序号16字节数组生成 class UUID {public:static std::string uuid() {std::stringstream ss;//1. 构造一个机器随机数对象std::random_device rd;//2. 以机器随机数为种子构造伪随机数对象std::mt19937 generator (rd());//3. 构造限定数据范围的对象std::uniform_int_distributionint distribution(0, 255);//4. 生成8个随机数按照特定格式组织成为16进制数字字符的字符串for (int i 0; i 8; i) {if (i 4 || i 6) ss -;ss std::setw(2) std::setfill(0) std::hex distribution(generator);}ss -;//5. 定义一个8字节序号逐字节组织成为16进制数字字符的字符串static std::atomicsize_t seq(1); // 00 00 00 00 00 00 00 01size_t cur seq.fetch_add(1);for (int i 7; i 0; i--) {if (i 5) ss -;ss std::setw(2) std::setfill(0) std::hex ((cur (i*8)) 0xFF);}return ss.str();}定义 1.请求字段宏定义 方法名称 方法参数 主题名称 主题消息 操作类型 IP地址 port端口 响应码 调用结果 #define KEY_METHOD method#define KEY_PARAMS parameters#define KEY_TOPIC_KEY topic_key#define KEY_TOPIC_MSG topic_msg#define KEY_OPTYPE optype#define KEY_HOST host#define KEY_HOST_IP ip#define KEY_HOST_PORT port#define KEY_RCODE rcode#define KEY_RESULT result2.消息类型的定义 PRC 请求响应 主题操作 请求响应 服务操作 请求响应 enum class MType {REQ_RPC 0,RSP_RPC,REQ_TOPIC,RSP_TOPIC,REQ_SERVICE,RSP_SERVICE};3.响应码类型定义 成功处理 解析失败 消息中字段缺失或错误导致无效消息 链接断开 无效的Rpc调用参数 Rpc服务不存在 无效的Topic操作类型 主题不存在 无效的服务操作类型 enum class RCode {RCODE_OK 0,RCODE_PARSE_FAILED,RCODE_ERROR_MSGTYPE,RCODE_INVALID_MSG,RCODE_DISCONNECTED,RCODE_INVALID_PARAMS,RCODE_NOT_FOUND_SERVICE,RCODE_INVALID_OPTYPE,RCODE_NOT_FOUND_TOPIC,RCODE_INTERNAL_ERROR};static std::string errReason(RCode code) {static std::unordered_mapRCode, std::string err_map {{RCode::RCODE_OK, 成功处理},{RCode::RCODE_PARSE_FAILED, 消息解析失败},{RCode::RCODE_ERROR_MSGTYPE, 消息类型错误},{RCode::RCODE_INVALID_MSG, 无效消息},{RCode::RCODE_DISCONNECTED, 连接已断开},{RCode::RCODE_INVALID_PARAMS, 无效的Rpc参数},{RCode::RCODE_NOT_FOUND_SERVICE, 没有找到对应的服务},{RCode::RCODE_INVALID_OPTYPE, 无效的操作类型},{RCode::RCODE_NOT_FOUND_TOPIC, 没有找到对应的主题},{RCode::RCODE_INTERNAL_ERROR, 内部错误}};auto it err_map.find(code);if (it err_map.end()) {return 未知错误;}return it-second;} 4.RPC请求类型定义 同步请求 异步请求 回调请求 enum class RType {REQ_ASYNC 0,REQ_CALLBACK};5.主题操作类型定义 主题创建 主题删除 主题订阅 主题取消订阅 主题消息发布 enum class TopicOptype {TOPIC_CREATE 0,TOPIC_REMOVE,TOPIC_SUBSCRIBE,TOPIC_CANCEL,TOPIC_PUBLISH};6.服务操作类型定义 服务注册 服务发现 服务上线 服务下线 enum class ServiceOptype {SERVICE_REGISTRY 0,SERVICE_DISCOVERY,SERVICE_ONLINE,SERVICE_OFFLINE,SERVICE_UNKNOW};通信抽象实现 提供一些必要的接口具体实现由派生类实现 BaseMessage BaseBuffer BaseProtocol BaseConnection BaseServe BaseClient namespace myrpc {class BaseMessage {public:using ptr std::shared_ptrBaseMessage;virtual ~BaseMessage(){}virtual void setId(const std::string id) {_rid id;}virtual std::string rid() { return _rid; }virtual void setMType(MType mtype) {_mtype mtype;}virtual MType mtype() { return _mtype; }virtual std::string serialize() 0;virtual bool unserialize(const std::string msg) 0;virtual bool check() 0;private:MType _mtype;std::string _rid;};class BaseBuffer {public:using ptr std::shared_ptrBaseBuffer;virtual size_t readableSize() 0;virtual int32_t peekInt32() 0;virtual void retrieveInt32() 0;virtual int32_t readInt32() 0;virtual std::string retrieveAsString(size_t len) 0;};class BaseProtocol {public:using ptr std::shared_ptrBaseProtocol;virtual bool canProcessed(const BaseBuffer::ptr buf) 0;virtual bool onMessage(const BaseBuffer::ptr buf, BaseMessage::ptr msg) 0;virtual std::string serialize(const BaseMessage::ptr msg) 0;};class BaseConnection {public:using ptr std::shared_ptrBaseConnection;virtual void send(const BaseMessage::ptr msg) 0;virtual void shutdown() 0;virtual bool connected() 0;};using ConnectionCallback std::functionvoid(const BaseConnection::ptr);using CloseCallback std::functionvoid(const BaseConnection::ptr);using MessageCallback std::functionvoid(const BaseConnection::ptr, BaseMessage::ptr);class BaseServer {public:using ptr std::shared_ptrBaseServer;virtual void setConnectionCallback(const ConnectionCallback cb) {_cb_connection cb;}virtual void setCloseCallback(const CloseCallback cb) {_cb_close cb;}virtual void setMessageCallback(const MessageCallback cb) {_cb_message cb;}virtual void start() 0;protected:ConnectionCallback _cb_connection;CloseCallback _cb_close;MessageCallback _cb_message;};class BaseClient {public:using ptr std::shared_ptrBaseClient;virtual void setConnectionCallback(const ConnectionCallback cb) {_cb_connection cb;}virtual void setCloseCallback(const CloseCallback cb) {_cb_close cb;}virtual void setMessageCallback(const MessageCallback cb) {_cb_message cb;}virtual void connect() 0;virtual void shutdown() 0;virtual bool send(const BaseMessage::ptr) 0;virtual BaseConnection::ptr connection() 0;virtual bool connected() 0;protected:ConnectionCallback _cb_connection;CloseCallback _cb_close;MessageCallback _cb_message;}; }消息抽象实现 class JsonMessage : public BaseMessage {public:using ptr std::shared_ptrJsonMessage;virtual std::string serialize() override {std::string body;bool ret JSON::serialize(_body, body);if (ret false) {return std::string();}return body;}virtual bool unserialize(const std::string msg) override {return JSON::unserialize(msg, _body);}protected:Json::Value _body;};class JsonRequest : public JsonMessage {public:using ptr std::shared_ptrJsonRequest;};class JsonResponse : public JsonMessage {public:using ptr std::shared_ptrJsonResponse;virtual bool check() override {//在响应中大部分的响应都只有响应状态码//因此只需要判断响应状态码字段是否存在类型是否正确即可if (_body[KEY_RCODE].isNull() true) {ELOG(响应中没有响应状态码);return false;}if (_body[KEY_RCODE].isIntegral() false) {ELOG(响应状态码类型错误);return false;}return true;}virtual RCode rcode() {return (RCode)_body[KEY_RCODE].asInt();}virtual void setRCode(RCode rcode) {_body[KEY_RCODE] (int)rcode;}};Muduo封装实现 #include muduo/net/TcpServer.h #include muduo/net/EventLoop.h #include muduo/net/TcpConnection.h #include muduo/net/Buffer.h #include muduo/base/CountDownLatch.h #include muduo/net/EventLoopThread.h #include muduo/net/TcpClient.h #include detail.hpp #include fields.hpp #include abstract.hpp #include message.hpp #include mutex #include unordered_mapnamespace myrpc {class MuduoBuffer : public BaseBuffer {public:using ptr std::shared_ptrMuduoBuffer;MuduoBuffer(muduo::net::Buffer *buf):_buf(buf){}virtual size_t readableSize() override {return _buf-readableBytes();}virtual int32_t peekInt32() override {//muduo库是一个网络库从缓冲区取出一个4字节整形会进行网络字节序的转换return _buf-peekInt32();}virtual void retrieveInt32() override {return _buf-retrieveInt32();}virtual int32_t readInt32() override {return _buf-readInt32();}virtual std::string retrieveAsString(size_t len) override {return _buf-retrieveAsString(len);}private:muduo::net::Buffer *_buf;};class BufferFactory {public:templatetypename ...Argsstatic BaseBuffer::ptr create(Args ...args) {return std::make_sharedMuduoBuffer(std::forwardArgs(args)...);}};class LVProtocol : public BaseProtocol {public:// |--Len--|--VALUE--|// |--Len--|--mtype--|--idlen--|--id--|--body--|using ptr std::shared_ptrLVProtocol;//判断缓冲区中的数据量是否足够一条消息的处理virtual bool canProcessed(const BaseBuffer::ptr buf) override {if (buf-readableSize() lenFieldsLength) {return false;}int32_t total_len buf-peekInt32();//DLOG(total_len:%d, total_len);if (buf-readableSize() (total_len lenFieldsLength)) {return false;}return true;}virtual bool onMessage(const BaseBuffer::ptr buf, BaseMessage::ptr msg) override {//当调用onMessage的时候默认认为缓冲区中的数据足够一条完整的消息int32_t total_len buf-readInt32(); //读取总长度MType mtype (MType)buf-readInt32(); // 读取数据类型int32_t idlen buf-readInt32(); // 读取id长度int32_t body_len total_len - idlen - idlenFieldsLength - mtypeFieldsLength;std::string id buf-retrieveAsString(idlen);std::string body buf-retrieveAsString(body_len);msg MessageFactory::create(mtype);if (msg.get() nullptr) {ELOG(消息类型错误构造消息对象失败);return false;}bool ret msg-unserialize(body);if (ret false) {ELOG(消息正文反序列化失败);return false;}msg-setId(id);msg-setMType(mtype);return true;}virtual std::string serialize(const BaseMessage::ptr msg) override {// |--Len--|--mtype--|--idlen--|--id--|--body--|std::string body msg-serialize();std::string id msg-rid();auto mtype htonl((int32_t)msg-mtype());int32_t idlen htonl(id.size());int32_t h_total_len mtypeFieldsLength idlenFieldsLength id.size() body.size();int32_t n_total_len htonl(h_total_len);//DLOG(h_total_len:%d, h_total_len);std::string result;result.reserve(h_total_len);result.append((char*)n_total_len, lenFieldsLength);result.append((char*)mtype, mtypeFieldsLength);result.append((char*)idlen, idlenFieldsLength);result.append(id);result.append(body);return result;}private:const size_t lenFieldsLength 4;const size_t mtypeFieldsLength 4;const size_t idlenFieldsLength 4;};class ProtocolFactory {public:templatetypename ...Argsstatic BaseProtocol::ptr create(Args ...args) {return std::make_sharedLVProtocol(std::forwardArgs(args)...);}};class MuduoConnection : public BaseConnection {public:using ptr std::shared_ptrMuduoConnection;MuduoConnection(const muduo::net::TcpConnectionPtr conn,const BaseProtocol::ptr protocol) : _protocol(protocol), _conn(conn) {}virtual void send(const BaseMessage::ptr msg) override {std::string body _protocol-serialize(msg);_conn-send(body);}virtual void shutdown() override {_conn-shutdown();}virtual bool connected() override {_conn-connected();}private:BaseProtocol::ptr _protocol;muduo::net::TcpConnectionPtr _conn;};class ConnectionFactory {public:templatetypename ...Argsstatic BaseConnection::ptr create(Args ...args) {return std::make_sharedMuduoConnection(std::forwardArgs(args)...);}};class MuduoServer : public BaseServer {public:using ptr std::shared_ptrMuduoServer;MuduoServer(int port) : _server(_baseloop, muduo::net::InetAddress(0.0.0.0, port), MuduoServer, muduo::net::TcpServer::kReusePort),_protocol(ProtocolFactory::create()){}virtual void start() {_server.setConnectionCallback(std::bind(MuduoServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(MuduoServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.start();//先开始监听_baseloop.loop();//开始死循环事件监控}private:void onConnection(const muduo::net::TcpConnectionPtr conn) {if (conn-connected()) {std::cout 连接建立\n;auto muduo_conn ConnectionFactory::create(conn, _protocol);{std::unique_lockstd::mutex lock(_mutex);_conns.insert(std::make_pair(conn, muduo_conn));}if (_cb_connection) _cb_connection(muduo_conn);}else {std::cout 连接断开\n;BaseConnection::ptr muduo_conn;{std::unique_lockstd::mutex lock(_mutex);auto it _conns.find(conn);if (it _conns.end()) {return;}muduo_conn it-second;_conns.erase(conn);}if (_cb_close) _cb_close(muduo_conn);}}void onMessage(const muduo::net::TcpConnectionPtr conn, muduo::net::Buffer *buf, muduo::Timestamp){DLOG(连接有数据到来开始处理);auto base_buf BufferFactory::create(buf);while(1) {if (_protocol-canProcessed(base_buf) false) {//数据不足if (base_buf-readableSize() maxDataSize) {conn-shutdown();ELOG(缓冲区中数据过大);return ;}//DLOG(数据量不足);break;}//DLOG(缓冲区中数据可处理);BaseMessage::ptr msg;bool ret _protocol-onMessage(base_buf, msg);if (ret false) {conn-shutdown();ELOG(缓冲区中数据错误);return ;}//DLOG(消息反序列化成功)BaseConnection::ptr base_conn;{std::unique_lockstd::mutex lock(_mutex);auto it _conns.find(conn);if (it _conns.end()) {conn-shutdown();return;}base_conn it-second;}//DLOG(调用回调函数进行消息处理);if (_cb_message) _cb_message(base_conn, msg);}}private:const size_t maxDataSize (1 16);BaseProtocol::ptr _protocol;muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;std::mutex _mutex;std::unordered_mapmuduo::net::TcpConnectionPtr, BaseConnection::ptr _conns;};class ServerFactory {public:templatetypename ...Argsstatic BaseServer::ptr create(Args ...args) {return std::make_sharedMuduoServer(std::forwardArgs(args)...);}};class MuduoClient : public BaseClient {public:using ptr std::shared_ptrMuduoClient;MuduoClient(const std::string sip, int sport):_protocol(ProtocolFactory::create()),_baseloop(_loopthread.startLoop()),_downlatch(1),_client(_baseloop, muduo::net::InetAddress(sip, sport), MuduoClient){}virtual void connect() override {DLOG(设置回调函数连接服务器);_client.setConnectionCallback(std::bind(MuduoClient::onConnection, this, std::placeholders::_1));//设置连接消息的回调_client.setMessageCallback(std::bind(MuduoClient::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));//连接服务器_client.connect();_downlatch.wait();DLOG(连接服务器成功);}virtual void shutdown() override {return _client.disconnect();}virtual bool send(const BaseMessage::ptr msg) override {if (connected() false) {ELOG(连接已断开);return false;}_conn-send(msg);}virtual BaseConnection::ptr connection() override {return _conn;}virtual bool connected() {return (_conn _conn-connected());}private:void onConnection(const muduo::net::TcpConnectionPtr conn) {if (conn-connected()) {std::cout 连接建立\n;_downlatch.countDown();//计数--为0时唤醒阻塞_conn ConnectionFactory::create(conn, _protocol);}else {std::cout 连接断开\n;_conn.reset();}}void onMessage(const muduo::net::TcpConnectionPtr conn, muduo::net::Buffer *buf, muduo::Timestamp){DLOG(连接有数据到来开始处理);auto base_buf BufferFactory::create(buf);while(1) {if (_protocol-canProcessed(base_buf) false) {//数据不足if (base_buf-readableSize() maxDataSize) {conn-shutdown();ELOG(缓冲区中数据过大);return ;}//DLOG(数据量不足);break;}//DLOG(缓冲区中数据可处理);BaseMessage::ptr msg;bool ret _protocol-onMessage(base_buf, msg);if (ret false) {conn-shutdown();ELOG(缓冲区中数据错误);return ;}//DLOG(缓冲区中数据解析完毕调用回调函数进行处理);if (_cb_message) _cb_message(_conn, msg);}}private:const size_t maxDataSize (1 16);BaseProtocol::ptr _protocol;BaseConnection::ptr _conn;muduo::CountDownLatch _downlatch;muduo::net::EventLoopThread _loopthread;muduo::net::EventLoop *_baseloop;muduo::net::TcpClient _client;};class ClientFactory {public:templatetypename ...Argsstatic BaseClient::ptr create(Args ...args) {return std::make_sharedMuduoClient(std::forwardArgs(args)...);}}; }不同消息封装实现 Request RpcTopicService 继承于JsonReque ResponseRpcTopicService 继承于JsonResponse class RpcRequest : public JsonRequest {public:using ptr std::shared_ptrRpcRequest;virtual bool check() override {//rpc请求中包含请求方法名称-字符串参数字段-对象if (_body[KEY_METHOD].isNull() true ||_body[KEY_METHOD].isString() false) {ELOG(RPC请求中没有方法名称或方法名称类型错误);return false;}if (_body[KEY_PARAMS].isNull() true ||_body[KEY_PARAMS].isObject() false) {ELOG(RPC请求中没有参数信息或参数信息类型错误);return false;}return true;}std::string method() {return _body[KEY_METHOD].asString();}void setMethod(const std::string method_name) {_body[KEY_METHOD] method_name;}Json::Value params() {return _body[KEY_PARAMS];}void setParams(const Json::Value params) {_body[KEY_PARAMS] params;}};class TopicRequest : public JsonRequest {public:using ptr std::shared_ptrTopicRequest;virtual bool check() override {//rpc请求中包含请求方法名称-字符串参数字段-对象if (_body[KEY_TOPIC_KEY].isNull() true ||_body[KEY_TOPIC_KEY].isString() false) {ELOG(主题请求中没有主题名称或主题名称类型错误);return false;}if (_body[KEY_OPTYPE].isNull() true ||_body[KEY_OPTYPE].isIntegral() false) {ELOG(主题请求中没有操作类型或操作类型的类型错误);return false;}if (_body[KEY_OPTYPE].asInt() (int)TopicOptype::TOPIC_PUBLISH (_body[KEY_TOPIC_MSG].isNull() true ||_body[KEY_TOPIC_MSG].isString() false)) {ELOG(主题消息发布请求中没有消息内容字段或消息内容类型错误);return false;}return true;}std::string topicKey() {return _body[KEY_TOPIC_KEY].asString();}void setTopicKey(const std::string key) {_body[KEY_TOPIC_KEY] key;}TopicOptype optype() {return (TopicOptype)_body[KEY_OPTYPE].asInt();}void setOptype(TopicOptype optype) {_body[KEY_OPTYPE] (int)optype;}std::string topicMsg() {return _body[KEY_TOPIC_MSG].asString();}void setTopicMsg(const std::string msg) {_body[KEY_TOPIC_MSG] msg;}};class ServiceRequest : public JsonRequest {public:using ptr std::shared_ptrServiceRequest;virtual bool check() override {//rpc请求中包含请求方法名称-字符串参数字段-对象if (_body[KEY_METHOD].isNull() true ||_body[KEY_METHOD].isString() false) {ELOG(服务请求中没有方法名称或方法名称类型错误);return false;}if (_body[KEY_OPTYPE].isNull() true ||_body[KEY_OPTYPE].isIntegral() false) {ELOG(服务请求中没有操作类型或操作类型的类型错误);return false;}if (_body[KEY_OPTYPE].asInt() ! (int)(ServiceOptype::SERVICE_DISCOVERY) (_body[KEY_HOST].isNull() true ||_body[KEY_HOST].isObject() false ||_body[KEY_HOST][KEY_HOST_IP].isNull() true ||_body[KEY_HOST][KEY_HOST_IP].isString() false ||_body[KEY_HOST][KEY_HOST_PORT].isNull() true ||_body[KEY_HOST][KEY_HOST_PORT].isIntegral() false)) {ELOG(服务请求中主机地址信息错误);return false;}return true;}std::string method() {return _body[KEY_METHOD].asString();}void setMethod(const std::string name) {_body[KEY_METHOD] name;}ServiceOptype optype() {return (ServiceOptype)_body[KEY_OPTYPE].asInt();}void setOptype(ServiceOptype optype) {_body[KEY_OPTYPE] (int)optype;}Address host() {Address addr;addr.first _body[KEY_HOST][KEY_HOST_IP].asString();addr.second _body[KEY_HOST][KEY_HOST_PORT].asInt();return addr;}void setHost(const Address host) {Json::Value val;val[KEY_HOST_IP] host.first;val[KEY_HOST_PORT] host.second;_body[KEY_HOST] val;}};class RpcResponse : public JsonResponse {public:using ptr std::shared_ptrRpcResponse;virtual bool check() override {if (_body[KEY_RCODE].isNull() true ||_body[KEY_RCODE].isIntegral() false) {ELOG(响应中没有响应状态码,或状态码类型错误);return false;}if (_body[KEY_RESULT].isNull() true) {ELOG(响应中没有Rpc调用结果,或结果类型错误);return false;}return true;}Json::Value result() {return _body[KEY_RESULT];}void setResult(const Json::Value result) {_body[KEY_RESULT] result;}};class TopicResponse : public JsonResponse {public:using ptr std::shared_ptrTopicResponse;};class ServiceResponse : public JsonResponse {public:using ptr std::shared_ptrServiceResponse;virtual bool check() override {if (_body[KEY_RCODE].isNull() true ||_body[KEY_RCODE].isIntegral() false) {ELOG(响应中没有响应状态码,或状态码类型错误);return false;}if (_body[KEY_OPTYPE].isNull() true ||_body[KEY_OPTYPE].isIntegral() false) {ELOG(响应中没有操作类型,或操作类型的类型错误);return false;}if (_body[KEY_OPTYPE].asInt() (int)(ServiceOptype::SERVICE_DISCOVERY) (_body[KEY_METHOD].isNull() true ||_body[KEY_METHOD].isString() false ||_body[KEY_HOST].isNull() true ||_body[KEY_HOST].isArray() false)) {ELOG(服务发现响应中响应信息字段错误);return false;}return true;}ServiceOptype optype() {return (ServiceOptype)_body[KEY_OPTYPE].asInt();}void setOptype(ServiceOptype optype) {_body[KEY_OPTYPE] (int)optype;}std::string method() {return _body[KEY_METHOD].asString();}void setMethod(const std::string method) {_body[KEY_METHOD] method;}void setHost(std::vectorAddress addrs) {for (auto addr : addrs) {Json::Value val;val[KEY_HOST_IP] addr.first;val[KEY_HOST_PORT] addr.second;_body[KEY_HOST].append(val);}}std::vectorAddress hosts() {std::vectorAddress addrs;int sz _body[KEY_HOST].size();for (int i 0; i sz; i) {Address addr;addr.first _body[KEY_HOST][i][KEY_HOST_IP].asString();addr.second _body[KEY_HOST][i][KEY_HOST_PORT].asInt();addrs.push_back(addr);}return addrs;}};我们可以采用工厂模式来生产消息 class MessageFactory {public:static BaseMessage::ptr create(MType mtype) {switch(mtype) {case MType::REQ_RPC : return std::make_sharedRpcRequest();case MType::RSP_RPC : return std::make_sharedRpcResponse();case MType::REQ_TOPIC : return std::make_sharedTopicRequest();case MType::RSP_TOPIC : return std::make_sharedTopicResponse();case MType::REQ_SERVICE : return std::make_sharedServiceRequest();case MType::RSP_SERVICE : return std::make_sharedServiceResponse();}return BaseMessage::ptr();}templatetypename T, typename ...Argsstatic std::shared_ptrT create(Args ...args) {return std::make_sharedT(std::forward(args)...);}};Dispatcher实现 注册消息类型-回调函数映射关系 提供消息处理接口 namespace myrpc {class Callback {public:using ptr std::shared_ptrCallback;virtual void onMessage(const BaseConnection::ptr conn, BaseMessage::ptr msg) 0;};templatetypename Tclass CallbackT : public Callback{public:using ptr std::shared_ptrCallbackTT;using MessageCallback std::functionvoid(const BaseConnection::ptr conn, std::shared_ptrT msg);CallbackT(const MessageCallback handler):_handler(handler) { }void onMessage(const BaseConnection::ptr conn, BaseMessage::ptr msg) override {auto type_msg std::dynamic_pointer_castT(msg);_handler(conn, type_msg);}private:MessageCallback _handler;};class Dispatcher {public:using ptr std::shared_ptrDispatcher;templatetypename Tvoid registerHandler(MType mtype, const typename CallbackTT::MessageCallback handler) {std::unique_lockstd::mutex lock(_mutex);auto cb std::make_sharedCallbackTT(handler);_handlers.insert(std::make_pair(mtype, cb));}void onMessage(const BaseConnection::ptr conn, BaseMessage::ptr msg) {//找到消息类型对应的业务处理函数进行调用即可std::unique_lockstd::mutex lock(_mutex);auto it _handlers.find(msg-mtype());if (it ! _handlers.end()) {return it-second-onMessage(conn, msg);}//没有找到指定类型的处理回调--因为客户端和服务端都是我们自己设计的因此不可能出现这种情况ELOG(收到未知类型的消息: %d, msg-mtype());conn-shutdown();}private:std::mutex _mutex;std::unordered_mapMType, Callback::ptr _handlers;};服务端RpcRouter实现 提供Rpc请求处理回调函数 内部的服务管理方法名称 参数信息 对外提供参数校验接口 namespace myrpc {namespace server {enum class VType {BOOL 0,INTEGRAL,NUMERIC,STRING,ARRAY,OBJECT,};class ServiceDescribe {public: using ptr std::shared_ptrServiceDescribe;using ServiceCallback std::functionvoid(const Json::Value, Json::Value );using ParamsDescribe std::pairstd::string, VType;ServiceDescribe(std::string mname, std::vectorParamsDescribe desc, VType vtype, ServiceCallback handler) : _method_name(std::move(mname)),_callback(std::move(handler)), _params_desc(std::move(desc)), _return_type(vtype){}const std::string method() { return _method_name; }//针对收到的请求中的参数进行校验bool paramCheck(const Json::Value params){//对params进行参数校验---判断所描述的参数字段是否存在类型是否一致for (auto desc : _params_desc) {if (params.isMember(desc.first) false) {ELOG(参数字段完整性校验失败%s 字段缺失, desc.first.c_str());return false;}if (check(desc.second, params[desc.first]) false) {ELOG(%s 参数类型校验失败, desc.first.c_str());return false;}}return true;}bool call(const Json::Value params, Json::Value result) {_callback(params, result);if (rtypeCheck(result) false) {ELOG(回调处理函数中的响应信息校验失败);return false;}return true;}private:bool rtypeCheck(const Json::Value val) {return check(_return_type, val);}bool check(VType vtype, const Json::Value val) {switch(vtype) {case VType::BOOL : return val.isBool();case VType::INTEGRAL : return val.isIntegral();case VType::NUMERIC : return val.isNumeric();case VType::STRING : return val.isString();case VType::ARRAY : return val.isArray();case VType::OBJECT : return val.isObject();}return false;}private:std::string _method_name; // 方法名称ServiceCallback _callback; // 实际的业务回调函数std::vectorParamsDescribe _params_desc; // 参数字段格式描述VType _return_type; //结果作为返回值类型的描述};class SDescribeFactory {public:void setMethodName(const std::string name) {_method_name name;}void setReturnType(VType vtype) {_return_type vtype;}void setParamsDesc(const std::string pname, VType vtype) {_params_desc.push_back(ServiceDescribe::ParamsDescribe(pname, vtype));}void setCallback(const ServiceDescribe::ServiceCallback cb) {_callback cb;}ServiceDescribe::ptr build() {return std::make_sharedServiceDescribe(std::move(_method_name), std::move(_params_desc), _return_type, std::move(_callback));}private:std::string _method_name;ServiceDescribe::ServiceCallback _callback; // 实际的业务回调函数std::vectorServiceDescribe::ParamsDescribe _params_desc; // 参数字段格式描述VType _return_type; //结果作为返回值类型的描述};class ServiceManager {public:using ptr std::shared_ptrServiceManager;void insert(const ServiceDescribe::ptr desc) {std::unique_lockstd::mutex lock(_mutex);_services.insert(std::make_pair(desc-method(), desc));}ServiceDescribe::ptr select(const std::string method_name) {std::unique_lockstd::mutex lock(_mutex);auto it _services.find(method_name);if (it _services.end()) {return ServiceDescribe::ptr();}return it-second;}void remove(const std::string method_name) {std::unique_lockstd::mutex lock(_mutex);_services.erase(method_name);}private:std::mutex _mutex;std::unordered_mapstd::string, ServiceDescribe::ptr _services;};class RpcRouter {public:using ptr std::shared_ptrRpcRouter;RpcRouter(): _service_manager(std::make_sharedServiceManager()){}//这是注册到Dispatcher模块针对rpc请求进行回调处理的业务函数void onRpcRequest(const BaseConnection::ptr conn, RpcRequest::ptr request){//1. 查询客户端请求的方法描述--判断当前服务端能否提供对应的服务auto service _service_manager-select(request-method());if (service.get() nullptr) {ELOG(%s 服务未找到, request-method().c_str());return response(conn, request, Json::Value(), RCode::RCODE_NOT_FOUND_SERVICE);}//2. 进行参数校验确定能否提供服务if (service-paramCheck(request-params()) false) {ELOG(%s 服务参数校验失败, request-method().c_str());return response(conn, request, Json::Value(), RCode::RCODE_INVALID_PARAMS);}//3. 调用业务回调接口进行业务处理Json::Value result;bool ret service-call(request-params(), result);if (ret false) {ELOG(%s 服务参数校验失败, request-method().c_str());return response(conn, request, Json::Value(), RCode::RCODE_INTERNAL_ERROR);}//4. 处理完毕得到结果组织响应向客户端发送return response(conn, request, result, RCode::RCODE_OK);}void registerMethod(const ServiceDescribe::ptr service) {return _service_manager-insert(service);}private:void response(const BaseConnection::ptr conn, const RpcRequest::ptr req, const Json::Value res, RCode rcode) {auto msg MessageFactory::createRpcResponse();msg-setId(req-rid());msg-setMType(myrpc::MType::RSP_RPC);msg-setRCode(rcode);msg-setResult(res);conn-send(msg);}private:ServiceManager::ptr _service_manager;};} }服务端 Publish Subscribe实现 对外提供主题操作处理回调函数 对外提供消息发布处理回调函数 内部进行主题及订阅者的管理 namespace myrpc {namespace server {class TopicManager {public:using ptr std::shared_ptrTopicManager;TopicManager() {}void onTopicRequest(const BaseConnection::ptr conn, const TopicRequest::ptr msg) {TopicOptype topic_optype msg-optype();bool ret true;switch(topic_optype){//主题的创建case TopicOptype::TOPIC_CREATE: topicCreate(conn, msg); break;//主题的删除case TopicOptype::TOPIC_REMOVE: topicRemove(conn, msg); break;//主题的订阅case TopicOptype::TOPIC_SUBSCRIBE: ret topicSubscribe(conn, msg); break;//主题的取消订阅case TopicOptype::TOPIC_CANCEL: topicCancel(conn, msg); break;//主题消息的发布case TopicOptype::TOPIC_PUBLISH: ret topicPublish(conn, msg); break;default: return errorResponse(conn, msg, RCode::RCODE_INVALID_OPTYPE);}if (!ret) return errorResponse(conn, msg, RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn, msg);}//一个订阅者在连接断开时的处理---删除其关联的数据void onShutdown(const BaseConnection::ptr conn) {//消息发布者断开连接不需要任何操作 消息订阅者断开连接需要删除管理数据//1. 判断断开连接的是否是订阅者不是的话则直接返回std::vectorTopic::ptr topics;Subscriber::ptr subscriber;{std::unique_lockstd::mutex lock(_mutex);auto it _subscribers.find(conn);if (it _subscribers.end()) {return;//断开的连接不是一个订阅者的连接}subscriber it-second;//2. 获取到订阅者退出受影响的主题对象for (auto topic_name : subscriber-topics) {auto topic_it _topics.find(topic_name);if (topic_it _topics.end()) continue;topics.push_back(topic_it-second);}//4. 从订阅者映射信息中删除订阅者_subscribers.erase(it);}//3. 从受影响的主题对象中移除订阅者for (auto topic : topics) {topic-removeSubscriber(subscriber);}}private:void errorResponse(const BaseConnection::ptr conn, const TopicRequest::ptr msg, RCode rcode) {auto msg_rsp MessageFactory::createTopicResponse();msg_rsp-setId(msg-rid());msg_rsp-setMType(MType::RSP_TOPIC);msg_rsp-setRCode(rcode);conn-send(msg_rsp);}void topicResponse(const BaseConnection::ptr conn, const TopicRequest::ptr msg) {auto msg_rsp MessageFactory::createTopicResponse();msg_rsp-setId(msg-rid());msg_rsp-setMType(MType::RSP_TOPIC);msg_rsp-setRCode(RCode::RCODE_OK);conn-send(msg_rsp);}void topicCreate(const BaseConnection::ptr conn, const TopicRequest::ptr msg) {std::unique_lockstd::mutex lock(_mutex);//构造一个主题对象添加映射关系的管理std::string topic_name msg-topicKey();auto topic std::make_sharedTopic(topic_name);_topics.insert(std::make_pair(topic_name, topic));}void topicRemove(const BaseConnection::ptr conn, const TopicRequest::ptr msg) {// 1. 查看当前主题有哪些订阅者然后从订阅者中将主题信息删除掉// 2. 删除主题的数据 -- 主题名称与主题对象的映射关系std::string topic_name msg-topicKey();std::unordered_setSubscriber::ptr subscribers;{std::unique_lockstd::mutex lock(_mutex);//在删除主题之前先找出会受到影响的订阅者auto it _topics.find(topic_name);if (it _topics.end()) {return;}subscribers it-second-subscribers;_topics.erase(it);//删除当前的主题映射关系}for (auto subscriber : subscribers) {subscriber-removeTopic(topic_name);}}bool topicSubscribe(const BaseConnection::ptr conn, const TopicRequest::ptr msg) {//1. 先找出主题对象以及订阅者对象// 如果没有找到主题--就要报错 但是如果没有找到订阅者对象那就要构造一个订阅者Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lockstd::mutex lock(_mutex);auto topic_it _topics.find(msg-topicKey());if (topic_it _topics.end()) {return false;}topic topic_it-second;auto sub_it _subscribers.find(conn);if (sub_it ! _subscribers.end()) {subscriber sub_it-second;}else {subscriber std::make_sharedSubscriber(conn);_subscribers.insert(std::make_pair(conn, subscriber));}}//2. 在主题对象中新增一个订阅者对象关联的连接 在订阅者对象中新增一个订阅的主题topic-appendSubscriber(subscriber);subscriber-appendTopic(msg-topicKey());return true;}void topicCancel(const BaseConnection::ptr conn, const TopicRequest::ptr msg) {//1. 先找出主题对象和订阅者对象Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lockstd::mutex lock(_mutex);auto topic_it _topics.find(msg-topicKey());if (topic_it ! _topics.end()) {topic topic_it-second;}auto sub_it _subscribers.find(conn);if (sub_it ! _subscribers.end()) {subscriber sub_it-second;}}//2. 从主题对象中删除当前的订阅者连接 从订阅者信息中删除所订阅的主题名称if (subscriber) subscriber-removeTopic(msg-topicKey());if (topic subscriber) topic-removeSubscriber(subscriber);}bool topicPublish(const BaseConnection::ptr conn, const TopicRequest::ptr msg) {Topic::ptr topic;{std::unique_lockstd::mutex lock(_mutex);auto topic_it _topics.find(msg-topicKey());if (topic_it _topics.end()) {return false;}topic topic_it-second;}topic-pushMessage(msg);return true;}private:struct Subscriber {using ptr std::shared_ptrSubscriber;std::mutex _mutex;BaseConnection::ptr conn;std::unordered_setstd::string topics;//订阅者所订阅的主题名称Subscriber(const BaseConnection::ptr c): conn(c) { }//订阅主题的时候调用void appendTopic(const std::string topic_name) {std::unique_lockstd::mutex lock(_mutex);topics.insert(topic_name);}//主题被删除 或者 取消订阅的时候调用void removeTopic(const std::string topic_name) {std::unique_lockstd::mutex lock(_mutex);topics.erase(topic_name);}};struct Topic {using ptr std::shared_ptrTopic;std::mutex _mutex;std::string topic_name;std::unordered_setSubscriber::ptr subscribers; //当前主题的订阅者Topic(const std::string name) : topic_name(name){}//新增订阅的时候调用void appendSubscriber(const Subscriber::ptr subscriber) {std::unique_lockstd::mutex lock(_mutex);subscribers.insert(subscriber);}//取消订阅 或者 订阅者连接断开 的时候调用void removeSubscriber(const Subscriber::ptr subscriber) {std::unique_lockstd::mutex lock(_mutex);subscribers.erase(subscriber);}//收到消息发布请求的时候调用void pushMessage(const BaseMessage::ptr msg) {std::unique_lockstd::mutex lock(_mutex);for (auto subscriber : subscribers) {subscriber-conn-send(msg);}}};private:std::mutex _mutex;std::unordered_mapstd::string, Topic::ptr _topics;std::unordered_mapBaseConnection::ptr, Subscriber::ptr _subscribers;};} }服务端 Registry Discovery实现 对外提供服务操作注册发现消息处理回调函数 内部进行服务发现者的管理 内部进行服务提供者的管理 namespace myrpc {namespace server {class ProviderManager {public:using ptr std::shared_ptrProviderManager;struct Provider {using ptr std::shared_ptrProvider;std::mutex _mutex;BaseConnection::ptr conn;Address host;std::vectorstd::string methods;Provider(const BaseConnection::ptr c, const Address h):conn(c), host(h){}void appendMethod(const std::string method) {std::unique_lockstd::mutex lock(_mutex);methods.emplace_back(method);}};//当一个新的服务提供者进行服务注册的时候调用void addProvider(const BaseConnection::ptr c, const Address h, const std::string method) {Provider::ptr provider;//查找连接所关联的服务提供者对象找到则获取找不到则创建并建立关联{std::unique_lockstd::mutex lock(_mutex);auto it _conns.find(c);if (it ! _conns.end()) {provider it-second;}else {provider std::make_sharedProvider(c, h);_conns.insert(std::make_pair(c, provider));}//method方法的提供主机要多出一个_providers新增数据auto providers _providers[method];providers.insert(provider);}//向服务对象中新增一个所能提供的服务名称provider-appendMethod(method);}//当一个服务提供者断开连接的时候获取他的信息--用于进行服务下线通知Provider::ptr getProvider(const BaseConnection::ptr c) {std::unique_lockstd::mutex lock(_mutex);auto it _conns.find(c);if (it ! _conns.end()) {return it-second;}return Provider::ptr();}//当一个服务提供者断开连接的时候删除它的关联信息void delProvider(const BaseConnection::ptr c) {std::unique_lockstd::mutex lock(_mutex);auto it _conns.find(c);if (it _conns.end()) {//当前断开连接的不是一个服务提供者return;}//如果是提供者看看提供了什么服务从服务者提供信息中删除当前服务提供者for (auto method : it-second-methods) {auto providers _providers[method];providers.erase(it-second);}//删除连接与服务提供者的关联关系_conns.erase(it);}std::vectorAddress methodHosts(const std::string method) {std::unique_lockstd::mutex lock(_mutex);auto it _providers.find(method);if (it _providers.end()) {return std::vectorAddress();}std::vectorAddress result;for (auto provider : it-second) {result.push_back(provider-host);}return result;}private:std::mutex _mutex;std::unordered_mapstd::string, std::setProvider::ptr _providers;std::unordered_mapBaseConnection::ptr, Provider::ptr _conns;};class DiscovererManager {public:using ptr std::shared_ptrDiscovererManager;struct Discoverer {using ptr std::shared_ptrDiscoverer;std::mutex _mutex;BaseConnection::ptr conn; //发现者关联的客户端连接std::vectorstd::string methods; //发现过的服务名称Discoverer(const BaseConnection::ptr c) : conn(c){}void appendMethod(const std::string method) {std::unique_lockstd::mutex lock(_mutex);methods.push_back(method);}};//当每次客户端进行服务发现的时候新增发现者新增服务名称Discoverer::ptr addDiscoverer(const BaseConnection::ptr c, const std::string method) {Discoverer::ptr discoverer;{std::unique_lockstd::mutex lock(_mutex);auto it _conns.find(c);if (it ! _conns.end()) {discoverer it-second;}else {discoverer std::make_sharedDiscoverer(c);_conns.insert(std::make_pair(c, discoverer));}auto discoverers _discoverers[method];discoverers.insert(discoverer);}discoverer-appendMethod(method);return discoverer;}//发现者客户端断开连接时找到发现者信息删除关联数据void delDiscoverer(const BaseConnection::ptr c) {std::unique_lockstd::mutex lock(_mutex);auto it _conns.find(c);if (it _conns.end()) {//没有找到连接对应的发现者信息代表客户端不是一个服务发现者return;}for (auto method : it-second-methods) {auto discoverers _discoverers[method];discoverers.erase(it-second);}_conns.erase(it);}//当有一个新的服务提供者上线则进行上线通知void onlineNotify(const std::string method, const Address host) {return notify(method, host, ServiceOptype::SERVICE_ONLINE);}//当有一个服务提供者断开连接则进行下线通知void offlineNotify(const std::string method, const Address host) {return notify(method, host, ServiceOptype::SERVICE_OFFLINE);}private:void notify(const std::string method, const Address host, ServiceOptype optype) {std::unique_lockstd::mutex lock(_mutex);auto it _discoverers.find(method);if (it _discoverers.end()) {//这代表这个服务当前没有发现者return;}auto msg_req MessageFactory::createServiceRequest();msg_req-setId(UUID::uuid());msg_req-setMType(MType::REQ_SERVICE);msg_req-setMethod(method);msg_req-setHost(host);msg_req-setOptype(optype);for (auto discoverer : it-second) {discoverer-conn-send(msg_req);}}private:std::mutex _mutex;std::unordered_mapstd::string, std::setDiscoverer::ptr _discoverers;std::unordered_mapBaseConnection::ptr, Discoverer::ptr _conns;};class PDManager {public:using ptr std::shared_ptrPDManager;PDManager():_providers(std::make_sharedProviderManager()),_discoverers(std::make_sharedDiscovererManager()){}void onServiceRequest(const BaseConnection::ptr conn, const ServiceRequest::ptr msg) {//服务操作请求服务注册/服务发现/ServiceOptype optype msg-optype();if (optype ServiceOptype::SERVICE_REGISTRY){//服务注册// 1. 新增服务提供者 2. 进行服务上线的通知ILOG(%s:%d 注册服务 %s, msg-host().first.c_str(), msg-host().second, msg-method().c_str());_providers-addProvider(conn, msg-host(), msg-method());_discoverers-onlineNotify(msg-method(), msg-host());return registryResponse(conn, msg);} else if (optype ServiceOptype::SERVICE_DISCOVERY){//服务发现// 1. 新增服务发现者ILOG(客户端要进行 %s 服务发现, msg-method().c_str());_discoverers-addDiscoverer(conn, msg-method());return discoveryResponse(conn, msg);}else {ELOG(收到服务操作请求但是操作类型错误);return errorResponse(conn, msg);}}void onConnShutdown(const BaseConnection::ptr conn) {auto provider _providers-getProvider(conn);if (provider.get() ! nullptr) {ILOG(%s:%d 服务下线, provider-host.first.c_str(), provider-host.second);for (auto method : provider-methods) {_discoverers-offlineNotify(method, provider-host);}_providers-delProvider(conn);}_discoverers-delDiscoverer(conn);}private:void errorResponse(const BaseConnection::ptr conn, const ServiceRequest::ptr msg) {auto msg_rsp MessageFactory::createServiceResponse();msg_rsp-setId(msg-rid());msg_rsp-setMType(MType::RSP_SERVICE);msg_rsp-setRCode(RCode::RCODE_INVALID_OPTYPE);msg_rsp-setOptype(ServiceOptype::SERVICE_UNKNOW);conn-send(msg_rsp);}void registryResponse(const BaseConnection::ptr conn, const ServiceRequest::ptr msg) {auto msg_rsp MessageFactory::createServiceResponse();msg_rsp-setId(msg-rid());msg_rsp-setMType(MType::RSP_SERVICE);msg_rsp-setRCode(RCode::RCODE_OK);msg_rsp-setOptype(ServiceOptype::SERVICE_REGISTRY);conn-send(msg_rsp);}void discoveryResponse(const BaseConnection::ptr conn, const ServiceRequest::ptr msg) {auto msg_rsp MessageFactory::createServiceResponse();msg_rsp-setId(msg-rid());msg_rsp-setMType(MType::RSP_SERVICE);msg_rsp-setOptype(ServiceOptype::SERVICE_DISCOVERY);std::vectorAddress hosts _providers-methodHosts(msg-method());if (hosts.empty()) {msg_rsp-setRCode(RCode::RCODE_NOT_FOUND_SERVICE);return conn-send(msg_rsp);} msg_rsp-setRCode(RCode::RCODE_OK);msg_rsp-setMethod(msg-method());msg_rsp-setHost(hosts);return conn-send(msg_rsp);}private:ProviderManager::ptr _providers;DiscovererManager::ptr _discoverers;};} }服务端整合 namespace myrpc {namespace server {//注册中心服务端只需要针对服务注册与发现请求进行处理即可class RegistryServer {public:using ptr std::shared_ptrRegistryServer;RegistryServer(int port):_pd_manager(std::make_sharedPDManager()),_dispatcher(std::make_sharedmyrpc::Dispatcher()){auto service_cb std::bind(PDManager::onServiceRequest, _pd_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher-registerHandlerServiceRequest(MType::REQ_SERVICE, service_cb);_server myrpc::ServerFactory::create(port);auto message_cb std::bind(myrpc::Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server-setMessageCallback(message_cb);auto close_cb std::bind(RegistryServer::onConnShutdown, this, std::placeholders::_1);_server-setCloseCallback(close_cb);}void start() {_server-start();}private:void onConnShutdown(const BaseConnection::ptr conn) {_pd_manager-onConnShutdown(conn);}private:PDManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};class RpcServer {public:using ptr std::shared_ptrRpcServer;//rpc——server端有两套地址信息// 1. rpc服务提供端地址信息--必须是rpc服务器对外访问地址云服务器---监听地址和访问地址不同// 2. 注册中心服务端地址信息 -- 启用服务注册后连接注册中心进行服务注册用的RpcServer(const Address access_addr, bool enableRegistry false, const Address registry_server_addr Address()):_enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_sharedmyrpc::server::RpcRouter()),_dispatcher(std::make_sharedmyrpc::Dispatcher()) {if (enableRegistry) {_reg_client std::make_sharedclient::RegistryClient(registry_server_addr.first, registry_server_addr.second);}//当前成员server是一个rpcserver用于提供rpc服务的auto rpc_cb std::bind(RpcRouter::onRpcRequest, _router.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher-registerHandlermyrpc::RpcRequest(myrpc::MType::REQ_RPC, rpc_cb);_server myrpc::ServerFactory::create(access_addr.second);auto message_cb std::bind(myrpc::Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server-setMessageCallback(message_cb);}void registerMethod(const ServiceDescribe::ptr service) {if (_enableRegistry) {_reg_client-registryMethod(service-method(), _access_addr);}_router-registerMethod(service);}void start() {_server-start();}private:bool _enableRegistry;Address _access_addr;client::RegistryClient::ptr _reg_client;RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};class TopicServer {public:using ptr std::shared_ptrTopicServer;TopicServer(int port):_topic_manager(std::make_sharedTopicManager()),_dispatcher(std::make_sharedmyrpc::Dispatcher()){auto topic_cb std::bind(TopicManager::onTopicRequest, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher-registerHandlerTopicRequest(MType::REQ_TOPIC, topic_cb);_server myrpc::ServerFactory::create(port);auto message_cb std::bind(myrpc::Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server-setMessageCallback(message_cb);auto close_cb std::bind(TopicServer::onConnShutdown, this, std::placeholders::_1);_server-setCloseCallback(close_cb);}void start() {_server-start();}private:void onConnShutdown(const BaseConnection::ptr conn) {_topic_manager-onShutdown(conn);}private:TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};} }客户端Requestor实现 提供发送请求的接口 内部进行请求 响应的管理 namespace myrpc {namespace client {class Requestor {public:using ptr std::shared_ptrRequestor;using RequestCallback std::functionvoid(const BaseMessage::ptr);using AsyncResponse std::futureBaseMessage::ptr;struct RequestDescribe {using ptr std::shared_ptrRequestDescribe;BaseMessage::ptr request;RType rtype;std::promiseBaseMessage::ptr response;RequestCallback callback;};void onResponse(const BaseConnection::ptr conn, BaseMessage::ptr msg){std::string rid msg-rid();RequestDescribe::ptr rdp getDescribe(rid);if (rdp.get() nullptr) {ELOG(收到响应 - %s但是未找到对应的请求描述, rid.c_str());return;}if (rdp-rtype RType::REQ_ASYNC) {rdp-response.set_value(msg);}else if (rdp-rtype RType::REQ_CALLBACK){if (rdp-callback) rdp-callback(msg);}else {ELOG(请求类型未知);}delDescribe(rid);}bool send(const BaseConnection::ptr conn, const BaseMessage::ptr req, AsyncResponse async_rsp) {RequestDescribe::ptr rdp newDescribe(req, RType::REQ_ASYNC);if (rdp.get() nullptr) {ELOG(构造请求描述对象失败);return false;}conn-send(req);async_rsp rdp-response.get_future();return true;}bool send(const BaseConnection::ptr conn, const BaseMessage::ptr req, BaseMessage::ptr rsp) {AsyncResponse rsp_future;bool ret send(conn, req, rsp_future);if (ret false) {return false;}rsp rsp_future.get();return true;}bool send(const BaseConnection::ptr conn, const BaseMessage::ptr req, const RequestCallback cb) {RequestDescribe::ptr rdp newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() nullptr) {ELOG(构造请求描述对象失败);return false;}conn-send(req);return true;}private:RequestDescribe::ptr newDescribe(const BaseMessage::ptr req, RType rtype, const RequestCallback cb RequestCallback()) {std::unique_lockstd::mutex lock(_mutex);RequestDescribe::ptr rd std::make_sharedRequestDescribe();rd-request req;rd-rtype rtype;if (rtype RType::REQ_CALLBACK cb) {rd-callback cb;}_request_desc.insert(std::make_pair(req-rid(), rd));return rd;}RequestDescribe::ptr getDescribe(const std::string rid) {std::unique_lockstd::mutex lock(_mutex);auto it _request_desc.find(rid);if (it _request_desc.end()) {return RequestDescribe::ptr();}return it-second;}void delDescribe(const std::string rid) {std::unique_lockstd::mutex lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_mapstd::string, RequestDescribe::ptr _request_desc;};} }客户端 RpcCaller实现 namespace myrpc {namespace client {class RpcCaller {public:using ptr std::shared_ptrRpcCaller;using JsonAsyncResponse std::futureJson::Value;using JsonResponseCallback std::functionvoid(const Json::Value);RpcCaller(const Requestor::ptr requestor): _requestor(requestor){}//requestor中的处理是针对BaseMessage进行处理的//用于在rpccaller中针对结果的处理是针对 RpcResponse里边的result进行的bool call(const BaseConnection::ptr conn, const std::string method, const Json::Value params, Json::Value result) {DLOG(开始同步rpc调用...);//1. 组织请求auto req_msg MessageFactory::createRpcRequest();req_msg-setId(UUID::uuid());req_msg-setMType(MType::REQ_RPC);req_msg-setMethod(method);req_msg-setParams(params);BaseMessage::ptr rsp_msg;//2. 发送请求bool ret _requestor-send(conn, std::dynamic_pointer_castBaseMessage(req_msg), rsp_msg);if (ret false) {ELOG(同步Rpc请求失败);return false;}DLOG(收到响应进行解析获取结果!);//3. 等待响应auto rpc_rsp_msg std::dynamic_pointer_castRpcResponse(rsp_msg);if (!rpc_rsp_msg) {ELOG(rpc响应向下类型转换失败);return false;}if (rpc_rsp_msg-rcode() ! RCode::RCODE_OK) {ELOG(rpc请求出错%s, errReason(rpc_rsp_msg-rcode()));return false;}result rpc_rsp_msg-result();DLOG(结果设置完毕);return true;}bool call(const BaseConnection::ptr conn, const std::string method, const Json::Value params, JsonAsyncResponse result) {//向服务器发送异步回调请求设置回调函数回调函数中会传入一个promise对象在回调函数中去堆promise设置数据auto req_msg MessageFactory::createRpcRequest();req_msg-setId(UUID::uuid());req_msg-setMType(MType::REQ_RPC);req_msg-setMethod(method);req_msg-setParams(params);auto json_promise std::make_sharedstd::promiseJson::Value() ;result json_promise-get_future();Requestor::RequestCallback cb std::bind(RpcCaller::Callback, this, json_promise, std::placeholders::_1);bool ret _requestor-send(conn, std::dynamic_pointer_castBaseMessage(req_msg), cb);if (ret false) {ELOG(异步Rpc请求失败);return false;}return true;}bool call(const BaseConnection::ptr conn, const std::string method, const Json::Value params, const JsonResponseCallback cb) {auto req_msg MessageFactory::createRpcRequest();req_msg-setId(UUID::uuid());req_msg-setMType(MType::REQ_RPC);req_msg-setMethod(method);req_msg-setParams(params);Requestor::RequestCallback req_cb std::bind(RpcCaller::Callback1, this, cb, std::placeholders::_1);bool ret _requestor-send(conn, std::dynamic_pointer_castBaseMessage(req_msg), req_cb);if (ret false) {ELOG(回调Rpc请求失败);return false;}return true;}private:void Callback1(const JsonResponseCallback cb, const BaseMessage::ptr msg) {auto rpc_rsp_msg std::dynamic_pointer_castRpcResponse(msg);if (!rpc_rsp_msg) {ELOG(rpc响应向下类型转换失败);return ;}if (rpc_rsp_msg-rcode() ! RCode::RCODE_OK) {ELOG(rpc回调请求出错%s, errReason(rpc_rsp_msg-rcode()));return ;}cb(rpc_rsp_msg-result());}void Callback(std::shared_ptrstd::promiseJson::Value result, const BaseMessage::ptr msg) {auto rpc_rsp_msg std::dynamic_pointer_castRpcResponse(msg);if (!rpc_rsp_msg) {ELOG(rpc响应向下类型转换失败);return ;}if (rpc_rsp_msg-rcode() ! RCode::RCODE_OK) {ELOG(rpc异步请求出错%s, errReason(rpc_rsp_msg-rcode()));return ;}result-set_value(rpc_rsp_msg-result());}private:Requestor::ptr _requestor;};} }客户端 Publish Subscribe实现 提供消息发布接口 提供主题操作接口 内部进行主题及订阅者的管理 namespace myrpc {namespace client {class TopicManager {public:using SubCallback std::functionvoid(const std::string key, const std::string msg);using ptr std::shared_ptrTopicManager;TopicManager(const Requestor::ptr requestor) : _requestor(requestor) {}bool create(const BaseConnection::ptr conn, const std::string key) {return commonRequest(conn, key, TopicOptype::TOPIC_CREATE);}bool remove(const BaseConnection::ptr conn, const std::string key) {return commonRequest(conn, key, TopicOptype::TOPIC_REMOVE);}bool subscribe(const BaseConnection::ptr conn, const std::string key, const SubCallback cb) {addSubscribe(key, cb);bool ret commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);if (ret false) {delSubscribe(key);return false;}return true;}bool cancel(const BaseConnection::ptr conn, const std::string key) {delSubscribe(key);return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);}bool publish(const BaseConnection::ptr conn, const std::string key, const std::string msg) {return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);}void onPublish(const BaseConnection::ptr conn, const TopicRequest::ptr msg) {//1. 从消息中取出操作类型进行判断是否是消息请求auto type msg-optype();if (type ! TopicOptype::TOPIC_PUBLISH) {ELOG(收到了错误类型的主题操作);return ;}//2. 取出消息主题名称以及消息内容std::string topic_key msg-topicKey();std::string topic_msg msg-topicMsg();//3. 通过主题名称查找对应主题的回调处理函数有在处理无在报错auto callback getSubscribe(topic_key);if (!callback) {ELOG(收到了 %s 主题消息但是该消息无主题处理回调, topic_key.c_str());return ;}return callback(topic_key, topic_msg);}private:void addSubscribe(const std::string key, const SubCallback cb) {std::unique_lockstd::mutex lock(_mutex);_topic_callbacks.insert(std::make_pair(key, cb));}void delSubscribe(const std::string key) {std::unique_lockstd::mutex lock(_mutex);_topic_callbacks.erase(key);}const SubCallback getSubscribe(const std::string key) {std::unique_lockstd::mutex lock(_mutex);auto it _topic_callbacks.find(key);if (it _topic_callbacks.end()) {return SubCallback();}return it-second;}bool commonRequest(const BaseConnection::ptr conn, const std::string key, TopicOptype type, const std::string msg ) {//1. 构造请求对象并填充数据auto msg_req MessageFactory::createTopicRequest();msg_req-setId(UUID::uuid());msg_req-setMType(MType::REQ_TOPIC);msg_req-setOptype(type);msg_req-setTopicKey(key);if (type TopicOptype::TOPIC_PUBLISH) {msg_req-setTopicMsg(msg);}//2. 向服务端发送请求等待响应BaseMessage::ptr msg_rsp;bool ret _requestor-send(conn, msg_req, msg_rsp);if (ret false) {ELOG(主题操作请求失败);return false;}//3. 判断请求处理是否成功auto topic_rsp_msg std::dynamic_pointer_castTopicResponse(msg_rsp);if (!topic_rsp_msg) {ELOG(主题操作响应向下类型转换失败);return false;}if (topic_rsp_msg-rcode() ! RCode::RCODE_OK) {ELOG(主题操作请求出错%s, errReason(topic_rsp_msg-rcode()));return false;}return true;}private:std::mutex _mutex;std::unordered_mapstd::string, SubCallback _topic_callbacks;Requestor::ptr _requestor;};} }客户端Registry Discovery 实现 提供服务发现接口 提供服务注册接口 提供服务上线下线操作通知处理回调函数 内部进行发现的服务与主机信息管理 namespace myrpc {namespace client {class Provider {public:using ptr std::shared_ptrProvider;Provider(const Requestor::ptr requestor) : _requestor(requestor){}bool registryMethod(const BaseConnection::ptr conn, const std::string method, const Address host) {auto msg_req MessageFactory::createServiceRequest();msg_req-setId(UUID::uuid());msg_req-setMType(MType::REQ_SERVICE);msg_req-setMethod(method);msg_req-setHost(host);msg_req-setOptype(ServiceOptype::SERVICE_REGISTRY);BaseMessage::ptr msg_rsp;bool ret _requestor-send(conn, msg_req, msg_rsp);if (ret false) {ELOG(%s 服务注册失败, method.c_str());return false;}auto service_rsp std::dynamic_pointer_castServiceResponse(msg_rsp);if (service_rsp.get() nullptr) {ELOG(响应类型向下转换失败);return false;}if (service_rsp-rcode() ! RCode::RCODE_OK) {ELOG(服务注册失败原因%s, errReason(service_rsp-rcode()).c_str());return false;}return true;}private:Requestor::ptr _requestor;};class MethodHost {public:using ptr std::shared_ptrMethodHost;MethodHost(): _idx(0){}MethodHost(const std::vectorAddress hosts):_hosts(hosts.begin(), hosts.end()), _idx(0){}void appendHost(const Address host) {//中途收到了服务上线请求后被调用std::unique_lockstd::mutex lock(_mutex);_hosts.push_back(host);}void removeHost(const Address host) {//中途收到了服务下线请求后被调用std::unique_lockstd::mutex lock(_mutex);for (auto it _hosts.begin(); it ! _hosts.end(); it) {if (*it host) {_hosts.erase(it);break;}}}Address chooseHost() {std::unique_lockstd::mutex lock(_mutex);size_t pos _idx % _hosts.size();return _hosts[pos];}bool empty() {std::unique_lockstd::mutex lock(_mutex);return _hosts.empty();}private:std::mutex _mutex;size_t _idx;std::vectorAddress _hosts;};class Discoverer {public:using OfflineCallback std::functionvoid(const Address);using ptr std::shared_ptrDiscoverer;Discoverer(const Requestor::ptr requestor, const OfflineCallback cb) : _requestor(requestor), _offline_callback(cb){}bool serviceDiscovery(const BaseConnection::ptr conn, const std::string method, Address host) {{//当前所保管的提供者信息存在则直接返回地址std::unique_lockstd::mutex lock(_mutex);auto it _method_hosts.find(method);if (it ! _method_hosts.end()) {if (it-second-empty() false) {host it-second-chooseHost();return true;}}}//当前服务的提供者为空auto msg_req MessageFactory::createServiceRequest();msg_req-setId(UUID::uuid());msg_req-setMType(MType::REQ_SERVICE);msg_req-setMethod(method);msg_req-setOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret _requestor-send(conn, msg_req, msg_rsp);if (ret false) {ELOG(服务发现失败);return false;}auto service_rsp std::dynamic_pointer_castServiceResponse(msg_rsp);if (!service_rsp) {ELOG(服务发现失败响应类型转换失败);return false;}if (service_rsp-rcode() ! RCode::RCODE_OK) {ELOG(服务发现失败%s, errReason(service_rsp-rcode()).c_str());return false;}//能走到这里代表当前是没有对应的服务提供主机的std::unique_lockstd::mutex lock(_mutex);auto method_host std::make_sharedMethodHost(service_rsp-hosts());if (method_host-empty()) {ELOG(%s 服务发现失败没有能够提供服务的主机, method.c_str());return false;}host method_host-chooseHost();_method_hosts[method] method_host;return true;}//这个接口是提供给Dispatcher模块进行服务上线下线请求处理的回调函数void onServiceRequest(const BaseConnection::ptr conn, const ServiceRequest::ptr msg) {//1. 判断是上线还是下线请求如果都不是那就不用处理了auto optype msg-optype();std::string method msg-method();std::unique_lockstd::mutex lock(_mutex);if (optype ServiceOptype::SERVICE_ONLINE){//2. 上线请求找到MethodHost向其中新增一个主机地址auto it _method_hosts.find(method);if (it _method_hosts.end()) {auto method_host std::make_sharedMethodHost();method_host-appendHost(msg-host());_method_hosts[method] method_host;}else {it-second-appendHost(msg-host());}} else if (optype ServiceOptype::SERVICE_OFFLINE){//3. 下线请求找到MethodHost从其中删除一个主机地址auto it _method_hosts.find(method);if (it _method_hosts.end()) {return;}it-second-removeHost(msg-host());_offline_callback(msg-host());}}private:OfflineCallback _offline_callback;std::mutex _mutex;std::unordered_mapstd::string, MethodHost::ptr _method_hosts;Requestor::ptr _requestor;};} }客户端整合 namespace myrpc {namespace client {class RegistryClient {public:using ptr std::shared_ptrRegistryClient;//构造函数传入注册中心的地址信息用于连接注册中心RegistryClient(const std::string ip, int port):_requestor(std::make_sharedRequestor()),_provider(std::make_sharedclient::Provider(_requestor)),_dispatcher(std::make_sharedDispatcher()) {auto rsp_cb std::bind(client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher-registerHandlerBaseMessage(MType::RSP_SERVICE, rsp_cb);auto message_cb std::bind(Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_client ClientFactory::create(ip, port);_client-setMessageCallback(message_cb);_client-connect();}//向外提供的服务注册接口bool registryMethod(const std::string method, const Address host) {return _provider-registryMethod(_client-connection(), method, host);}private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};class DiscoveryClient {public:using ptr std::shared_ptrDiscoveryClient;//构造函数传入注册中心的地址信息用于连接注册中心DiscoveryClient(const std::string ip, int port, const Discoverer::OfflineCallback cb): _requestor(std::make_sharedRequestor()),_discoverer(std::make_sharedclient::Discoverer(_requestor, cb)),_dispatcher(std::make_sharedDispatcher()){auto rsp_cb std::bind(client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher-registerHandlerBaseMessage(MType::RSP_SERVICE, rsp_cb);auto req_cb std::bind(client::Discoverer::onServiceRequest, _discoverer.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher-registerHandlerServiceRequest(MType::REQ_SERVICE, req_cb);auto message_cb std::bind(Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_client ClientFactory::create(ip, port);_client-setMessageCallback(message_cb);_client-connect();}//向外提供的服务发现接口bool serviceDiscovery(const std::string method, Address host) {return _discoverer-serviceDiscovery(_client-connection(), method, host);}private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};class RpcClient {public:using ptr std::shared_ptrRpcClient;//enableDiscovery--是否启用服务发现功能也决定了传入的地址信息是注册中心的地址还是服务提供者的地址RpcClient(bool enableDiscovery, const std::string ip, int port):_enableDiscovery(enableDiscovery),_requestor(std::make_sharedRequestor()),_dispatcher(std::make_sharedDispatcher()),_caller(std::make_sharedmyrpc::client::RpcCaller(_requestor)) {//针对rpc请求后的响应进行的回调处理auto rsp_cb std::bind(client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher-registerHandlerBaseMessage(MType::RSP_RPC, rsp_cb);//如果启用了服务发现地址信息是注册中心的地址是服务发现客户端需要连接的地址则通过地址信息实例化discovery_client//如果没有启用服务发现则地址信息是服务提供者的地址则直接实例化好rpc_clientif (_enableDiscovery) {auto offline_cb std::bind(RpcClient::delClient, this, std::placeholders::_1);_discovery_client std::make_sharedDiscoveryClient(ip, port, offline_cb);}else {auto message_cb std::bind(Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_rpc_client ClientFactory::create(ip, port);_rpc_client-setMessageCallback(message_cb);_rpc_client-connect();}}bool call(const std::string method, const Json::Value params, Json::Value result) {//获取服务提供者1. 服务发现 2. 固定服务提供者BaseClient::ptr client getClient(method);if (client.get() nullptr) {return false;}//3. 通过客户端连接发送rpc请求return _caller-call(client-connection(), method, params, result);}bool call(const std::string method, const Json::Value params, RpcCaller::JsonAsyncResponse result) {BaseClient::ptr client getClient(method);if (client.get() nullptr) {return false;}//3. 通过客户端连接发送rpc请求return _caller-call(client-connection(), method, params, result);}bool call(const std::string method, const Json::Value params, const RpcCaller::JsonResponseCallback cb) {BaseClient::ptr client getClient(method);if (client.get() nullptr) {return false;}//3. 通过客户端连接发送rpc请求return _caller-call(client-connection(), method, params, cb);}private:BaseClient::ptr newClient(const Address host) {auto message_cb std::bind(Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);auto client ClientFactory::create(host.first, host.second);client-setMessageCallback(message_cb);client-connect();putClient(host, client);return client;}BaseClient::ptr getClient(const Address host) {std::unique_lockstd::mutex lock(_mutex);auto it _rpc_clients.find(host);if (it _rpc_clients.end()) {return BaseClient::ptr();}return it-second;}BaseClient::ptr getClient(const std::string method) {BaseClient::ptr client;if (_enableDiscovery) {//1. 通过服务发现获取服务提供者地址信息Address host;bool ret _discovery_client-serviceDiscovery(method, host);if (ret false) {ELOG(当前 %s 服务没有找到服务提供者, method.c_str());return BaseClient::ptr();}//2. 查看服务提供者是否已有实例化客户端有则直接使用没有则创建client getClient(host);if (client.get() nullptr) {//没有找打已实例化的客户端则创建client newClient(host);}}else {client _rpc_client;}return client;}void putClient(const Address host, BaseClient::ptr client) {std::unique_lockstd::mutex lock(_mutex);_rpc_clients.insert(std::make_pair(host, client));}void delClient(const Address host) {std::unique_lockstd::mutex lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash {size_t operator()(const Address host) const{std::string addr host.first std::to_string(host.second);return std::hashstd::string{}(addr);}};bool _enableDiscovery;DiscoveryClient::ptr _discovery_client;Requestor::ptr _requestor;RpcCaller::ptr _caller;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//用于未启用服务发现std::mutex _mutex;//127.0.0.1:8080, client1std::unordered_mapAddress, BaseClient::ptr, AddressHash _rpc_clients;//用于服务发现的客户端连接池};class TopicClient {public:TopicClient(const std::string ip, int port):_requestor(std::make_sharedRequestor()),_dispatcher(std::make_sharedDispatcher()),_topic_manager(std::make_sharedTopicManager(_requestor)) {auto rsp_cb std::bind(Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher-registerHandlerBaseMessage(MType::RSP_TOPIC, rsp_cb);auto msg_cb std::bind(TopicManager::onPublish, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher-registerHandlerTopicRequest(MType::REQ_TOPIC, msg_cb);auto message_cb std::bind(Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_rpc_client ClientFactory::create(ip, port);_rpc_client-setMessageCallback(message_cb);_rpc_client-connect();}bool create(const std::string key) {return _topic_manager-create(_rpc_client-connection(), key);}bool remove(const std::string key) {return _topic_manager-remove(_rpc_client-connection(), key);}bool subscribe(const std::string key, const TopicManager::SubCallback cb) {return _topic_manager-subscribe(_rpc_client-connection(), key, cb);}bool cancel(const std::string key) {return _topic_manager-cancel(_rpc_client-connection(), key);}bool publish(const std::string key, const std::string msg) {return _topic_manager-publish(_rpc_client-connection(), key, msg);}void shutdown() {_rpc_client-shutdown();}private:Requestor::ptr _requestor;TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//用于未启用服务发现};} }八、项目总结 项目功能实现了基础的rpc远程调用功能以及基于服务注册与发现的rpc远程调用功能简单的发布订阅功能 所用技术json序列化反序列化网络通信Muduorpc发布订阅 框架设计 抽象层针对底层的网络通信和协议部分进行了抽象降低框架的依赖提高框架的灵活度以及可维护性 具象层针对抽象的功能进行具体的实现Muduo库搭建高性能客户端服务器TLV应用层协议格式消息类型 业务层基础rpc服务发现与注册以及上线下线通知发布订阅 具体模块划分 应用层协议的抽象与实现 网络通信模块的抽象与实现 消息的抽象与实现 rpc客户端与服务端业务 服务发现与注册业务 发布订阅业务 项目代码 Json-Rpc框架
http://www.zqtcl.cn/news/700080/

相关文章:

  • php制作网站网站开发与客户沟通
  • 百度网站建设平台微盟微商城官网
  • 三明网站seo上海中学分数线
  • 青岛谷歌网站建设网站建站公司排名
  • 成都旅游网站建设规划windows优化大师官方
  • 福永网站建设公司哪家好财务公司承兑汇票
  • 青岛快速建站模板制作公司网页什么价位
  • 网站建设公司的经营范围wordpress设置文本编辑器
  • 做网站用微软雅黑侵权吗wordpress 同类文章
  • 免费下载建设银行官方网站自己做网站犯法吗
  • 手机网站html代码附近做广告牌的店
  • 建设和优化网站的步骤wordpress 模板 含数据库
  • 太原制作网站的工作室wordpress弹幕播放器
  • 英语网站开发菏泽做网站优化的
  • 宜昌建设网站公司做网站语言服务器 空间
  • 湖南做网站价格广州网站建设哪家便宜
  • 建筑工程素材资源网站中山做网站建设联系电话
  • 做网站关键词集团网站群建设方案
  • 网站开发有哪些课程网站开发好要租服务器吗
  • 鲜花店网站建设的规模设想网站之间的差异
  • 网站怎么在百度做推广郑州建网站
  • 机关门户网站建设顺义做网站
  • 网站开发公司东莞环球军事头条
  • 企业网站管理系统添加教程如何用python开发网页
  • 公司网站建设需要资质wordpress admin
  • 万维网网站301重定向怎么做国家城乡建设规划部网站
  • 现在的网站内容区域做多宽俄文网站开发翻译
  • 上海闵行建设局官方网站做电影网站的流程
  • 怎样做水族馆网站wordpress第三方订阅地址
  • 东莞做网站注意事项如何查网站的百度快照