做网站大概要多少,重庆网站seo服务,企查查在线查询网页版,洗涤公司建设的意义Golang 原生Rpc Server实现 引言源码解析服务端数据结构服务注册请求处理 客户端数据结构建立连接请求调用 延伸异步调用定制服务名采用TPC协议建立连接自定义编码格式自定义服务器 参考 引言
本文我们来看看golang原生rpc库的实现 , 首先来看一下golang rpc库的demo案例:
服… Golang 原生Rpc Server实现 引言源码解析服务端数据结构服务注册请求处理 客户端数据结构建立连接请求调用 延伸异步调用定制服务名采用TPC协议建立连接自定义编码格式自定义服务器 参考 引言
本文我们来看看golang原生rpc库的实现 , 首先来看一下golang rpc库的demo案例:
服务端和客户端公共代码
type HelloService interface {Hello(request *Request, response *Response) error
}type Request struct {Header map[string]interface{}Params map[string]interface{}
}type Response struct {Header map[string]interface{}Params map[string]interface{}
}服务端代码
type HelloServiceImpl intfunc NewServer() {helloImpl : new(HelloServiceImpl)rpc.RegisterName(helloService, helloImpl)rpc.HandleHTTP()if err : http.ListenAndServe(:1235, nil); err ! nil {log.Fatal(server error: , err)}
}func (s *HelloServiceImpl) Hello(request *common.Request, response *common.Response) error {response.Header request.Headerresponse.Params map[string]interface{}{data: Hello World,}return nil
}客户端代码
func NewClient() *common.Response {client, err : rpc.DialHTTP(tcp, :1234)if err ! nil {log.Fatal(dialing: , err)}res : common.Response{}err client.Call(helloService.Hello, common.Request{map[string]interface{}{client: val1,}, map[string]interface{}{data: hello world,},}, res)if err ! nil {log.Fatal(call: , err)}return res
}golang 原生 rpc 库的使用基本还是分为两步走:
server 端 :
服务注册启动服务
server端对注册的方法有一定的限制方法必须满足签名:
func (t *T) MethodName(argType T1, replyType *T2) error首先方法必须是导出的名字首字母大写其次方法接受两个参数必须是导出的或内置类型。第一个参数表示客户端传递过来的请求参数第二个是需要返回给客户端的响应。第二个参数必须为指针类型需要修改最后方法必须返回一个error类型的值。返回非nil的值表示调用出错。
rpc.HandleHTTP()注册 HTTP 路由。http.ListenAndServe(“:1234”, nil)在端口1234上启动一个 HTTP 服务请求 rpc 方法会交给rpc内部路由处理。这样我们就可以通过客户端调用这两个方法了。 client 端 :
连接服务端调用接口
客户端比服务端稍微简单一点我们使用rpc.DialHTTP(“tcp”, “:1234”)连接到服务端的监听地址返回一个 rpc 的客户端对象。后续就可以调用该对象的Call()方法调用服务端对象的对应方法依次传入方法名需要加上类型限定、参数、一个指针用于接收返回值。 源码解析
对net/http包不熟悉的童鞋可能会觉得奇怪rpc.HandleHTTP()与http.ListenAndServer(“:1234”, nil)是怎么联系起来的我们简单看一下源码
// src/net/rpc/server.go
const (// Defaults used by HandleHTTPDefaultRPCPath /_goRPC_DefaultDebugPath /debug/rpc
)func (server *Server) HandleHTTP(rpcPath, debugPath string) {http.Handle(rpcPath, server)http.Handle(debugPath, debugHTTP{server})
}func HandleHTTP() {DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}实际上rpc.HandleHTTP()会调用http.Handle()在预定义的路径上/_goRPC_注册处理器。这个处理器最终被添加到net/http包中的默认多路复用器上
// src/net/http/server.go
func Handle(pattern string, handler Handler) {DefaultServeMux.Handle(pattern, handler)
}而http.ListenAndServer()第二个参数传入nil时也是使用默认的多路复用器。 有关golang http server 实现可阅读: Golang 原生Http Server实现 细心的朋友可能发现了除了默认的路径/_goRPC_用来处理 RPC 请求rpc.HandleHTTP()方法还注册了一个调试路径/debug/rpc。我们可以直接在浏览器中访问这个网址需要服务端程序开启。如果服务端在远程需要相应地修改地址localhost:1234直观的查看各个方法的调用情况 当我们访问/_goRPC_路径 , 最终调用到的请求处理器是net/rpc/server包下的ServerHttp函数:
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {if req.Method ! CONNECT {w.Header().Set(Content-Type, text/plain; charsetutf-8)w.WriteHeader(http.StatusMethodNotAllowed)io.WriteString(w, 405 must CONNECT\n)return}// 拦截http连接拦截,获取原生的connectionconn, _, err : w.(http.Hijacker).Hijack()...io.WriteString(conn, HTTP/1.0 connected\n\n)// 连接上后续的数据读写都走rpc 协议 , 不走 http 协议了server.ServeConn(conn)
}服务端
数据结构
首先来看一下承载Rpc服务核心状态的Server结构体实现:
// Server represents an RPC Server.
type Server struct {serviceMap sync.Map // map[string]*service. 服务信息映射集合reqLock sync.Mutex // protects freeReq. freeReq *Request respLock sync.Mutex // protects freeRespfreeResp *Response
}其次是包含了注册服务信息的service结构体实现:
type service struct {name string // 服务名rcvr reflect.Value // 服务实现类typ reflect.Type // 服务实现类类型method map[string]*methodType // 当前服务接口注册进来的方法列表
}type methodType struct {sync.Mutex method reflect.MethodArgType reflect.Type ReplyType reflect.TypenumCalls uint
}下面是golang rpc通信使用到的请求和响应对象结构 , 请求和响应对象都会采用对象池进行复用所以都有next属性:
// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct {ServiceMethod string // format: Service.MethodSeq uint64 // sequence number chosen by clientnext *Request // for free list in Server
}// Response is a header written before every RPC return. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Response struct {ServiceMethod string // echoes that of the RequestSeq uint64 // echoes that of the requestError string // error, if any.next *Response // for free list in Server
}服务注册
通过调用RegisterName函数我们可以向rpc server的服务映射集合中保存当前服务信息:
// 服务名 , 服务实现类
func RegisterName(name string, rcvr any) error {return DefaultServer.RegisterName(name, rcvr)
}func (server *Server) RegisterName(name string, rcvr any) error {return server.register(rcvr, name, true)
}func (server *Server) register(rcvr any, name string, useName bool) error {// 创建一个新的服务信息类s : new(service)// 反射获取当前服务实现类的类型和值s.typ reflect.TypeOf(rcvr)s.rcvr reflect.ValueOf(rcvr)// 保存服务名sname : name// useName 表示是否使用传入的name作为服务名 , 如果为false , 则采用服务实现类的类型名if !useName {sname reflect.Indirect(s.rcvr).Type().Name()}if sname {s : rpc.Register: no service name for type s.typ.String()log.Print(s)return errors.New(s)}// 如果采用服务实现类的类型名作为服务名要确保服务实现类是导出的对外可见if !useName !token.IsExported(sname) {s : rpc.Register: type sname is not exportedlog.Print(s)return errors.New(s)}s.name sname// 构建注册服务方法列表信息s.method suitableMethods(s.typ, logRegisterError)if len(s.method) 0 {str : // To help the user, see if a pointer receiver would work.method : suitableMethods(reflect.PointerTo(s.typ), false)if len(method) ! 0 {str rpc.Register: type sname has no exported methods of suitable type (hint: pass a pointer to value of that type)} else {str rpc.Register: type sname has no exported methods of suitable type}log.Print(str)return errors.New(str)}// 判断服务名是否重复if _, dup : server.serviceMap.LoadOrStore(sname, s); dup {return errors.New(rpc: service already defined: sname)}return nil
}suitableMethods方法用于遍历当前服务实现类所有导出方法并筛选出符合RPC调用格式的方法列表
func suitableMethods(typ reflect.Type, logErr bool) map[string]*methodType {methods : make(map[string]*methodType)// 遍历当前服务实现类的所有方法for m : 0; m typ.NumMethod(); m {// 定位方法元数据对象method : typ.Method(m)// 获取方法类型和方法名mtype : method.Typemname : method.Name// 跳过未导出的方法if !method.IsExported() {continue}// Method needs three ins: receiver, *args, *reply.// 方法参数必须有两个第一个用于作为请求参数第二个用于接收请求结果if mtype.NumIn() ! 3 {if logErr {log.Printf(rpc.Register: method %q has %d input parameters; needs exactly three\n, mname, mtype.NumIn())}continue}// First arg need not be a pointer.// 第一个参数可以不是指针类型argType : mtype.In(1)if !isExportedOrBuiltinType(argType) {if logErr {log.Printf(rpc.Register: argument type of method %q is not exported: %q\n, mname, argType)}continue}// Second arg must be a pointer.// 第二个参数必须是指针类型replyType : mtype.In(2)if replyType.Kind() ! reflect.Pointer {if logErr {log.Printf(rpc.Register: reply type of method %q is not a pointer: %q\n, mname, replyType)}continue}// Reply type must be exported.// 第二个参数类型必须是导出的if !isExportedOrBuiltinType(replyType) {if logErr {log.Printf(rpc.Register: reply type of method %q is not exported: %q\n, mname, replyType)}continue}// 方法必须只有一个返回值同时返回值类型必须是error类型// Method needs one out.if mtype.NumOut() ! 1 {if logErr {log.Printf(rpc.Register: method %q has %d output parameters; needs exactly one\n, mname, mtype.NumOut())}continue}// The return type of the method must be error.if returnType : mtype.Out(0); returnType ! typeOfError {if logErr {log.Printf(rpc.Register: return type of method %q is %q, must be error\n, mname, returnType)}continue}// 构造方法类型信息: 方法元数据本身方法第一个入参类型方法第二个入参类型methods[mname] methodType{method: method, ArgType: argType, ReplyType: replyType}}return methods
}请求处理
本文一开始给出的Demo是借助 Http Server 来 Accept 用户连接当接收到用户连接后会通过Hijack获取到原生连接然后后续该连接上的客户端读写事件都采用gob编码进行通信而非http协议了:
func (server *Server) ServeConn(conn io.ReadWriteCloser) {buf : bufio.NewWriter(conn)// 构建gob编码器srv : gobServerCodec{rwc: conn,dec: gob.NewDecoder(conn),enc: gob.NewEncoder(buf),encBuf: buf,}// 使用gob编码器从连接到读取字节流然后按照golang RPC协议执行反序列化server.ServeCodec(srv)
}ServeCodec 函数会按照gob编码反序列化得到RPC请求头和请求数据然后调用目标最终将结果按gob编码执行序列化写会connection中:
func (server *Server) ServeCodec(codec ServerCodec) {sending : new(sync.Mutex)wg : new(sync.WaitGroup)for {// 解析得到请求数据service, mtype, req, argv, replyv, keepReading, err : server.readRequest(codec)if err ! nil {if debugLog err ! io.EOF {log.Println(rpc:, err)}// 读取完所有请求后,退出循环if !keepReading {break}// send a response if we actually managed to read a header.if req ! nil {server.sendResponse(sending, req, invalidRequest, codec, err.Error())server.freeRequest(req)}continue}wg.Add(1)// 处理请求调用go service.call(server, sending, wg, mtype, req, argv, replyv, codec)}// Weve seen that there are no more requests.// Wait for responses to be sent before closing codec.// 等待所有响应被处理完毕wg.Wait()codec.Close()
}golang rpc 调用发出的请求数据由两部分组成首先是请求头其次是RPC函数入参数的第一个对象同样也是按照这个顺序依次执行反序列化读取:
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {// 解析请求头service, mtype, req, keepReading, err server.readRequestHeader(codec)if err ! nil {if !keepReading {return}// discard bodycodec.ReadRequestBody(nil)return}// Decode the argument value.argIsValue : false // 如果rpc方法的第一个参数(请求参数)类型是指针,则解引用拿到原始类型// 然后以原始类型分配一块新的内存,返回指向该内存的指针if mtype.ArgType.Kind() reflect.Pointer {argv reflect.New(mtype.ArgType.Elem())} else {argv reflect.New(mtype.ArgType)argIsValue true}// 反序列化得到请求参数的具体值设置到argv指向到的零值结构体中if err codec.ReadRequestBody(argv.Interface()); err ! nil {return}// 如果目标RPC方法的请求入参是值类型则进行解引用if argIsValue {argv argv.Elem()}// 为第二个参数(返回值参数)同样初始化零值replyv reflect.New(mtype.ReplyType.Elem())// 如果返回值参数类型为Map或者Slice则初始化空map或切片switch mtype.ReplyType.Elem().Kind() {case reflect.Map:replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))case reflect.Slice:replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))}return
}golang rpc 请求头由调用方法信息和请求序列号组成 反序列化后可以拿到服务名和方法名根据方法名去server的服务映射集合中定位具体的方法元数据对象:
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {// 从请求对象池中获取一个空闲的请求对象req server.getRequest()// 采用gob编码器将请求头部分字节流反序列化为req对象类型err codec.ReadRequestHeader(req)if err ! nil {req nil// 字节流读完了if err io.EOF || err io.ErrUnexpectedEOF {return}err errors.New(rpc: server cannot decode request: err.Error())return}// We read the header successfully. If we see an error now,// we can still recover and move on to the next request.keepReading true// 分割得到服务名和客户端想要调用的方法名 dot : strings.LastIndex(req.ServiceMethod, .)if dot 0 {err errors.New(rpc: service/method request ill-formed: req.ServiceMethod)return}serviceName : req.ServiceMethod[:dot]methodName : req.ServiceMethod[dot1:]// Look up the request.// 根据服务名加载对应的服务信息类svci, ok : server.serviceMap.Load(serviceName)if !ok {err errors.New(rpc: cant find service req.ServiceMethod)return}// 拿到服务信息类后根据方法名定位获取到对应的方法类型svc svci.(*service)mtype svc.method[methodName]if mtype nil {err errors.New(rpc: cant find method req.ServiceMethod)}return
}反序列化拿到请求数据后便可以查询服务映射集合拿到对应的方法信息最后我们便可以借助反射完成方法调用了:
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {if wg ! nil {defer wg.Done()}mtype.Lock()// 当前方法调用次数加一mtype.numCallsmtype.Unlock()// 拿到方法句柄function : mtype.method.Func// 传入方法实际调用者即服务实现类方法的第一个和第二个请求参数returnValues : function.Call([]reflect.Value{s.rcvr, argv, replyv})// 方法执行完毕后拿到方法返回值 -- 代表errorerrInter : returnValues[0].Interface()errmsg : if errInter ! nil {errmsg errInter.(error).Error()}// 发送响应给客户端server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)// 释放当前请求对象到对象池中server.freeRequest(req)
}本地方法执行完毕后需要组装响应对象然后将响应对象执行gob编码然后发送到连接中:
var invalidRequest struct{}{}func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply any, codec ServerCodec, errmsg string) {// 从响应池中获取到空闲的响应对象resp : server.getResponse()// Encode the response header// 组装响应对象resp.ServiceMethod req.ServiceMethodif errmsg ! {resp.Error errmsgreply invalidRequest}resp.Seq req.Seq// 将响应对象执行gob编码然后发送到conn中sending.Lock()err : codec.WriteResponse(resp, reply)if debugLog err ! nil {log.Println(rpc: writing response:, err)}sending.Unlock()// 将响应对象返回到对象池中server.freeResponse(resp)
}客户端
数据结构
首先是代表客户端对象的Client结构:
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {codec ClientCodec // 请求数据编解码器默认是gob协议reqMutex sync.Mutex // protects followingrequest Request // 此处请求对象结构复用了/rpc/server包下的请求对象结构mutex sync.Mutex // protects followingseq uint64. // 请求序列号pending map[uint64]*Call // 已经发出但还未回复的rpc调用closing bool // user has called Closeshutdown bool // server has told us to stop
}Call 结构体承载了RPC远程调用的上下文信息
// Call represents an active RPC.
type Call struct {ServiceMethod string // The name of the service and method to call.Args any // The argument to the function (*struct).Reply any // The reply from the function (*struct).Error error // After completion, the error status.Done chan *Call // Receives *Call when Go is complete.
}建立连接
当服务端采用HTTP协议来接收客户端连接时客户端就必须通过调用DialHttp来与服务端建立连接:
func DialHTTP(network, address string) (*Client, error) {// 使用默认的RPC建立连接的请求路径: /_goRPC_ return DialHTTPPath(network, address, DefaultRPCPath)
}func DialHTTPPath(network, address, path string) (*Client, error) {// 建立TCP连接conn, err : net.Dial(network, address)if err ! nil {return nil, err}// 发出connect请求io.WriteString(conn, CONNECT path HTTP/1.0\n\n)// Require successful HTTP response// before switching to RPC protocol.// 再转换为采用RPC协议通信时需要确保此处的响应是成功的resp, err : http.ReadResponse(bufio.NewReader(conn), http.Request{Method: CONNECT})if err nil resp.Status connected {return NewClient(conn), nil}if err nil {err errors.New(unexpected HTTP response: resp.Status)}conn.Close()return nil, net.OpError{Op: dial-http,Net: network address,Addr: nil,Err: err,}
}当成功连接服务端时会创建一个新的客户端对象并返回:
func NewClient(conn io.ReadWriteCloser) *Client {encBuf : bufio.NewWriter(conn)// client端默认采用gob编码client : gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}return NewClientWithCodec(client)
}但是在一个新的客户端初始化时会启动一个永不停歇的协程来不断接收并处理来自服务端的响应数据:
func NewClientWithCodec(codec ClientCodec) *Client {client : Client{codec: codec,pending: make(map[uint64]*Call),}// 启动一个永不停歇的协程来不断接收并处理来自服务端的响应数据go client.input()return client
}input 协程采用死循环来不断读取服务端响应并进行处理:
func (client *Client) input() {var err errorvar response Response // rpc/server包下的Response对象// 死循环来不断接收服务端响应,直到解析请求体的过程中出现错误才会退出循环for err nil {response Response{} // 读取响应头err client.codec.ReadResponseHeader(response)if err ! nil {break}// 拿到响应序列号得知该响应是对客户端发出的哪个请求的响应seq : response.Seqclient.mutex.Lock()// 从pending集合中定位对应的call对象call : client.pending[seq]// 从集合中移除该对象delete(client.pending, seq)client.mutex.Unlock()switch {// 如果pending集合中不存在call对象说明可能是重复响应说明存在错误case call nil:// Weve got no pending call. That usually means that// WriteRequest partially failed, and call was already// removed; response is a server telling us about an// error reading request body. We should still attempt// to read error body, but theres no one to give it to.err client.codec.ReadResponseBody(nil)if err ! nil {err errors.New(reading error body: err.Error())}// 响应头中错误信息不为空 case response.Error ! :// Weve got an error response. Give this to the request;// any subsequent requests will get the ReadResponseBody// error if there is one.call.Error ServerError(response.Error)err client.codec.ReadResponseBody(nil)if err ! nil {err errors.New(reading error body: err.Error())}// 通知本次请求结束call.done()// 正常响应 default:// 读取响应结果err client.codec.ReadResponseBody(call.Reply)// 存在错误则记录if err ! nil {call.Error errors.New(reading body err.Error())}// 通知本次请求处理结束call.done()}}// 如果解析请求体的过程中出现错误则退出上面的循环 // Terminate pending calls.client.reqMutex.Lock()client.mutex.Lock()client.shutdown trueclosing : client.closingif err io.EOF {if closing {err ErrShutdown} else {err io.ErrUnexpectedEOF}}// 终止所有已发送还未接收到响应的请求for _, call : range client.pending {call.Error errcall.done()}client.mutex.Unlock()client.reqMutex.Unlock()if debugLog err ! io.EOF !closing {log.Println(rpc: client protocol error:, err)}
}请求调用
rpc client端通过调用Call方法来完成远程过程调用:
func (client *Client) Call(serviceMethod string, args any, reply any) error {// 同步阻塞直到请求响应接收到为止Done信号在input协程中被设置或者请求发送过程中出现错误时被设置call : -client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Donereturn call.Error
}func (client *Client) Go(serviceMethod string, args any, reply any, done chan *Call) *Call {// 构建请求调用对象call : new(Call)call.ServiceMethod serviceMethodcall.Args argscall.Reply replyif done nil {done make(chan *Call, 10) // buffered.} else {// If caller passes done ! nil, it must arrange that// done has enough buffer for the number of simultaneous// RPCs that will be using that channel. If the channel// is totally unbuffered, its best not to run at all.if cap(done) 0 {log.Panic(rpc: done channel is unbuffered)}}call.Done done// 发送请求client.send(call)return call
}实际请求发送会调用client的send方法完成
func (client *Client) send(call *Call) {client.reqMutex.Lock()defer client.reqMutex.Unlock()// Register this call.client.mutex.Lock()if client.shutdown || client.closing {client.mutex.Unlock()call.Error ErrShutdowncall.done()return}// 为当前请求设置请求序列号同时将当前请求调用添加进pending集合seq : client.seqclient.seqclient.pending[seq] callclient.mutex.Unlock()// Encode and send the request.// 构建请求对象client.request.Seq seqclient.request.ServiceMethod call.ServiceMethod// 发送请求 --- 此处发送完毕请求后就直接返回了不会等待响应结果err : client.codec.WriteRequest(client.request, call.Args)if err ! nil {client.mutex.Lock()call client.pending[seq]delete(client.pending, seq)client.mutex.Unlock()if call ! nil {call.Error errcall.done()}}
}延伸
异步调用
上文中举的例子客户端实际是同步调用模式首先WriteRequest发送请求方法是异步的但是Call方法会等待直到Done信号有值时才会返回。
改造为异步模式也很简单直接调用Go方法并在合适的时机调用监听Done通道是否有值即可:
func NewClient() *common.Response {client, err : rpc.DialHTTP(tcp, :1234)if err ! nil {log.Fatal(dialing: , err)}res : common.Response{}call : client.Go(helloService.Hello, common.Request{map[string]interface{}{client: val1,}, map[string]interface{}{data: hello world,},}, res, nil)ticker : time.NewTicker(time.Millisecond)defer ticker.Stop()select {case replyCall : -call.Done:if err : replyCall.Error; err ! nil {fmt.Println(rpc error:, err)} else {fmt.Printf(res %v, replyCall)}case t : -ticker.C:fmt.Println(Current time: , t)}return res
}定制服务名
默认情况下rpc.Register()将方法接收者receiver的类型名作为服务名。我们也可以自己设置。这时需要调用RegisterName(name string, rcvr interface{}) error方法我们一开始给出的例子就是采用了后者忘记的可以回看源码。 采用TPC协议建立连接
上面我们都是使用 HTTP 协议来实现 rpc 服务的rpc库也支持直接使用 TCP 协议。首先服务端先调用net.Listen(tcp, :1234)创建一个监听某个 TCP 端口的监听器Accepter然后使用rpc.Accept(l)在此监听器上接受连接并处理
type HelloServiceImpl intfunc NewServer() {helloImpl : new(HelloServiceImpl)l, err : net.Listen(tcp, :1236)if err ! nil {return}rpc.Register(helloImpl)rpc.Accept(l)
}func (s *HelloServiceImpl) Hello(request *common.Request, response *common.Response) error {response.Header request.Headerresponse.Params map[string]interface{}{data: Hello World,}return nil
}此处就相当于建立连接的时候就不采用http的connect请求方式了只要TCP连接建立成功就认为RPC连接建立成功:
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }func (server *Server) Accept(lis net.Listener) {for {conn, err : lis.Accept()if err ! nil {log.Print(rpc.Serve: accept:, err.Error())return}go server.ServeConn(conn)}
}然后客户端调用rpc.Dial()以 TCP 协议连接到服务端
func NewClient() *common.Response {client, err : rpc.Dial(tcp, :1236)if err ! nil {log.Fatal(dialing: , err)}res : common.Response{}call : client.Go(helloService.Hello, common.Request{map[string]interface{}{client: val1,}, map[string]interface{}{data: hello world,},}, res, nil)ticker : time.NewTicker(time.Millisecond)defer ticker.Stop()select {case replyCall : -call.Done:if err : replyCall.Error; err ! nil {fmt.Println(rpc error:, err)} else {fmt.Printf(res %v, replyCall)}case t : -ticker.C:fmt.Println(Current time: , t)}return res
}相比于基于Http协议建立连接的方式此处就直接建立TCP连接就完事了而无需再发送Connect请求:
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {conn, err : net.Dial(network, address)if err ! nil {return nil, err}return NewClient(conn), nil
}自定义编码格式
默认客户端与服务端之间的数据使用gob编码我们可以使用其它的格式来编码。在服务端我们要实现rpc.ServerCodec接口
// src/net/rpc/server.go
type ServerCodec interface {ReadRequestHeader(*Request) errorReadRequestBody(interface{}) errorWriteResponse(*Response, interface{}) errorClose() error
}实际上不用这么麻烦我们查看源码看看gobServerCodec是怎么实现的然后仿造实现一个就行了。下面我实现了一个 JSON 格式的编解码器
type JsonServerCodec struct {rwc io.ReadWriteCloserdec *json.Decoderenc *json.EncoderencBuf *bufio.Writerclosed bool
}func NewJsonServerCodec(conn io.ReadWriteCloser) *JsonServerCodec {buf : bufio.NewWriter(conn)return JsonServerCodec{conn, json.NewDecoder(conn), json.NewEncoder(buf), buf, false}
}func (c *JsonServerCodec) ReadRequestHeader(r *rpc.Request) error {return c.dec.Decode(r)
}func (c *JsonServerCodec) ReadRequestBody(body interface{}) error {return c.dec.Decode(body)
}func (c *JsonServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {if err c.enc.Encode(r); err ! nil {if c.encBuf.Flush() nil {log.Println(rpc: json error encoding response:, err)c.Close()}return}if err c.enc.Encode(body); err ! nil {if c.encBuf.Flush() nil {log.Println(rpc: json error encoding body:, err)c.Close()}return}return c.encBuf.Flush()
}func (c *JsonServerCodec) Close() error {if c.closed {return nil}c.closed truereturn c.rwc.Close()
}server端的for循环中需要创建编解码器JsonServerCodec传给ServeCodec方法:
func NewServer() {helloImpl : new(HelloServiceImpl)l, err : net.Listen(tcp, :1236)if err ! nil {return}rpc.Register(helloImpl)for {conn, err : l.Accept()if err ! nil {return}go rpc.ServeCodec(common.NewJsonServerCodec(conn))}
}同样的客户端要实现rpc.ClientCodec接口也是仿造gobClientCodec的实现
type JsonClientCodec struct {rwc io.ReadWriteCloserdec *json.Decoderenc *json.EncoderencBuf *bufio.Writer
}func NewJsonClientCodec(conn io.ReadWriteCloser) *JsonClientCodec {encBuf : bufio.NewWriter(conn)return JsonClientCodec{conn, json.NewDecoder(conn), json.NewEncoder(encBuf), encBuf}
}func (c *JsonClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) {if err c.enc.Encode(r); err ! nil {return}if err c.enc.Encode(body); err ! nil {return}return c.encBuf.Flush()
}func (c *JsonClientCodec) ReadResponseHeader(r *rpc.Response) error {return c.dec.Decode(r)
}func (c *JsonClientCodec) ReadResponseBody(body interface{}) error {return c.dec.Decode(body)
}func (c *JsonClientCodec) Close() error {return c.rwc.Close()
}要使用NewClientWithCodec以指定的编解码器创建客户端:
func NewClient() *common.Response {conn, err : net.Dial(tcp, :1234)if err ! nil {return nil}client : rpc.NewClientWithCodec(common.NewJsonClientCodec(conn))res : common.Response{}err client.Call(helloService.Hello, common.Request{map[string]interface{}{client: val1,}, map[string]interface{}{data: hello world,},}, res)return res
}自定义服务器
实际上上面我们调用的方法rpc.Registerrpc.RegisterNamerpc.ServeConnrpc.ServeCodec都是转而去调用默认DefaultServer的相关方法
// src/net/rpc/server.go
var DefaultServer NewServer()func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }func RegisterName(name string, rcvr interface{}) error {return DefaultServer.RegisterName(name, rcvr)
}func ServeConn(conn io.ReadWriteCloser) {DefaultServer.ServeConn(conn)
}func ServeCodec(codec ServerCodec) {DefaultServer.ServeCodec(codec)
}但是因为DefaultServer是全局共享的如果有第三方库使用了相关方法并且注册了一些对象的方法我们引用这个第三方库之后就出现两个问题。第一可能与我们注册的方法冲突第二带来额外的安全隐患库中方法直接panic。故而推荐做法是自己NewServer
func main() {arith : new(Arith)server : rpc.NewServer()server.RegisterName(math, arith)server.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath)if err : http.ListenAndServe(:1234, nil); err ! nil {log.Fatal(serve error:, err)}
}这其实是一个套路很多库会提供一个默认的实现直接使用如log、net/http这些库。但是也提供了创建和自定义的方法。一般测试时为了方便可以使用默认实现实践中最好自己创建相应的对象避免干扰和安全问题。 参考
延伸部分主要摘录至: Go 每日一库之 rpc