音乐网站禁止做浅度链接,广西住房和城乡建设厅培训中心网站,华为网站建设目标,网络系统运维1.从实现服务端开始
服务端中肯定会有进行监听的。这里先创建一个空的结构体Server。
其Accept方法是进行监听#xff0c;并与客户端进行连接后, 开启新协程异步去处理ServeConn。
//server.go文件
type Server struct{}func NewServer() *Server {return Server{}
}v…1.从实现服务端开始
服务端中肯定会有进行监听的。这里先创建一个空的结构体Server。
其Accept方法是进行监听并与客户端进行连接后, 开启新协程异步去处理ServeConn。
//server.go文件
type Server struct{}func NewServer() *Server {return Server{}
}var DefaultServer NewServer()func (server *Server) Accept(lis net.Listener) {for {conn, err : lis.Accept()if err ! nil {log.Println(rpc server: accept error:, err)return}// 拿到客户端的连接, 开启新协程异步去处理.go server.ServeConn(conn)}
}func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
如果想启动服务过程是非常简单的传入 listener 即可tcp 协议和 unix 协议都支持。
lis, _ : net.Listen(tcp, localhost:10000)
geerpc.Accept(lis)
监听请求之后就是处理信息。那处理消息中要先了解信息的格式以及信息的编解码方式。
2.消息的格式和编解码方式
消息格式
对于消息(request和response)的定义 我们可以简单定为消息头(header)内容体(body)。
我们定义头部结构体。
//codec.go文件
type Header struct {ServiceMethod string // format Service.MethodSeq uint64 // sequence number chosen by clientError string
}
ServiceMethod 是服务名和方法名通常与 Go 语言中的结构体和方法相映射。Seq 是请求的序号也可以认为是某个请求的 ID用来区分不同的请求。Error 是错误信息客户端置为空服务端如果如果发生错误将错误信息置于 Error 中。
消息的编解码方式
编解码方式就有很多种Go中默认使用的是gob。(gob编码方式为go语言专用的编码方式, 无法跨语言使用。而要是使用json来进行编解码呢所以我们抽象出对消息体进行编解码的接口 Codec抽象出接口是为了实现不同的 Codec 实例来编解码。
编解码Codec实例中就应该有读取头部读取body写回复这三个方法。
//codec.go文件
type Code interface {ReadHeader(*Header) errorReadBody(any) errorWriteResponse(*Header, any) errorClose() error //关闭连接
}
实现gob编码方式
定义GobCodec 结构体一个编解码器需要有的结构
读取/写入的io流此处为socket连接编码器将要编码的数据写入缓冲区等待推送给要写入的socket连接解码器将socket连接中的数据读取到指定的对象中缓冲区编码器需要
//gob.go文件
type GobCodec struct {conn io.ReadWriteCloserbuf *bufio.Writerdec *gob.Decoderenc *gob.Encoder
}//使用与客户端的socket连接初始化编解码器。
//dec: gob.NewDecoder(conn)使得解码时从连接中获取数据
//enc: gob.NewEncoder(buf)编码器需要缓冲区且该缓冲区的底层io流应该为与客户端的连接。
func NewGobCodec(conn io.ReadWriteCloser) Codec {buf : bufio.NewWriter(conn)return GobCodec{conn: conn,buf: buf,dec: gob.NewDecoder(conn),enc: gob.NewEncoder(buf),}
} 接着实现 Close方法和接口的ReadHeader、ReadBody、WriteResponse方法。
ReadHeader和WriteResponse分别是用该实例的解码器进行解码编码器编码回复。
//gob.go文件
func (c *GobCodec) ReadHeader(h *Header) error {return c.dec.Decode(h)
}func (c *GobCodec) ReadBody(body any) error {return c.dec.Decode(body)
}func (c *GobCodec) WriteResponse(h *Header, body any) (err error) {defer func() {c.buf.Flush()if err ! nil {c.Close()}}()if err : c.enc.Encode(h); err ! nil {log.Println(rpc codec: gob error encoding header:, err)return err}if err : c.enc.Encode(body); err ! nil {log.Println(rpc codec: gob error encoding body:, err)return err}return nil
}func (c *GobCodec) Close() error {return c.conn.Close()
}
那接着我们定义字符串来表示哪个是使用gob编解码的。
//codec.go文件
type CodeType stringconst (GobType CodeType application/gobJsonType CodeType application/json // not implemented
)type NewCodecFunc func(io.ReadWriteCloser) Codecvar NewCodeFuncMap map[CodeType]NewCodecFuncfunc init() {NewCodeFuncMap make(map[CodeType]NewCodecFunc)NewCodeFuncMap[GobType] NewGobCodec
} 对外提供NewCodeFuncMap 客户端和服务端可以通过 Codec 的 CodeType得到构造函数从而创建 Codec 实例
判断客户端使用的编解码方式
实现了GobCodec后那目前对于该rpc需要协商的唯一一项内容是消息的编解码方式。我们将这部分信息放到结构体 Option 中承载。
//server.go文件
const MagicNumber 0x3b3f5ctype Option struct {MagicNumber int // MagicNumber marks thiss a geerpc requestCodecType codec.Type // client may choose different Codec to encode body
}var DefaultOption Option{MagicNumber: MagicNumber,CodecType: codec.GobType,
}
一般来说涉及协议协商的这部分信息需要设计固定的字节来传输的。但是为了实现上简单点GeeRPC 客户端固定采用 JSON 编码 Option后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定服务端首先使用 JSON 解码 Option然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| ------ 固定 JSON 编码 ------ | ------- 编码方式由 CodeType 决定 -------| 但要注意的是在一次连接中Option 固定在报文的最开始位置Header 和 Body 可以有多个即报文可能是这样的。
| Option | Header1 | Body1 | Header2 | Body2 | ...
3.服务端Accpet后的处理
定义好消息格式和编解码方式后就回到Server的Accpet方法中。其开启新协程去处理客户的请求ServeConn方法。
那该方法中肯定就需要先通过客户发送的信息获取到编解码方式即是使用 json.NewDecoder 反序列化得到 Option 实例检查 MagicNumber 和 CodeType 的值是否正确。然后根据 CodeType 得到对应的消息编解码器。
//server.go文件
func (server *Server) ServeConn(conn io.ReadWriteCloser) {defer conn.Close()var opt Optionif err : json.NewDecoder(conn).Decode(opt); err ! nil {log.Println(rpc server: options error: , err)return}if opt.MagicNumber ! MagicNumber {log.Printf(rpc server: invalid magic number %x, opt.MagicNumber)return}//目前只实现了gob编解码f : codec.NewCodeFuncMap[opt.CodecType]if f nil {log.Printf(rpc server: invalid codec type %s, opt.CodecType)return}server.servCode(f(conn))
}
servCode方法
通过server.ServeCodec方法处理请求主要分为解析请求信息(readRequest)和处理请求(handleRequest)两步。
//server.go文件
func (server *Server) servCode(cc codec.Codec) {sending : new(sync.Mutex)wg : new(sync.WaitGroup)for {req, err : server.readRequest(cc) //读取请求if err ! nil {if req nil {break}req.h.Error err.Error()//发送解析请求信息出错的响应信息 invalidRequest struct{}{}server.sendResponse(cc, req.h, invalidRequest, sending)continue}wg.Add(1)go server.handleRequest(cc, req, sending, wg)//处理请求}wg.Wait()cc.Close()
}
建立的连接是长连接轮询读取连接中的数据并异步处理。因此这里使用了 for 无限制地等待请求的到来直到发生错误例如连接被关闭接收到的报文有问题等这里需要讲解几点
handleRequest 使用了协程并发处理请求。wg : new(sync.WaitGroup)用于不再接收请求时等待正在执行的请求处理完成后再关闭连接,即是等server.handleRequest都执行完再退出。处理请求是并发的但是回复请求的报文必须是逐个发送的并发容易导致多个回复报文交织在一起客户端无法解析。对于同一个连接的不同请求响应的发送是异步的所以需要互斥锁来避免对同一个连接的写入冲突sending : new(sync.Mutex)尽力而为只有在 header 解析失败时才终止循环。 可能会有疑惑sending,wg变量为什么都是new出来的不能用栈空间变量吗首先sync.Mutex,sync.WaitGroup都是值类型互斥锁是不允许copy的值传递的话就会进行拷贝会出错。WaitGroup也相似其也是值类型,要是不使用指针传递的话那函数内部调用的wg根本就不是同一个对象会导致死锁的。
sync.Mutex和sync.WaitGroup作为函数参数要想其是同一个对象那就都需要传递其指针才不会进行拷贝。
这里再抽象出一个结构体request,其存储请求的所有信息 readRequest
其主要是先解码header再解码body。
readRequest方法中的cc.ReadBody(req.requestData)其参数是指针是因为gob解码需要是指针。可以试试使用req.requestData去测试运行时有什么错误。
//server.go文件
type request struct {h *codec.Header// argv, replyv reflect.ValuerequestData uint64 //请求的body数据replyData string //返回给用户的data
}func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {var h codec.Headerif err : cc.ReadHeader(h); err ! nil {if err ! io.EOF err ! io.ErrUnexpectedEOF {log.Println(rpc server: read header error:, err)}return nil, err}return h, nil
}
func (server *Server) readRequest(cc codec.Codec) (*request, error) {h, err : server.readRequestHeader(cc)req : request{h: h}// TODO: now we dont know the type of request argv//这一章节我们只能处理用户发送过来的uint64类型的数据cc.ReadBody(req.requestData)return req, nil
}handleRequest和sendResponse
sendResponse先加锁之后调用编解码器的WriteResponse方法。
handleRequest就是处理信息跟着调用sendResponse方法发送信息给客户端。
//server.go文件
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body any, sending *sync.Mutex) {sending.Lock()defer sending.Unlock()if err : cc.WriteResponse(h, body); err ! nil {log.Println(rpc server: write response error:, err)}
}func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {defer wg.Done()log.Println(handleRequest , req.h, req.requestData)// req.replyv reflect.ValueOf(fmt.Sprintf(geerpc resp %d, req.h.Seq))// server.sendResponse(cc, req.h, req.replyv.Interface(), sending)req.replyData fmt.Sprintf( ok my resp %d, req.h.Seq)server.sendResponse(cc, req.h, req.replyData, sending)
}
目前是还不能判断 body 的类型后序会实现的。
这要说回结构体request中的一些变量原教程的是request中是使用了反射类型变量的但我感觉这一节使用反射可能读起来会有点疑惑难理解。所以这里就使用具体的类型来处理在这一节会好理解点后序会再添加上反射reflect。
4.简单的客户端测试
在这里我们就实现了一个消息的编解码器 GobCodec并且客户端与服务端实现了简单的协议交换(protocol exchange)即允许客户端使用不同的编码方式。同时实现了服务端的雏形建立连接读取、处理并回复客户端的请求。
接下来我们就在 main 函数中看看如何使用刚实现的 RPC。
func startServer(addr chan string) {l, err : net.Listen(tcp, localhost:10000)if err ! nil {log.Fatal(network error:, err)}log.Println(start rpc server on, l.Addr())addr - l.Addr().String()geerpc.Accept(l)
}func main() {addr : make(chan string)go startServer(addr)// in fact, following code is like a simple geerpc clientconn, _ : net.Dial(tcp, -addr)defer conn.Close()// send options_ json.NewEncoder(conn).Encode(geerpc.DefaultOption)cc : codec.NewGobCodec(conn)// send request receive responsefor i : 0; i 3; i {h : codec.Header{ServiceMethod: Foo.Sum,Seq: uint64(i),}cc.WriteResponse(h, h.Seq)cc.ReadHeader(h)var reply stringcc.ReadBody(reply)log.Println(reply:, reply)}
}
在 startServer 中使用了信道 addr确保服务端端口监听成功客户端再发起请求。客户端首先发送 Option 进行协议交换接下来发送消息头 h : codec.Header{}和消息体 geerpc req ${h.Seq}。最后解析服务端的响应 reply并打印出来。 完整代码 https://github.com/liwook/Go-projects/tree/main/geerpc/1-codec