RPC
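An RPC framework is a core component of distributed systems and the foundation of microservices.
RPC stands for Remote Procedure Call. A local method call lives in a single address space and goes straight through the call stack. A remote call crosses service endpoints, so it cannot be invoked directly.
An RPC framework solves this remote invocation problem. It makes calling a remote service as simple as calling a local one by encapsulating the details of the network call inside the framework.
1. Choosing a communication protocol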
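Pick the protocol according to your requirements. UDP is an unreliable transport and is rarely chosen for an RPC framework.
TCP and HTTP are the usual choices.
HTTP carries header information that is often redundant for RPC, so it is less efficient on the wire, but it is more universal: it works across languages and platforms and is easier to port.
TCP gives reliable transport and higher efficiency, but it requires a custom protocol on top and is less universal.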
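1.1 Differences between HTTP/1.0 and HTTP/1.1
HTTP/1.0 first appeared in 1996, when it served fairly simple web pages and network requests. HTTP/1.1 came into wide use in the major browsers around 1999 and is still among the most widely used versions of HTTP. The main differences are:
Caching: HTTP/1.0 mainly uses the If-Modified-Since and Expires headers to decide cache validity. HTTP/1.1 adds more cache-control mechanisms, such as entity tags (ETag), If-Unmodified-Since, If-Match and If-None-Match.
Bandwidth optimization and connection use: HTTP/1.0 wastes bandwidth in some cases. For example, the client may need only part of an object but the server sends all of it, and resuming an interrupted download is not supported. HTTP/1.1 adds the Range request header, which asks for only part of a resource; the response status is 206 (Partial Content). This lets developers use bandwidth and connections more fully.
Error notification: HTTP/1.1 adds 24 new status codes, for example 409 (Conflict), meaning the request conflicts with the current state of the resource, and 410 (Gone), meaning a resource on the server has been permanently removed.
Host header: HTTP/1.0 assumes each server is bound to a unique IP address, so the URL in the request message does not carry the hostname. With virtual hosting, one physical server can host several virtual hosts (multi-homed web servers) that share one IP address. HTTP/1.1 requests must carry a Host header, and a request without one is rejected with 400 (Bad Request).
Persistent connections: HTTP/1.1 supports persistent connections and request pipelining. Several requests and responses can travel over one TCP connection, which reduces the cost and latency of opening and closing connections. Connection: keep-alive is the default in HTTP/1.1, which largely removes the HTTP/1.0 cost of creating a connection for every request.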
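The Range header and the 206 status described above can be tried out with Go's standard library alone. The following is a minimal sketch, not part of the framework: the function names (fetchRange, demoRange) and the served text are made up for illustration, and the test server relies on http.ServeContent, which honours Range headers.

```go
package main

import (
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
    "strings"
    "time"
)

// fetchRange requests only the given byte range of url and returns the
// status code together with the body that came back.
func fetchRange(url, byteRange string) (int, string, error) {
    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {
        return 0, "", err
    }
    req.Header.Set("Range", byteRange)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return 0, "", err
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return 0, "", err
    }
    return resp.StatusCode, string(body), nil
}

// demoRange starts a throwaway local server (hypothetical content) and asks
// it for the first five bytes only.
func demoRange() (int, string, error) {
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // ServeContent handles the Range header and answers with 206.
        http.ServeContent(w, r, "demo.txt", time.Time{}, strings.NewReader("hello, range requests"))
    }))
    defer srv.Close()
    return fetchRange(srv.URL, "bytes=0-4")
}

func main() {
    status, body, err := demoRange()
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    fmt.Println(status, body)
}
```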
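1.2 Differences between HTTP/1.1 and HTTP/2
New binary format: HTTP/1.x is parsed as text. Text protocols have an inherent weakness: text can be written in many forms, so a robust parser has to handle many cases. Binary is different, since only combinations of 0 and 1 are recognized. For this reason HTTP/2 uses a binary framing format, which is convenient to implement and robust.
Multiplexing: connections are shared. Every request is assigned an id, so one connection can carry many requests at once. Requests on a connection can be interleaved arbitrarily, and the receiver uses the id to assign each piece back to the request it belongs to.
Header compression: as noted earlier, HTTP/1.x headers carry a lot of information and are sent again with every request. HTTP/2 uses an encoder to shrink the headers that must be transmitted. Both peers keep a cached table of header fields, which avoids resending repeated headers and reduces the size of what is sent.
Server push: HTTP/2 also provides a server push capability.
gRPC is built on HTTP/2. Because HTTP is so universal, many RPC frameworks today support gRPC.
2. Serialization protocols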
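Data travels over the network as bytes, so an object must be serialized before it can be sent.
The receiver recovers the data by deserializing it.
Serialization protocols include XML, JSON, Protobuf and Thrift, as well as Gob, which Go supports natively.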
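To make the serialize/deserialize round trip concrete, here is a small sketch that pushes the same value through JSON and through Gob using only the standard library. The Goods type mirrors the one used later in the article; the helper names viaJSON and viaGob are made up for this example.

```go
package main

import (
    "bytes"
    "encoding/gob"
    "encoding/json"
    "fmt"
)

// Goods mirrors the struct used in the article's examples.
type Goods struct {
    Id   int64  `json:"id"`
    Name string `json:"name"`
}

// viaJSON serializes g to JSON and back, returning the copy and the encoded size.
func viaJSON(g Goods) (Goods, int, error) {
    data, err := json.Marshal(g)
    if err != nil {
        return Goods{}, 0, err
    }
    var out Goods
    err = json.Unmarshal(data, &out)
    return out, len(data), err
}

// viaGob serializes g with encoding/gob and back, returning the copy and the encoded size.
func viaGob(g Goods) (Goods, int, error) {
    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(g); err != nil {
        return Goods{}, 0, err
    }
    size := buf.Len()
    var out Goods
    err := gob.NewDecoder(&buf).Decode(&out)
    return out, size, err
}

func main() {
    g := Goods{Id: 1000, Name: "demo goods"}
    j, jn, jerr := viaJSON(g)
    b, bn, berr := viaGob(g)
    fmt.Println(j, jn, jerr)
    fmt.Println(b, bn, berr)
}
```

A single Gob message also carries a description of the type, so for one small value it is not necessarily smaller than JSON; the type description is sent only once per encoder stream.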
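3. Encoding and decoding
With TCP we have to define the wire format ourselves, to deal with sticky packets and split packets in transit. Suppose the client sends two packets, D1 and D2. The number of bytes the server gets from a single read is not fixed, so four cases are possible:
Case 1: the server reads two independent packets in two reads, D1 and then D2. Nothing is stuck together or split.
Case 2: the server receives both packets in one read, with D1 and D2 glued together. This is called a TCP sticky packet.
Case 3: the server needs two reads. The first returns all of D1 plus part of D2, the second returns the rest of D2. This is called TCP packet splitting.
Case 4: the server needs two reads. The first returns part of D1 (D1_1), the second returns the rest of D1 (D1_2) plus all of D2.
Note also that if the TCP receive window is very small while D1 and D2 are fairly large, a fifth case is likely: the server needs many reads to receive D1 and D2 completely, and splitting happens several times along the way.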
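One common remedy for all of these cases is to prefix every message with its length and always read an exact number of bytes. The sketch below is an illustration under assumptions, not the article's protocol: it uses a bare 4-byte length prefix, and it simulates the worst kind of splitting with iotest.OneByteReader, which hands out a single byte per read. The function names are made up.

```go
package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
    "testing/iotest"
)

// writeFrame writes a 4-byte big-endian length followed by the payload.
func writeFrame(w io.Writer, payload []byte) error {
    var prefix [4]byte
    binary.BigEndian.PutUint32(prefix[:], uint32(len(payload)))
    if _, err := w.Write(prefix[:]); err != nil {
        return err
    }
    _, err := w.Write(payload)
    return err
}

// readFrame reads exactly one frame, no matter how the bytes were chunked.
func readFrame(r io.Reader) ([]byte, error) {
    var prefix [4]byte
    if _, err := io.ReadFull(r, prefix[:]); err != nil {
        return nil, err
    }
    payload := make([]byte, binary.BigEndian.Uint32(prefix[:]))
    if _, err := io.ReadFull(r, payload); err != nil {
        return nil, err
    }
    return payload, nil
}

// decodeCoalesced glues all payloads into one stream (the sticky case),
// then reads them back one byte at a time (the split case).
func decodeCoalesced(payloads ...string) ([]string, error) {
    var stream bytes.Buffer
    for _, p := range payloads {
        if err := writeFrame(&stream, []byte(p)); err != nil {
            return nil, err
        }
    }
    r := iotest.OneByteReader(&stream)
    var frames []string
    for {
        frame, err := readFrame(r)
        if err == io.EOF {
            return frames, nil // clean end of stream between frames
        }
        if err != nil {
            return nil, err
        }
        frames = append(frames, string(frame))
    }
}

func main() {
    frames, err := decodeCoalesced("D1", "D2-is-longer")
    fmt.Println(frames, err)
}
```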
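A custom format can use a fixed-length header and a variable-length body; the header only has to record the data length.
Field                      Size
magic number               1 byte
version                    1 byte
full length                4 bytes
messageType                1 byte
compress                   1 byte
serialize                  1 byte
requestId                  8 bytes
body                       variable
magic number: a code word agreed on by both peers. When the server receives data it parses the magic number first and checks it. If it does not match the protocol's magic number, the data is treated as invalid.
version: different protocol versions may need different parsing, which lets the protocol follow changing business requirements.
full length: the length of the whole message.
messageType: normal request, normal response, heartbeat and so on. Parsing depends on the message type.
compress: the serialized byte stream can additionally be compressed to make it smaller and faster to transmit. Compression is optional.
serialize: the serialization method, such as JSON, Protostuff or Gob.
requestId: every request is assigned an id so that a response can be matched to its request.
body: the actual data.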
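The 17-byte header laid out above (1+1+4+1+1+1+8) can be packed and unpacked with encoding/binary. This is a sketch only: the type name frameHeader and its methods are hypothetical, the magic value 0x1d is the one the implementation section uses, and big-endian byte order is assumed.

```go
package main

import (
    "encoding/binary"
    "errors"
    "fmt"
)

const (
    headerLen      = 17   // 1+1+4+1+1+1+8 bytes
    magic     byte = 0x1d // same value as the implementation below
)

// frameHeader is a hypothetical in-memory form of the fixed header.
type frameHeader struct {
    Magic       byte
    Version     byte
    FullLength  uint32 // header plus body
    MessageType byte
    Compress    byte
    Serialize   byte
    RequestID   uint64
}

// encode packs the header into its 17-byte wire form.
func (h frameHeader) encode() []byte {
    buf := make([]byte, headerLen)
    buf[0] = h.Magic
    buf[1] = h.Version
    binary.BigEndian.PutUint32(buf[2:6], h.FullLength)
    buf[6] = h.MessageType
    buf[7] = h.Compress
    buf[8] = h.Serialize
    binary.BigEndian.PutUint64(buf[9:17], h.RequestID)
    return buf
}

// decodeHeader validates the magic number and unpacks the fields.
func decodeHeader(buf []byte) (frameHeader, error) {
    if len(buf) < headerLen {
        return frameHeader{}, errors.New("header too short")
    }
    if buf[0] != magic {
        return frameHeader{}, errors.New("magic number mismatch")
    }
    return frameHeader{
        Magic:       buf[0],
        Version:     buf[1],
        FullLength:  binary.BigEndian.Uint32(buf[2:6]),
        MessageType: buf[6],
        Compress:    buf[7],
        Serialize:   buf[8],
        RequestID:   binary.BigEndian.Uint64(buf[9:17]),
    }, nil
}

func main() {
    h := frameHeader{Magic: magic, Version: 1, FullLength: headerLen + 5, RequestID: 42}
    decoded, err := decodeHeader(h.encode())
    fmt.Println(decoded, err)
}
```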
4. Implementation
4.1 HTTP approach
package rpc

import (
    "bytes"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "strings"
    "time"
)

type MsHttpClient struct {
    client http.Client
}

// NewHttpClient builds a client on http.Transport, which dispatches requests,
// is safe for concurrent use and maintains a connection pool.
func NewHttpClient() *MsHttpClient {
    client := http.Client{
        Timeout: 3 * time.Second,
        Transport: &http.Transport{
            MaxIdleConnsPerHost:   5,
            MaxConnsPerHost:       100,
            IdleConnTimeout:       90 * time.Second,
            TLSHandshakeTimeout:   10 * time.Second,
            ExpectContinueTimeout: 1 * time.Second,
        },
    }
    return &MsHttpClient{client: client}
}

// GetRequest builds a request whose args are sent as the query string.
func (c *MsHttpClient) GetRequest(method string, url string, args map[string]any) (*http.Request, error) {
    if len(args) > 0 {
        url = url + "?" + c.toValues(args)
    }
    return http.NewRequest(method, url, nil)
}

// FormRequest builds a request whose args are sent as a URL-encoded form body.
func (c *MsHttpClient) FormRequest(method string, url string, args map[string]any) (*http.Request, error) {
    req, err := http.NewRequest(method, url, strings.NewReader(c.toValues(args)))
    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    return req, nil
}

// JsonRequest builds a request whose args are sent as a JSON body.
func (c *MsHttpClient) JsonRequest(method string, url string, args map[string]any) (*http.Request, error) {
    jsonStr, err := json.Marshal(args)
    if err != nil {
        return nil, err
    }
    req, err := http.NewRequest(method, url, bytes.NewReader(jsonStr))
    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/json")
    return req, nil
}

func (c *MsHttpClient) Get(url string, args map[string]any) ([]byte, error) {
    req, err := c.GetRequest(http.MethodGet, url, args)
    if err != nil {
        return nil, err
    }
    return c.handleResponse(req)
}

func (c *MsHttpClient) PostForm(url string, args map[string]any) ([]byte, error) {
    req, err := c.FormRequest(http.MethodPost, url, args)
    if err != nil {
        return nil, err
    }
    return c.handleResponse(req)
}

func (c *MsHttpClient) PostJson(url string, args map[string]any) ([]byte, error) {
    req, err := c.JsonRequest(http.MethodPost, url, args)
    if err != nil {
        return nil, err
    }
    return c.handleResponse(req)
}

// Response sends a request built by one of the *Request helpers.
func (c *MsHttpClient) Response(req *http.Request) ([]byte, error) {
    return c.handleResponse(req)
}

// handleResponse sends the request and returns the whole body on 200 OK.
func (c *MsHttpClient) handleResponse(req *http.Request) ([]byte, error) {
    response, err := c.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer response.Body.Close()
    if response.StatusCode != http.StatusOK {
        return nil, errors.New(response.Status)
    }
    // io.ReadAll reads until EOF, so a short read cannot truncate the body.
    return io.ReadAll(response.Body)
}

// toValues encodes args as a URL query string.
func (c *MsHttpClient) toValues(args map[string]any) string {
    if len(args) == 0 {
        return ""
    }
    params := url.Values{}
    for k, v := range args {
        params.Set(k, fmt.Sprintf("%v", v))
    }
    return params.Encode()
}
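The toValues helper above turns an argument map into a query string. A standalone sketch of the same idea follows; buildQuery is a made-up name that only mirrors what toValues does. It shows two properties of url.Values.Encode worth knowing: keys come out sorted, and values are escaped.

```go
package main

import (
    "fmt"
    "net/url"
)

// buildQuery formats every argument with %v and encodes the result as a
// query string, in the same way as the toValues method above.
func buildQuery(args map[string]any) string {
    if len(args) == 0 {
        return ""
    }
    params := url.Values{}
    for k, v := range args {
        params.Set(k, fmt.Sprintf("%v", v))
    }
    // Encode sorts by key and escapes each value.
    return params.Encode()
}

func main() {
    fmt.Println(buildQuery(map[string]any{"name": "a b", "id": 7}))
}
```

Formatting with %v is convenient for scalars, but nested maps or slices come out in Go's print syntax, so this style of client is best kept to flat arguments.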
ordercenter
package main

import (
    "encoding/json"
    "fmt"
    "net/http"

    "github.com/mszlu521/msgo"
    "github.com/mszlu521/msgo/rpc"
)

type Result struct {
    Code int    `json:"code"`
    Msg  string `json:"msg"`
    Data any    `json:"data"`
}

type Goods struct {
    Id   int64  `json:"id"`
    Name string `json:"name"`
}

func main() {
    engine := msgo.Default()
    client := rpc.NewHttpClient()
    g := engine.Group("order")
    g.Get("/find", func(ctx *msgo.Context) {
        // Look up the goods by calling the goods service over HTTP.
        body, err := client.Get("http://localhost:9002/goods/find", nil)
        if err != nil {
            ctx.Logger.Error(err)
            return
        }
        fmt.Println(string(body))
        v := &Result{}
        if err := json.Unmarshal(body, v); err != nil {
            ctx.Logger.Error(err)
            return
        }
        ctx.JSON(http.StatusOK, v)
    })
    engine.Run(":9003")
}
goodscenter
package main

import (
    "net/http"

    "github.com/mszlu521/msgo"
)

type Result struct {
    Code int    `json:"code"`
    Msg  string `json:"msg"`
    Data any    `json:"data"`
}

type Goods struct {
    Id   int64  `json:"id"`
    Name string `json:"name"`
}

func main() {
    engine := msgo.Default()
    g := engine.Group("goods")
    g.Get("/find", func(ctx *msgo.Context) {
        // Return a fixed item as the lookup result.
        goods := &Goods{Id: 1000, Name: "goods center 9002 item"}
        ctx.JSON(http.StatusOK, &Result{Code: 200, Msg: "success", Data: goods})
    })
    engine.Run(":9002")
}
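The call from ordercenter to goodscenter can be reproduced without the msgo framework, using only net/http and a throwaway test server. This is a sketch under assumptions: the handler and function names are made up, and Result.Data is typed as Goods here so the decoded value is easy to inspect.

```go
package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "net/http/httptest"
)

type Goods struct {
    Id   int64  `json:"id"`
    Name string `json:"name"`
}

// Result uses a concrete Data type in this sketch (the article uses any).
type Result struct {
    Code int    `json:"code"`
    Msg  string `json:"msg"`
    Data Goods  `json:"data"`
}

// goodsHandler plays the role of the goods service's /goods/find route.
func goodsHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(Result{Code: 200, Msg: "success", Data: Goods{Id: 1000, Name: "demo goods"}})
}

// findGoods plays the role of the order service: call, check status, decode.
func findGoods(client *http.Client, baseURL string) (Result, error) {
    var res Result
    resp, err := client.Get(baseURL + "/goods/find")
    if err != nil {
        return res, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return res, fmt.Errorf("unexpected status %s", resp.Status)
    }
    err = json.NewDecoder(resp.Body).Decode(&res)
    return res, err
}

// demoCall wires both sides together on a local test server.
func demoCall() (Result, error) {
    srv := httptest.NewServer(http.HandlerFunc(goodsHandler))
    defer srv.Close()
    return findGoods(srv.Client(), srv.URL)
}

func main() {
    res, err := demoCall()
    fmt.Println(res, err)
}
```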
4.2 Reworking the HTTP approach
config:
package rpc

import "strconv"

type Config struct {
    Protocol string
    Host     string
    Port     int
    Ssl      bool
}

// Url returns the base URL of the service, or "" for non-HTTP protocols.
func (c Config) Url() string {
    switch c.Protocol {
    case HTTP, HTTP2:
        prefix := "http://"
        if c.Ssl {
            prefix = "https://"
        }
        return prefix + c.Host + ":" + strconv.FormatInt(int64(c.Port), 10)
    }
    return ""
}

const (
    HTTP  = "HTTP"
    HTTP2 = "HTTP2"
    TCP   = "TCP"
)

const (
    GET      = "GET"
    POSTForm = "POST_FORM"
    POSTJson = "POST_JSON"
)
rpc.go:
package rpc

import (
    "errors"
    "reflect"
    "strings"
)

type MsService interface {
    Env() Config
}

// Use registers a service under a name.
// MsHttpClient needs an extra field for this: serviceMap map[string]MsService.
func (c *MsHttpClient) Use(name string, s MsService) {
    if c.serviceMap == nil {
        c.serviceMap = make(map[string]MsService)
    }
    c.serviceMap[name] = s
}

// Do finds the func field called method on the registered service, reads its
// msrpc tag ("METHOD,/path") and fills the field with a function that makes
// the matching HTTP call.
func (c *MsHttpClient) Do(name string, method string) MsService {
    s, ok := c.serviceMap[name]
    if !ok {
        panic(errors.New("service not registered, call Use first"))
    }
    t := reflect.TypeOf(s)
    v := reflect.ValueOf(s)
    if t.Kind() != reflect.Pointer {
        panic(errors.New("service must be pointer"))
    }
    tVar := t.Elem()
    vVar := v.Elem()
    findIndex := -1
    for i := 0; i < tVar.NumField(); i++ {
        if tVar.Field(i).Name == method {
            findIndex = i
            break
        }
    }
    if findIndex == -1 {
        panic(errors.New("method not exist"))
    }
    requestPath := tVar.Field(findIndex).Tag.Get("msrpc")
    if requestPath == "" {
        panic(errors.New("msrpc tag not exist"))
    }
    split := strings.Split(requestPath, ",")
    if len(split) != 2 {
        panic(errors.New("msrpc tag must look like METHOD,/path"))
    }
    mt := split[0]
    path := split[1]
    prefix := s.Env().Url()
    f := func(args map[string]any) ([]byte, error) {
        switch mt {
        case GET:
            return c.Get(prefix+path, args)
        case POSTForm:
            return c.PostForm(prefix+path, args)
        case POSTJson:
            return c.PostJson(prefix+path, args)
        }
        return nil, errors.New("unsupported request method: " + mt)
    }
    vVar.Field(findIndex).Set(reflect.ValueOf(f))
    return s
}
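The core trick in Do is reading a struct tag and assigning a function to a func-typed field through reflection. The sketch below isolates that mechanism with no HTTP involved. Everything in it is hypothetical: the bind function, the GoodsClient type and the callback signature are made up, and the field returns a string rather than []byte to keep the demo short.

```go
package main

import (
    "errors"
    "fmt"
    "reflect"
    "strings"
)

// GoodsClient declares a remote call as a func field with an msrpc tag.
type GoodsClient struct {
    Find func(args map[string]any) (string, error) `msrpc:"GET,/goods/find"`
}

// bind fills every tagged func field of target with a closure that forwards
// the tag's method and path to call.
func bind(target any, call func(method, path string, args map[string]any) (string, error)) error {
    v := reflect.ValueOf(target)
    if v.Kind() != reflect.Pointer || v.Elem().Kind() != reflect.Struct {
        return errors.New("target must be a pointer to a struct")
    }
    t := v.Elem().Type()
    for i := 0; i < t.NumField(); i++ {
        tag := t.Field(i).Tag.Get("msrpc")
        if tag == "" {
            continue // untagged fields are left alone
        }
        method, path, ok := strings.Cut(tag, ",")
        if !ok {
            return fmt.Errorf("field %s: tag must look like METHOD,/path", t.Field(i).Name)
        }
        fn := reflect.ValueOf(func(args map[string]any) (string, error) {
            return call(method, path, args)
        })
        if !fn.Type().AssignableTo(t.Field(i).Type) {
            return fmt.Errorf("field %s: unsupported func signature", t.Field(i).Name)
        }
        v.Elem().Field(i).Set(fn)
    }
    return nil
}

func main() {
    c := &GoodsClient{}
    err := bind(c, func(method, path string, args map[string]any) (string, error) {
        return method + " " + path, nil // a real client would send the request here
    })
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(c.Find(nil))
}
```

Binding all tagged fields once, rather than on each call as Do does, avoids repeating the reflection work per request. Which of the two fits better depends on how the framework is meant to be used.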
goods:
package service

import (
    "github.com/mszlu521/msgo/rpc"
)

type Goods struct {
    Id   int64  `json:"id"`
    Name string `json:"name"`
}

type GoodsService struct {
    Find func(args map[string]any) ([]byte, error) `msrpc:"GET,/goods/find"`
}

func (r *GoodsService) Env() rpc.Config {
    return rpc.Config{
        Host:     "localhost",
        Port:     9002,
        Protocol: rpc.HTTP,
    }
}

package main

import (
    "encoding/json"
    "fmt"
    "net/http"

    "github.com/mszlu521/msgo"
    "github.com/mszlu521/msgo/rpc"
    "github.com/mszlu521/ordercenter/model"
    "github.com/mszlu521/ordercenter/service"
)

func main() {
    engine := msgo.Default()
    client := rpc.NewHttpClient()
    g := engine.Group("order")
    goodsService := &service.GoodsService{}
    client.Use("goodsService", goodsService)
    g.Get("/find", func(ctx *msgo.Context) {
        // Look up the goods through the registered service.
        v := &model.Result{}
        body, err := client.Do("goodsService", "Find").(*service.GoodsService).Find(nil)
        if err != nil {
            ctx.Logger.Error(err)
            return
        }
        fmt.Println(string(body))
        if err := json.Unmarshal(body, v); err != nil {
            ctx.Logger.Error(err)
            return
        }
        ctx.JSON(http.StatusOK, v)
    })
    engine.Run(":9003")
}

With this change, making HTTP-based RPC calls through the framework is fairly easy.
Remember: the goal of a framework is ease of use, but that depends on following rules, so defining those rules is part of the framework too.
4.3 HTTP/2 (gRPC) approach
If gRPC is new to you, work through a tutorial first. Tutorial link:
go get google.golang.org/grpc
protoc --go_out=./ --go-grpc_out=./ .\api\goods.proto

goodscenter (server side)
syntax = "proto3";

//import "google/protobuf/any.proto";

option go_package = "/api";

package api;

service GoodsApi {
  rpc Find(GoodsRequest) returns (GoodsResponse);
}

message GoodsRequest {}

message GoodsResponse {
  int64 Code = 1;
  string Msg = 2;
  Goods Data = 3;
}

message Goods {
  int64 Id = 1;
  string Name = 2;
}

package service

import (
    "context"

    "github.com/mszlu521/goodscenter/api"
)

// GoodsApiService embeds the generated Unimplemented type. That is how a type
// outside package api satisfies the unexported
// mustEmbedUnimplementedGoodsApiServer method.
type GoodsApiService struct {
    api.UnimplementedGoodsApiServer
}

func (GoodsApiService) Find(context.Context, *api.GoodsRequest) (*api.GoodsResponse, error) {
    goods := &api.Goods{Id: 1000, Name: "goods center 9002 item, served over grpc"}
    res := &api.GoodsResponse{
        Code: 200,
        Msg:  "success",
        Data: goods,
    }
    return res, nil
}

gRPC server:
listen, err := net.Listen("tcp", ":9111")
if err != nil {
    log.Fatal(err)
}
server := grpc.NewServer()
api.RegisterGoodsApiServer(server, &service.GoodsApiService{})
err = server.Serve(listen)
log.Println(err)

gRPC client:
g.Get("/findGrpc", func(ctx *msgo.Context) {
    // Look up the goods over gRPC.
    var serviceHost = "127.0.0.1:9111"
    conn, err := grpc.Dial(serviceHost, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        fmt.Println(err)
        return
    }
    defer conn.Close()
    client := api.NewGoodsApiClient(conn)
    rsp, err := client.Find(context.TODO(), &api.GoodsRequest{})
    if err != nil {
        fmt.Println(err)
        return
    }
    ctx.JSON(http.StatusOK, rsp)
})

4.3.1 Turning it into framework utilities
Server
package rpc

import (
    "net"

    "google.golang.org/grpc"
)

type MsGrpcServer struct {
    listen     net.Listener
    grpcServer *grpc.Server
    registers  []func(grpcServer *grpc.Server)
    ops        []grpc.ServerOption
}

func NewGrpcServer(address string, ops ...MsGrpcOption) (*MsGrpcServer, error) {
    listen, err := net.Listen("tcp", address)
    if err != nil {
        return nil, err
    }
    ms := &MsGrpcServer{
        listen: listen,
    }
    for _, op := range ops {
        op.Apply(ms)
    }
    ms.grpcServer = grpc.NewServer(ms.ops...)
    return ms, nil
}

// Run registers all services and then serves until the listener fails.
func (s *MsGrpcServer) Run() error {
    for _, register := range s.registers {
        register(s.grpcServer)
    }
    return s.grpcServer.Serve(s.listen)
}

func (s *MsGrpcServer) Register(register func(grpServer *grpc.Server)) {
    s.registers = append(s.registers, register)
}

type MsGrpcOption interface {
    Apply(s *MsGrpcServer)
}

type DefaultGrpcOption struct {
    f func(s *MsGrpcServer)
}

func (d DefaultGrpcOption) Apply(s *MsGrpcServer) {
    d.f(s)
}

func WithGrpcOptions(options ...grpc.ServerOption) MsGrpcOption {
    return DefaultGrpcOption{f: func(s *MsGrpcServer) {
        s.ops = append(s.ops, options...)
    }}
}

grpcServer, err := rpc.NewGrpcServer(":9111")
if err != nil {
    log.Fatal(err)
}
grpcServer.Register(func(grpServer *grpc.Server) {
    api.RegisterGoodsApiServer(grpServer, &service.GoodsApiService{})
})
err = grpcServer.Run()

Client
// Needs imports: context, time, google.golang.org/grpc,
// google.golang.org/grpc/credentials/insecure, google.golang.org/grpc/keepalive.
type MsGrpcClient struct {
    Conn *grpc.ClientConn
}

func NewGrpcClient(config *MsGrpcClientConfig) (*MsGrpcClient, error) {
    ctx := context.Background()
    dialOptions := config.dialOptions
    if config.Block {
        // Blocking dial: wait for the connection, bounded by DialTimeout.
        if config.DialTimeout > time.Duration(0) {
            var cancel context.CancelFunc
            ctx, cancel = context.WithTimeout(ctx, config.DialTimeout)
            defer cancel()
        }
        dialOptions = append(dialOptions, grpc.WithBlock())
    }
    if config.KeepAlive != nil {
        dialOptions = append(dialOptions, grpc.WithKeepaliveParams(*config.KeepAlive))
    }
    conn, err := grpc.DialContext(ctx, config.Address, dialOptions...)
    if err != nil {
        return nil, err
    }
    return &MsGrpcClient{
        Conn: conn,
    }, nil
}

type MsGrpcClientConfig struct {
    Address     string
    Block       bool
    DialTimeout time.Duration
    ReadTimeout time.Duration
    Direct      bool
    KeepAlive   *keepalive.ClientParameters
    dialOptions []grpc.DialOption
}

func DefaultGrpcClientConfig() *MsGrpcClientConfig {
    return &MsGrpcClientConfig{
        dialOptions: []grpc.DialOption{
            grpc.WithTransportCredentials(insecure.NewCredentials()),
        },
        DialTimeout: time.Second * 3,
        ReadTimeout: time.Second * 2,
        Block:       true,
    }
}

4.4 TCP approach
With TCP we have to implement serialization, encoding and decoding ourselves. Two serialization protocols are supported:
Protobuf and Go's Gob.
4.4.1 Server side
type Serializer interface {
    Serialize(i interface{}) ([]byte, error)
    Deserialize(data []byte, i interface{}) error
}
type GobSerializer struct{}

func (c GobSerializer) Serialize(data any) ([]byte, error) {
    var buffer bytes.Buffer
    encoder := gob.NewEncoder(&buffer)
    if err := encoder.Encode(data); err != nil {
        return nil, err
    }
    return buffer.Bytes(), nil
}

func (c GobSerializer) Deserialize(data []byte, target any) error {
    buffer := bytes.NewBuffer(data)
    decoder := gob.NewDecoder(buffer)
    return decoder.Decode(target)
}

type MsRpcMessage struct {
    // Header
    Header *Header
    // Message body
    Data any
}

const mn byte = 0x1d
const version = 0x01

type CompressType byte

const (
    Gzip CompressType = iota
)

type SerializeType byte

const (
    Gob SerializeType = iota
    ProtoBuff
)

type MessageType byte

const (
    msgRequest MessageType = iota
    msgResponse
    msgPing
    msgPong
)

type Header struct {
    MagicNumber   byte
    Version       byte
    FullLength    int32
    MessageType   MessageType
    CompressType  CompressType
    SerializeType SerializeType
    RequestId     int64
}

type MsRpcRequest struct {
    RequestId   int64
    ServiceName string
    MethodName  string
    Args        []any
}

type MsRpcResponse struct {
    RequestId     int64
    Code          int16
    Msg           string
    CompressType  CompressType
    SerializeType SerializeType
    Data          any
}

type MsRpcServer interface {
    Register(name string, service interface{})
    Run()
    Stop()
}

type MsTcpServer struct {
    listener   net.Listener
    Host       string
    Port       int
    Network    string
    serviceMap map[string]interface{}
}

type MsTcpConn struct {
    s       *MsTcpServer
    conn    net.Conn
    rspChan chan *MsRpcResponse
}

// writeHandle waits up to 3 seconds for a response and writes it out.
func (c *MsTcpConn) writeHandle() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    select {
    case rsp := <-c.rspChan:
        if rsp == nil {
            // The read side failed to decode a request; nothing to send.
            return
        }
        // Encode and send the response.
        if err := c.Send(c.conn, rsp); err != nil {
            log.Println(err)
        }
    case <-ctx.Done():
        log.Println("timed out")
    }
}

func (c *MsTcpConn) Send(conn net.Conn, rsp *MsRpcResponse) error {
    headers := make([]byte, 17)
    // magic number
    headers[0] = mn
    // version
    headers[1] = version
    // full length is filled in below, once the body size is known
    // message type
    headers[6] = byte(msgResponse)
    // compression type
    headers[7] = byte(rsp.CompressType)
    // serialization type
    headers[8] = byte(rsp.SerializeType)
    // request id
    binary.BigEndian.PutUint64(headers[9:], uint64(rsp.RequestId))
    serializer, err := loadSerialize(rsp.SerializeType)
    if err != nil {
        return err
    }
    body, err := serializer.Serialize(rsp)
    if err != nil {
        return err
    }
    body, err = compress(body, rsp.CompressType)
    if err != nil {
        return err
    }
    fullLen := 17 + len(body)
    binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))
    if _, err = conn.Write(headers); err != nil {
        return err
    }
    if _, err = conn.Write(body); err != nil {
        return err
    }
    log.Println("response sent")
    return nil
}

func NewTcpServer(host string, port int) *MsTcpServer {
    return &MsTcpServer{
        Host:    host,
        Port:    port,
        Network: "tcp",
    }
}
func (s *MsTcpServer) Register(name string, service interface{}) {
    if s.serviceMap == nil {
        s.serviceMap = make(map[string]interface{})
    }
    v := reflect.ValueOf(service)
    if v.Kind() != reflect.Pointer {
        panic(errors.New("service not pointer"))
    }
    s.serviceMap[name] = service
}
func (s *MsTcpServer) Run() {
    addr := fmt.Sprintf("%s:%d", s.Host, s.Port)
    listen, err := net.Listen(s.Network, addr)
    if err != nil {
        panic(err)
    }
    s.listener = listen
    for {
        conn, err := s.listener.Accept()
        if err != nil {
            log.Println(err)
            continue
        }
        msConn := &MsTcpConn{conn: conn, rspChan: make(chan *MsRpcResponse, 1), s: s}
        go s.readHandle(msConn)
        go msConn.writeHandle()
    }
}

func (s *MsTcpServer) readHandle(msConn *MsTcpConn) {
    defer func() {
        if err := recover(); err != nil {
            log.Println(err)
            msConn.conn.Close()
        }
    }()
    msg := s.decodeFrame(msConn.conn)
    if msg == nil {
        msConn.rspChan <- nil
        return
    }
    // Dispatch on the message type.
    if msg.Header.MessageType != msgRequest {
        return
    }
    req := msg.Data.(*MsRpcRequest)
    // Find the registered service, call it, then hand the result to the writer via the channel.
    rsp := &MsRpcResponse{RequestId: req.RequestId, CompressType: msg.Header.CompressType, SerializeType: msg.Header.SerializeType}
    service, ok := s.serviceMap[req.ServiceName]
    if !ok {
        rsp.Code = 500
        rsp.Msg = "no service found"
        msConn.rspChan <- rsp
        return
    }
    reflectMethod := reflect.ValueOf(service).MethodByName(req.MethodName)
    if !reflectMethod.IsValid() {
        rsp.Code = 500
        rsp.Msg = "no method found"
        msConn.rspChan <- rsp
        return
    }
    args := make([]reflect.Value, len(req.Args))
    for i := range req.Args {
        args[i] = reflect.ValueOf(req.Args[i])
    }
    result := reflectMethod.Call(args)
    if len(result) == 0 {
        // The method returns nothing.
        rsp.Code = 200
        msConn.rspChan <- rsp
        return
    }
    // By convention a trailing error return value reports failure.
    if err, ok := result[len(result)-1].Interface().(error); ok && err != nil {
        rsp.Code = 500
        rsp.Msg = err.Error()
    } else {
        rsp.Code = 200
        rsp.Data = result[0].Interface()
    }
    msConn.rspChan <- rsp
    log.Println("request handled")
}

func (s *MsTcpServer) Close() {
    if s.listener != nil {
        s.listener.Close()
    }
}

func (*MsTcpServer) decodeFrame(conn net.Conn) *MsRpcMessage {
    // Read the header first: 1+1+4+1+1+1+8 = 17 bytes.
    headers := make([]byte, 17)
    if _, err := io.ReadFull(conn, headers); err != nil {
        log.Println(err)
        return nil
    }
    // magic number
    magicNumber := headers[0]
    if magicNumber != mn {
        log.Println("magic number not valid : ", magicNumber)
        return nil
    }
    version := headers[1]
    fullLength := headers[2:6]
    messageType := MessageType(headers[6])
    compressType := CompressType(headers[7])
    serializeType := SerializeType(headers[8])
    requestId := headers[9:]
    // Read the body. Network byte order is big-endian.
    fl := int32(binary.BigEndian.Uint32(fullLength))
    bodyLen := fl - 17
    if bodyLen < 0 {
        log.Println("full length not valid : ", fl)
        return nil
    }
    body := make([]byte, bodyLen)
    if _, err := io.ReadFull(conn, body); err != nil {
        log.Println(err)
        return nil
    }
    // Decompress first.
    body, err := unCompress(body, compressType)
    if err != nil {
        log.Println(err)
        return nil
    }
    // Then deserialize.
    serializer, err := loadSerialize(serializeType)
    if err != nil {
        log.Println(err)
        return nil
    }
    header := &Header{
        MagicNumber:   magicNumber,
        Version:       version,
        FullLength:    fl,
        MessageType:   messageType,
        CompressType:  compressType,
        SerializeType: serializeType,
        RequestId:     int64(binary.BigEndian.Uint64(requestId)),
    }
    msg := &MsRpcMessage{Header: header}
    switch messageType {
    case msgRequest:
        req := &MsRpcRequest{}
        if err := serializer.Deserialize(body, req); err != nil {
            log.Println(err)
            return nil
        }
        msg.Data = req
        return msg
    case msgResponse:
        rsp := &MsRpcResponse{}
        if err := serializer.Deserialize(body, rsp); err != nil {
            log.Println(err)
            return nil
        }
        msg.Data = rsp
        return msg
    }
    return nil
}

func loadSerialize(serializeType SerializeType) (Serializer, error) {
    switch serializeType {
    case Gob:
        return &GobSerializer{}, nil
    }
    return nil, errors.New("no serializeType")
}

func compress(body []byte, compressType CompressType) ([]byte, error) {
    switch compressType {
    case Gzip:
        // Write the gzip stream into an in-memory buffer.
        var buf bytes.Buffer
        w := gzip.NewWriter(&buf)
        if _, err := w.Write(body); err != nil {
            return nil, err
        }
        if err := w.Close(); err != nil {
            return nil, err
        }
        return buf.Bytes(), nil
    }
    return nil, errors.New("no compressType")
}

func unCompress(body []byte, compressType CompressType) ([]byte, error) {
    switch compressType {
    case Gzip:
        reader, err := gzip.NewReader(bytes.NewReader(body))
        if err != nil {
            return nil, err
        }
        defer reader.Close()
        buf := new(bytes.Buffer)
        // Read the decompressed data out of the reader.
        if _, err := buf.ReadFrom(reader); err != nil {
            return nil, err
        }
        return buf.Bytes(), nil
    }
    return nil, errors.New("no compressType")
}

tcpServer := rpc.NewTcpServer("localhost", 9112)
gob.Register(&model.Result{})
gob.Register(&model.Goods{})
tcpServer.Register("goods", &service.GoodsRpcService{})
go tcpServer.Run()
go engine.Run(":9002")
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
<-quit
tcpServer.Close()

package service

import (
    "github.com/mszlu521/goodscenter/model"
)

type GoodsRpcService struct {
}

func (*GoodsRpcService) Find(id int64) *model.Result {
    goods := model.Goods{Id: 1000, Name: "goods center 9002 item"}
    return &model.Result{Code: 200, Msg: "success", Data: goods}
}
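The server's dispatch step looks a method up by name and calls it through reflection. One pitfall is that decoded arguments often arrive with a different numeric type than the method declares, for example int or float64 where int64 is expected, and reflect's Call panics on a mismatch. The sketch below shows one way to handle this by converting each argument to the declared parameter type. The invoke function and the demo service are made up for illustration.

```go
package main

import (
    "fmt"
    "reflect"
)

// GoodsService is a stand-in for a registered service.
type GoodsService struct{}

func (*GoodsService) Find(id int64) string {
    return fmt.Sprintf("goods-%d", id)
}

// invoke calls the named method on service, converting every argument to the
// parameter type the method declares. Panics raised by reflection (for
// example an impossible conversion) are turned into an error.
func invoke(service any, method string, args ...any) (results []any, err error) {
    defer func() {
        if r := recover(); r != nil {
            err = fmt.Errorf("invoke %s: %v", method, r)
        }
    }()
    m := reflect.ValueOf(service).MethodByName(method)
    if !m.IsValid() {
        return nil, fmt.Errorf("method %q not found", method)
    }
    if m.Type().NumIn() != len(args) {
        return nil, fmt.Errorf("method %q wants %d args, got %d", method, m.Type().NumIn(), len(args))
    }
    in := make([]reflect.Value, len(args))
    for i, a := range args {
        in[i] = reflect.ValueOf(a).Convert(m.Type().In(i))
    }
    for _, out := range m.Call(in) {
        results = append(results, out.Interface())
    }
    return results, nil
}

func main() {
    // A float64 argument, as a JSON or protobuf Value decoder would produce.
    fmt.Println(invoke(&GoodsService{}, "Find", float64(7)))
    fmt.Println(invoke(&GoodsService{}, "Missing"))
}
```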
4.4.2 Client side
type MsRpcClient interface {
    Connect() error
    Invoke(context context.Context, serviceName string, methodName string, args []any) (any, error)
    Close() error
}

type MsTcpClient struct {
    conn   net.Conn
    option TcpClientOption
}

type TcpClientOption struct {
    Retries           int
    ConnectionTimeout time.Duration
    SerializeType     SerializeType
    CompressType      CompressType
    Host              string
    Port              int
}

var DefaultOption = TcpClientOption{
    Host:              "127.0.0.1",
    Port:              9112,
    Retries:           3,
    ConnectionTimeout: 5 * time.Second,
    SerializeType:     Gob,
    CompressType:      Gzip,
}

func NewTcpClient(option TcpClientOption) *MsTcpClient {
    return &MsTcpClient{option: option}
}

func (c *MsTcpClient) Connect() error {
    addr := fmt.Sprintf("%s:%d", c.option.Host, c.option.Port)
    conn, err := net.DialTimeout("tcp", addr, c.option.ConnectionTimeout)
    if err != nil {
        return err
    }
    c.conn = conn
    return nil
}

var reqId int64

func (c *MsTcpClient) Invoke(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {
    var cancel context.CancelFunc
    ctx, cancel = context.WithTimeout(ctx, c.option.ConnectionTimeout)
    defer cancel()
    req := &MsRpcRequest{}
    req.RequestId = atomic.AddInt64(&reqId, 1)
    req.ServiceName = serviceName
    req.MethodName = methodName
    req.Args = args
    headers := make([]byte, 17)
    // magic number
    headers[0] = mn
    // version
    headers[1] = version
    // full length is filled in below, once the body size is known
    // message type
    headers[6] = byte(msgRequest)
    // compression type
    headers[7] = byte(c.option.CompressType)
    // serialization type
    headers[8] = byte(c.option.SerializeType)
    // request id
    binary.BigEndian.PutUint64(headers[9:], uint64(req.RequestId))
    serializer, err := loadSerialize(c.option.SerializeType)
    if err != nil {
        return nil, err
    }
    body, err := serializer.Serialize(req)
    if err != nil {
        return nil, err
    }
    body, err = compress(body, c.option.CompressType)
    if err != nil {
        return nil, err
    }
    fullLen := 17 + len(body)
    binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))
    if _, err = c.conn.Write(headers); err != nil {
        return nil, err
    }
    if _, err = c.conn.Write(body); err != nil {
        return nil, err
    }
    // Buffered so the reader goroutine can finish even if we time out first.
    rspChan := make(chan *MsRpcResponse, 1)
    go c.readHandle(rspChan)
    select {
    case rsp := <-rspChan:
        if rsp == nil {
            return nil, errors.New("no response decoded")
        }
        return rsp, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

func (c *MsTcpClient) Close() error {
    if c.conn != nil {
        return c.conn.Close()
    }
    return nil
}

func (c *MsTcpClient) readHandle(rspChan chan *MsRpcResponse) {
    defer func() {
        if err := recover(); err != nil {
            log.Println(err)
            c.conn.Close()
        }
    }()
    for {
        msg := c.decodeFrame(c.conn)
        if msg == nil {
            log.Println("no data decoded")
            rspChan <- nil
            return
        }
        // Dispatch on the message type.
        if msg.Header.MessageType == msgResponse {
            rsp := msg.Data.(*MsRpcResponse)
            rspChan <- rsp
            return
        }
    }
}

func (*MsTcpClient) decodeFrame(conn net.Conn) *MsRpcMessage {
    // Read the header first: 1+1+4+1+1+1+8 = 17 bytes.
    headers := make([]byte, 17)
    if _, err := io.ReadFull(conn, headers); err != nil {
        log.Println(err)
        return nil
    }
    // magic number
    magicNumber := headers[0]
    if magicNumber != mn {
        log.Println("magic number not valid : ", magicNumber)
        return nil
    }
    version := headers[1]
    fullLength := headers[2:6]
    messageType := MessageType(headers[6])
    compressType := CompressType(headers[7])
    serializeType := SerializeType(headers[8])
    requestId := headers[9:]
    // Read the body. Network byte order is big-endian.
    fl := int32(binary.BigEndian.Uint32(fullLength))
    bodyLen := fl - 17
    if bodyLen < 0 {
        log.Println("full length not valid : ", fl)
        return nil
    }
    body := make([]byte, bodyLen)
    if _, err := io.ReadFull(conn, body); err != nil {
        log.Println(err)
        return nil
    }
    // Decompress first.
    body, err := unCompress(body, compressType)
    if err != nil {
        log.Println(err)
        return nil
    }
    // Then deserialize.
    serializer, err := loadSerialize(serializeType)
    if err != nil {
        log.Println(err)
        return nil
    }
    header := &Header{
        MagicNumber:   magicNumber,
        Version:       version,
        FullLength:    fl,
        MessageType:   messageType,
        CompressType:  compressType,
        SerializeType: serializeType,
        RequestId:     int64(binary.BigEndian.Uint64(requestId)),
    }
    msg := &MsRpcMessage{Header: header}
    switch messageType {
    case msgRequest:
        req := &MsRpcRequest{}
        if err := serializer.Deserialize(body, req); err != nil {
            log.Println(err)
            return nil
        }
        msg.Data = req
        return msg
    case msgResponse:
        rsp := &MsRpcResponse{}
        if err := serializer.Deserialize(body, rsp); err != nil {
            log.Println(err)
            return nil
        }
        msg.Data = rsp
        return msg
    }
    return nil
}

type MsTcpClientProxy struct {
    client *MsTcpClient
    option TcpClientOption
}

func NewMsTcpClientProxy(option TcpClientOption) *MsTcpClientProxy {
    return &MsTcpClientProxy{option: option}
}

// Call connects, invokes the method with up to option.Retries attempts and
// closes the connection again.
func (p *MsTcpClientProxy) Call(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {
    client := NewTcpClient(p.option)
    p.client = client
    if err := client.Connect(); err != nil {
        return nil, err
    }
    defer client.Close()
    for i := 0; i < p.option.Retries; i++ {
        result, err := client.Invoke(ctx, serviceName, methodName, args)
        if err != nil {
            if i >= p.option.Retries-1 {
                log.Println(errors.New("already retry all time"))
                return nil, err
            }
            continue
        }
        return result, nil
    }
    return nil, errors.New("retry time is 0")
}
g.Get("/findTcp", func(ctx *msgo.Context) {
    // Look up the goods over the TCP protocol.
    gob.Register(&model.Result{})
    gob.Register(&model.Goods{})
    args := make([]any, 1)
    // The server method takes an int64, so send an int64.
    args[0] = int64(1)
    result, err := proxy.Call(context.Background(), "goods", "Find", args)
    if err != nil {
        panic(err)
    }
    ctx.JSON(http.StatusOK, result)
})

4.4.3 Protobuf serialization support
type ProtobufSerializer struct{}

func (c ProtobufSerializer) Serialize(data any) ([]byte, error) {
    message, ok := data.(proto.Message)
    if !ok {
        return nil, errors.New("data is not a proto.Message")
    }
    return proto.Marshal(message)
}

func (c ProtobufSerializer) Deserialize(data []byte, target any) error {
    message, ok := target.(proto.Message)
    if !ok {
        return errors.New("target is not a proto.Message")
    }
    return proto.Unmarshal(data, message)
}

protoc --go_out=./ --go-grpc_out=./ .\rpc\tcp.proto

syntax = "proto3";

import "google/protobuf/struct.proto";

option go_package = "/rpc";

package rpc;

message Request {
  int64 RequestId = 1;
  string ServiceName = 2;
  string MethodName = 3;
  repeated google.protobuf.Value Args = 4;
}

message Response {
  int64 RequestId = 1;
  int32 Code = 2;
  string Msg = 3;
  int32 CompressType = 4;
  int32 SerializeType = 5;
  google.protobuf.Value Data = 6;
}

package rpc

import (
    "bytes"
    "compress/gzip"
    "context"
    "encoding/binary"
    "encoding/gob"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "log"
    "net"
    "reflect"
    "sync/atomic"
    "time"

    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/structpb"
)

type Serializer interface {
    Serialize(i interface{}) ([]byte, error)
    Deserialize(data []byte, i interface{}) error
}
type GobSerializer struct{}

func (c GobSerializer) Serialize(data any) ([]byte, error) {
    var buffer bytes.Buffer
    encoder := gob.NewEncoder(&buffer)
    if err := encoder.Encode(data); err != nil {
        return nil, err
    }
    return buffer.Bytes(), nil
}

func (c GobSerializer) Deserialize(data []byte, target any) error {
    buffer := bytes.NewBuffer(data)
    decoder := gob.NewDecoder(buffer)
    return decoder.Decode(target)
}

type ProtobufSerializer struct{}

func (c ProtobufSerializer) Serialize(data any) ([]byte, error) {
    message, ok := data.(proto.Message)
    if !ok {
        return nil, errors.New("data is not a proto.Message")
    }
    return proto.Marshal(message)
}

func (c ProtobufSerializer) Deserialize(data []byte, target any) error {
    message, ok := target.(proto.Message)
    if !ok {
        return errors.New("target is not a proto.Message")
    }
    return proto.Unmarshal(data, message)
}

type MsRpcMessage struct {
    // Header
    Header *Header
    // Message body
    Data any
}

const mn byte = 0x1d
const version = 0x01

type CompressType byte

const (
    Gzip CompressType = iota
)

type SerializeType byte

const (
    Gob SerializeType = iota
    ProtoBuff
)

type MessageType byte

const (
    msgRequest MessageType = iota
    msgResponse
    msgPing
    msgPong
)

type Header struct {
    MagicNumber   byte
    Version       byte
    FullLength    int32
    MessageType   MessageType
    CompressType  CompressType
    SerializeType SerializeType
    RequestId     int64
}

type MsRpcRequest struct {
    RequestId   int64
    ServiceName string
    MethodName  string
    Args        []any
}

type MsRpcResponse struct {
    RequestId     int64
    Code          int16
    Msg           string
    CompressType  CompressType
    SerializeType SerializeType
    Data          any
}

type MsRpcServer interface {
    Register(name string, service interface{})
    Run()
    Stop()
}

type MsTcpServer struct {
    listener   net.Listener
    Host       string
    Port       int
    Network    string
    serviceMap map[string]interface{}
}

type MsTcpConn struct {
    s       *MsTcpServer
    conn    net.Conn
    rspChan chan *MsRpcResponse
}

// writeHandle waits up to 3 seconds for a response and writes it out.
func (c *MsTcpConn) writeHandle() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    select {
    case rsp := <-c.rspChan:
        if rsp == nil {
            // The read side failed to decode a request; nothing to send.
            return
        }
        // Encode and send the response.
        if err := c.Send(c.conn, rsp); err != nil {
            log.Println(err)
        }
    case <-ctx.Done():
        log.Println("timed out")
    }
}

func (c *MsTcpConn) Send(conn net.Conn, rsp *MsRpcResponse) error {
    headers := make([]byte, 17)
    // magic number
    headers[0] = mn
    // version
    headers[1] = version
    // full length is filled in below, once the body size is known
    // message type
    headers[6] = byte(msgResponse)
    // compression type
    headers[7] = byte(rsp.CompressType)
    // serialization type
    headers[8] = byte(rsp.SerializeType)
    // request id
    binary.BigEndian.PutUint64(headers[9:], uint64(rsp.RequestId))
    serializer, err := loadSerialize(rsp.SerializeType)
    if err != nil {
        return err
    }
    var body []byte
    if rsp.SerializeType == ProtoBuff {
        pRsp := &Response{
            SerializeType: int32(rsp.SerializeType),
            CompressType:  int32(rsp.CompressType),
            Code:          int32(rsp.Code),
            Msg:           rsp.Msg,
            RequestId:     rsp.RequestId,
        }
        // structpb cannot hold arbitrary Go structs, so round-trip Data
        // through JSON into a map first.
        raw, jsonErr := json.Marshal(rsp.Data)
        if jsonErr != nil {
            return jsonErr
        }
        m := make(map[string]any)
        if jsonErr = json.Unmarshal(raw, &m); jsonErr != nil {
            return jsonErr
        }
        value, structErr := structpb.NewStruct(m)
        if structErr != nil {
            return structErr
        }
        pRsp.Data = structpb.NewStructValue(value)
        body, err = serializer.Serialize(pRsp)
    } else {
        body, err = serializer.Serialize(rsp)
    }
    if err != nil {
        return err
    }
    body, err = compress(body, rsp.CompressType)
    if err != nil {
        return err
    }
    fullLen := 17 + len(body)
    binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))
    if _, err = conn.Write(headers); err != nil {
        return err
    }
    if _, err = conn.Write(body); err != nil {
        return err
    }
    log.Println("response sent")
    return nil
}

func NewTcpServer(host string, port int) *MsTcpServer {
    return &MsTcpServer{
        Host:    host,
        Port:    port,
        Network: "tcp",
    }
}
func (s *MsTcpServer) Register(name string, service interface{}) {
    if s.serviceMap == nil {
        s.serviceMap = make(map[string]interface{})
    }
    v := reflect.ValueOf(service)
    if v.Kind() != reflect.Pointer {
        panic(errors.New("service not pointer"))
    }
    s.serviceMap[name] = service
}
func (s *MsTcpServer) Run() {addr : fmt.Sprintf(%s:%d, s.Host, s.Port)listen, err : net.Listen(s.Network, addr)if err ! nil {panic(err)}s.listener listenfor {conn, err : s.listener.Accept()if err ! nil {log.Println(err)continue}msConn : MsTcpConn{conn: conn, rspChan: make(chan *MsRpcResponse, 1), s: s}go s.readHandle(msConn)go msConn.writeHandle()}
}func (s *MsTcpServer) readHandle(msConn *MsTcpConn) {defer func() {if err : recover(); err ! nil {log.Println(err)msConn.conn.Close()}}()msg : s.decodeFrame(msConn.conn)if msg nil {msConn.rspChan - nilreturn}//根据请求if msg.Header.MessageType msgRequest {req : msg.Data.(*Request)//查找注册的服务匹配后进行调用调用完发送到一个channel当中service, ok : s.serviceMap[req.ServiceName]rsp : MsRpcResponse{RequestId: req.RequestId, CompressType: msg.Header.CompressType, SerializeType: msg.Header.SerializeType}if !ok {rsp.Code 500rsp.Msg no service foundmsConn.rspChan - rspreturn}v : reflect.ValueOf(service)reflectMethod : v.MethodByName(req.MethodName)args : make([]reflect.Value, len(req.Args))for i : range req.Args {of : reflect.ValueOf(req.Args[i].AsInterface())of of.Convert(reflectMethod.Type().In(i))args[i] of}result : reflectMethod.Call(args)if len(result) 0 {//无返回结果rsp.Code 200msConn.rspChan - rspreturn}resArgs : make([]interface{}, len(result))for i : 0; i len(result); i {resArgs[i] result[i].Interface()}var err errorif _, ok : result[len(result)-1].Interface().(error); ok {err result[len(result)-1].Interface().(error)}if err ! nil {rsp.Code 500rsp.Msg err.Error()}rsp.Code 200rsp.Data resArgs[0]msConn.rspChan - rsplog.Println(接收数据成功)return}
}func (s *MsTcpServer) Close() {if s.listener ! nil {s.listener.Close()}
}func (*MsTcpServer) decodeFrame(conn net.Conn) *MsRpcMessage {//读取数据 先读取header部分//1141118 17字节headers : make([]byte, 17)_, err : io.ReadFull(conn, headers)if err ! nil {log.Println(err)return nil}//magic numbermagicNumber : headers[0]if magicNumber ! mn {log.Println(magic number not valid : , magicNumber)return nil}//versionversion : headers[1]//fullLength : headers[2:6]//mt : headers[6]messageType : MessageType(mt)//压缩类型compressType : headers[7]//序列化类型serializeType : headers[8]//请求idrequestId : headers[9:]//将body解析出来包装成request 根据请求内容查找对应的服务完成调用//网络调用 大端fl : int32(binary.BigEndian.Uint32(fullLength))bodyLen : fl - 17body : make([]byte, bodyLen)_, err io.ReadFull(conn, body)log.Println(读完了)if err ! nil {log.Println(err)return nil}//先解压body, err unCompress(body, CompressType(compressType))if err ! nil {log.Println(err)return nil}//反序列化serializer, err : loadSerialize(SerializeType(serializeType))if err ! nil {log.Println(err)return nil}header : Header{}header.MagicNumber magicNumberheader.FullLength flheader.CompressType CompressType(compressType)header.Version versionheader.SerializeType SerializeType(serializeType)header.RequestId int64(binary.BigEndian.Uint64(requestId))header.MessageType messageTypeif messageType msgRequest {msg : MsRpcMessage{}msg.Header headerif ProtoBuff SerializeType(serializeType) {req : Request{}err : serializer.Deserialize(body, req)if err ! nil {log.Println(err)return nil}msg.Data req} else {req : MsRpcRequest{}err : serializer.Deserialize(body, req)if err ! nil {log.Println(err)return nil}msg.Data req}return msg}if messageType msgResponse {msg : MsRpcMessage{}msg.Header headerif ProtoBuff SerializeType(serializeType) {rsp : Response{}err : serializer.Deserialize(body, rsp)if err ! nil {log.Println(err)return nil}msg.Data rsp} else {rsp : MsRpcResponse{}err : serializer.Deserialize(body, rsp)if err ! nil {log.Println(err)return nil}msg.Data rsp}return msg}return nil
}

func loadSerialize(serializeType SerializeType) (Serializer, error) {
	switch serializeType {
	case Gob:
		// gob
		s := &GobSerializer{}
		return s, nil
	case ProtoBuff:
		s := &ProtobufSerializer{}
		return s, nil
	}
	return nil, errors.New("no serializeType")
}

func compress(body []byte, compressType CompressType) ([]byte, error) {
	switch compressType {
	case Gzip:
		// gzip: write the compressed bytes into an in-memory buffer.
		var buf bytes.Buffer
		w := gzip.NewWriter(&buf)
		_, err := w.Write(body)
		if err != nil {
			return nil, err
		}
		// Close flushes the remaining compressed data and the gzip footer.
		if err := w.Close(); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	}
	return nil, errors.New("no compressType")
}

func unCompress(body []byte, compressType CompressType) ([]byte, error) {
	switch compressType {
	case Gzip:
		// gzip
		reader, err := gzip.NewReader(bytes.NewReader(body))
		if err != nil {
			return nil, err
		}
		// Defer Close only after the error check; reader is nil on failure.
		defer reader.Close()
		buf := new(bytes.Buffer)
		// Read the decompressed data out of the reader.
		if _, err := buf.ReadFrom(reader); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	}
	return nil, errors.New("no compressType")
}

type MsRpcClient interface {
	Connect() error
	Invoke(context context.Context, serviceName string, methodName string, args []any) (any, error)
	Close() error
}

type MsTcpClient struct {
	conn   net.Conn
	option TcpClientOption
}

type TcpClientOption struct {
	Retries           int
	ConnectionTimeout time.Duration
	SerializeType     SerializeType
	CompressType      CompressType
	Host              string
	Port              int
}

var DefaultOption = TcpClientOption{
	Host:              "127.0.0.1",
	Port:              9112,
	Retries:           3,
	ConnectionTimeout: 5 * time.Second,
	SerializeType:     Gob,
	CompressType:      Gzip,
}

func NewTcpClient(option TcpClientOption) *MsTcpClient {
	return &MsTcpClient{option: option}
}

func (c *MsTcpClient) Connect() error {
	addr := fmt.Sprintf("%s:%d", c.option.Host, c.option.Port)
	conn, err := net.DialTimeout("tcp", addr, c.option.ConnectionTimeout)
	if err != nil {
		return err
	}
	c.conn = conn
	return nil
}

var reqId int64

func (c *MsTcpClient) Invoke(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {
	var cancel context.CancelFunc
	ctx, cancel = context.WithTimeout(ctx, c.option.ConnectionTimeout)
	defer cancel()

	req := &MsRpcRequest{}
	req.RequestId = atomic.AddInt64(&reqId, 1)
	req.ServiceName = serviceName
	req.MethodName = methodName
	req.Args = args

	headers := make([]byte, 17)
	// magic number
	headers[0] = mn
	// version
	headers[1] = version
	// full length (bytes 2-5) is filled in once the body size is known
	// message type
	headers[6] = byte(msgRequest)
	// compression type
	headers[7] = byte(c.option.CompressType)
	// serialization type
	headers[8] = byte(c.option.SerializeType)
	// request id
	binary.BigEndian.PutUint64(headers[9:], uint64(req.RequestId))

	serializer, err := loadSerialize(c.option.SerializeType)
	if err != nil {
		return nil, err
	}
	var body []byte
	if ProtoBuff == c.option.SerializeType {
		pReq := &Request{}
		// Reuse the id already written to the header so the two stay in sync.
		pReq.RequestId = req.RequestId
		pReq.ServiceName = serviceName
		pReq.MethodName = methodName
		// Use a separate variable so the outer err is not shadowed.
		list, listErr := structpb.NewList(args)
		if listErr != nil {
			return nil, listErr
		}
		pReq.Args = list.Values
		body, err = serializer.Serialize(pReq)
	} else {
		body, err = serializer.Serialize(req)
	}
	if err != nil {
		return nil, err
	}
	body, err = compress(body, c.option.CompressType)
	if err != nil {
		return nil, err
	}
	fullLen := 17 + len(body)
	binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))
	if _, err = c.conn.Write(headers); err != nil {
		return nil, err
	}
	log.Println("len:", len(body))
	if _, err = c.conn.Write(body); err != nil {
		return nil, err
	}
	// Buffered so readHandle can still finish if we stop waiting.
	rspChan := make(chan *MsRpcResponse, 1)
	go c.readHandle(rspChan)
	// Honor the timeout set on ctx instead of blocking forever.
	select {
	case rsp := <-rspChan:
		if rsp == nil {
			return nil, errors.New("no response decoded")
		}
		return rsp, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (c *MsTcpClient) Close() error {
	if c.conn != nil {
		return c.conn.Close()
	}
	return nil
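}

func (c *MsTcpClient) readHandle(rspChan chan *MsRpcResponse) {
	defer func() {
		if err := recover(); err != nil {
			log.Println(err)
			c.conn.Close()
		}
	}()
	for {
		msg := c.decodeFrame(c.conn)
		if msg == nil {
			log.Println("no data could be decoded")
			rspChan <- nil
			return
		}
		// Only response frames are of interest on the client side.
		if msg.Header.MessageType == msgResponse {
			if msg.Header.SerializeType == ProtoBuff {
				rsp := msg.Data.(*Response)
				// Round-trip through JSON to map the protobuf value onto MsRpcResponse.
				asInterface := rsp.Data.AsInterface()
				marshal, err := json.Marshal(asInterface)
				if err != nil {
					log.Println(err)
					rspChan <- nil
					return
				}
				rsp1 := &MsRpcResponse{}
				if err := json.Unmarshal(marshal, rsp1); err != nil {
					log.Println(err)
					rspChan <- nil
					return
				}
				rspChan <- rsp1
			} else {
				rsp := msg.Data.(*MsRpcResponse)
				rspChan <- rsp
			}
			return
		}
	}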
}

// decodeFrame on the client mirrors the server-side implementation.
func (*MsTcpClient) decodeFrame(conn net.Conn) *MsRpcMessage {
	// Read the fixed-size header first.
	// 1 + 1 + 4 + 1 + 1 + 1 + 8 = 17 bytes
	headers := make([]byte, 17)
	_, err := io.ReadFull(conn, headers)
	if err != nil {
		log.Println(err)
		return nil
	}
	// magic number
	magicNumber := headers[0]
	if magicNumber != mn {
		log.Println("magic number not valid : ", magicNumber)
		return nil
	}
	// version
	version := headers[1]
	// full length (header + body)
	fullLength := headers[2:6]
	// message type
	mt := headers[6]
	messageType := MessageType(mt)
	// compression type
	compressType := headers[7]
	// serialization type
	serializeType := headers[8]
	// request id
	requestId := headers[9:]
	// Network byte order is big-endian.
	fl := int32(binary.BigEndian.Uint32(fullLength))
	bodyLen := fl - 17
	if bodyLen < 0 {
		log.Println("invalid frame length : ", fl)
		return nil
	}
	body := make([]byte, bodyLen)
	_, err = io.ReadFull(conn, body)
	if err != nil {
		log.Println(err)
		return nil
	}
	log.Println("body read complete")
	// Decompress first.
	body, err = unCompress(body, CompressType(compressType))
	if err != nil {
		log.Println(err)
		return nil
	}
	// Then deserialize.
	serializer, err := loadSerialize(SerializeType(serializeType))
	if err != nil {
		log.Println(err)
		return nil
	}
	header := &Header{}
	header.MagicNumber = magicNumber
	header.FullLength = fl
	header.CompressType = CompressType(compressType)
	header.Version = version
	header.SerializeType = SerializeType(serializeType)
	header.RequestId = int64(binary.BigEndian.Uint64(requestId))
	header.MessageType = messageType
	if messageType == msgRequest {
		msg := &MsRpcMessage{}
		msg.Header = header
		if ProtoBuff == SerializeType(serializeType) {
			req := &Request{}
			err := serializer.Deserialize(body, req)
			if err != nil {
				log.Println(err)
				return nil
			}
			msg.Data = req
		} else {
			req := &MsRpcRequest{}
			err := serializer.Deserialize(body, req)
			if err != nil {
				log.Println(err)
				return nil
			}
			msg.Data = req
		}
		return msg
	}
	if messageType == msgResponse {
		msg := &MsRpcMessage{}
		msg.Header = header
		if ProtoBuff == SerializeType(serializeType) {
			rsp := &Response{}
			err := serializer.Deserialize(body, rsp)
			if err != nil {
				log.Println(err)
				return nil
			}
			msg.Data = rsp
		} else {
			rsp := &MsRpcResponse{}
			err := serializer.Deserialize(body, rsp)
			if err != nil {
				log.Println(err)
				return nil
			}
			msg.Data = rsp
		}
		return msg
	}
	return nil
}

type MsTcpClientProxy struct {
	client *MsTcpClient
	option TcpClientOption
}

func NewMsTcpClientProxy(option TcpClientOption) *MsTcpClientProxy {
	return &MsTcpClientProxy{option: option}
}

// Call opens a connection, invokes the remote method, and retries up to
// option.Retries times before giving up.
func (p *MsTcpClientProxy) Call(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {
	client := NewTcpClient(p.option)
	p.client = client
	err := client.Connect()
	if err != nil {
		return nil, err
	}
	// Close on every exit path, including the Retries == 0 case.
	defer client.Close()
	for i := 0; i < p.option.Retries; i++ {
		result, err := client.Invoke(ctx, serviceName, methodName, args)
		if err != nil {
			if i >= p.option.Retries-1 {
				log.Println(errors.New("already retry all time"))
				return nil, err
			}
			continue
		}
		return result, nil
	}
	return nil, errors.New("retry time is 0")
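}

This is only a preliminary, simple RPC implementation. It does not handle more complex concerns such as heartbeats, timeouts, and connection management; readers will need to add those themselves.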