seo关键词排名优化如何,龙岗网站建设网站排名优化,主体负责人和网站负责人,电子商务网站建设的知识点从0到1开发go-tcp框架【3-读写协程分离、引入消息队列、进入连接管理器、引入连接属性】
1 读写协程分离[v0.7] 添加一个Reader和Writer之间通信的channel添加一个Writer goroutineReader由之前直接发送给客户端改为发送给通信channel启动Reader和Writer一起工作 zinx/znet/co…从0到1开发go-tcp框架【3-读写协程分离、引入消息队列、进入连接管理器、引入连接属性】
1 读写协程分离[v0.7] 添加一个Reader和Writer之间通信的channel添加一个Writer goroutineReader由之前直接发送给客户端改为发送给通信channel启动Reader和Writer一起工作 zinx/znet/connection.go
package znetimport (fmtgithub.com/kataras/iris/v12/x/errorsionet
)type Connection struct {Conn *net.TCPConnConnID uint32isClosed boolmsgChannel chan []byte//告知当前的连接已经退出/停止由Reader告知writer退出ExitChan chan boolMsgHandler *MsgHandle
}func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c : Connection{Conn: conn,ConnID: connID,MsgHandler: msgHandle,isClosed: false,msgChannel: make(chan []byte),ExitChan: make(chan bool, 1),}return c
}func (c *Connection) StartWriter() {fmt.Println([Writer Goroutine is running])defer fmt.Println([conn Writer goroutine exit!], c.RemoteAddr().String())//不断的阻塞等待channel的消息然后将channel中的消息写给客户端for {select {case data : -c.msgChannel://有数据写给客户端if _, err : c.Conn.Write(data); err ! nil {fmt.Println(Send data error , , err)return}case -c.ExitChan://代表reader已经退出此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println(reader goroutine is running...)defer fmt.Println([Reader goroutine is exit] connID, c.ConnID, remote addr is , c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp : NewDataPack()//读取客户端的msg Head 二进制流 8字节headData : make([]byte, dp.GetHeadLen())if _, err : io.ReadFull(c.GetTCPConnection(), headData); err ! nil {fmt.Println(read msg head err , err)break}//拆包将读取到的headData封装为msgmsg, err : dp.UnPack(headData)if err ! nil {fmt.Println(unpack msg err , err)break}//根据dataLen再次读取Data放在msg.Data中var data []byte//如果数据包中有数据则读取if msg.GetMsgLen() 0 {data make([]byte, msg.GetMsgLen())//将切片data读满if _, err : io.ReadFull(c.GetTCPConnection(), data); err ! nil {fmt.Println(read msg data err , err)break}}msg.SetData(data)//封装请求改为router处理r : Request{conn: c,msg: msg,}go c.MsgHandler.DoMsgHandler(r)}
}//启动连接
func (c *Connection) Start() {fmt.Printf(ConnID %d is Start..., c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()
}//停止连接
func (c *Connection) Stop() {fmt.Println(Connection Stop()...ConnectionID , c.ConnID)if c.isClosed {return}c.isClosed truec.Conn.Close()c.ExitChan - trueclose(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IPPort
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New(connection closed\n)}//将data进行封包dp : NewDataPack()binaryMsg, err : dp.Pack(NewMessage(msgId, data))if err ! nil {fmt.Println(Pack error msg id, msgId)return errors.New(pack error msg)}//将数据发送给客户端if _, err : c.Conn.Write(binaryMsg); err ! nil {fmt.Println(write msg id , msgId, error , err)return errors.New(conn write err )}return nil
}测试
myDemo/ZinxV0.7/client.go
client0.go
package mainimport (fmtiomyTest/zinx/znetnettime
)/*
模拟客户端
*/
func main() {fmt.Println(client start...)time.Sleep(time.Second * 1)//1 创建服务器连接conn, err : net.Dial(tcp, 127.0.0.1:8092)if err ! nil {fmt.Println(client start err , err)return}for {//发送封装后的数据包dp : znet.NewDataPack()binaryMsg, err : dp.Pack(znet.NewMessage(0, []byte(Zinx client0 test msg)))if err ! nil {fmt.Println(client pack msg err , err)return}if _, err : conn.Write(binaryMsg); err ! nil {fmt.Println(client write err , err)return}//服务器应该给我们回复一个message数据msgId为1内容为ping...ping...//1 先读取流中的head部分得到Id和dataLenbinaryHead : make([]byte, dp.GetHeadLen())if _, err : io.ReadFull(conn, binaryHead); err ! nil {fmt.Println(client read head err , err)break}//将二进制的head拆包到msg中msgHead, err : dp.UnPack(binaryHead)if err ! nil {fmt.Println(client unpack msgHead err , err)break}if msgHead.GetMsgLen() 0 {//2 有数据, 再根据dataLen进行二次读取将data读出来msg : msgHead.(*znet.Message)msg.Data make([]byte, msg.GetMsgLen())if _, err : io.ReadFull(conn, msg.Data); err ! nil {fmt.Println(read msg data error , err)return}fmt.Println(-------- Receive Server msg , ID, msg.Id, ,len, msg.DataLen, ,data, string(msg.Data))}//cpu阻塞让出cpu时间片避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}client1.go myDemo/ZinxV0.7/server.go
package mainimport (fmtmyTest/zinx/zifacemyTest/zinx/znet
)//自定义一个Router测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println(call router handler...)//先读取客户端数据再回写ping...ping...ping...fmt.Println(receive from client msgId, request.GetMsgID(),data, string(request.GetData()))//回写pingerr : request.GetConnection().SendMsg(0, []byte(ping...ping...ping...))if err ! nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println(receive from client msgId, request.GetMsgID(),data, string(request.GetData()))err : request.GetConnection().SendMsg(1, []byte(hello zinx, Im the other handler))if err ! nil {fmt.Println(err)}
}func main() {s : znet.NewServer([Zinx v0.7])//添加自定义路由PingRouter和HelloRouterrouter0 : PingRouter{}s.AddRouter(0, router0)router1 : HelloRouter{}s.AddRouter(1, router1)s.Serve()
}
结果
接受多个客户端也可以 当client0退出时不会影响client1
2 创建消息队列及多任务[v0.8] 创建一个消息队列,MsgHandler消息管理模块增加TaskQueue、WorkerPoolSize创还能多任务worker的工作池并且启动将之前发送的消息全部改为把消息发送给消息队列和worker工作池来处理 实现消息队列机制和工作池机制集成到自定义框架 创建一个消息队列MsgHandler消息管理模块创建多任务worker的工作池并启动将之前发送的消息全部改为把消息发送给消息队列和worker工作池来处理将消息队列机制集成到Zinx框架中 开启并调用消息队列及worker工作池将从客户端处理的消息发送给当前Worker的工作池来处理 zinx/znet/server.go
package znetimport (fmtmyTest/zinx/utilmyTest/zinx/zifacenet
)type Server struct {Name stringIPVersion stringIP stringPort intMsgHandler *MsgHandle
}func NewServer(name string) *Server {s : Server{Name: name,IPVersion: tcp4,IP: util.GlobalObject.Host,Port: util.GlobalObject.TcpPort,MsgHandler: NewMsgHandle(),}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf([Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n, s.Name, s.IP, s.Port)fmt.Printf([Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n, util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)var cid uint32 0go func() {//0 开启消息队列及Worker工作池s.MsgHandler.StartWorkerPool()addr, err : net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf(%s:%d, s.IP, s.Port))if err ! nil {fmt.Printf(resolve tcp addr error %v\n, err)return}listener, err : net.ListenTCP(s.IPVersion, addr)if err ! nil {fmt.Println(listen , s.IPVersion, err , err)return}fmt.Println([start] Zinx server success , s.Name, Listening...)//阻塞连接处理业务for {conn, err : listener.AcceptTCP()if err ! nil {fmt.Println(Accept err , err)continue}dealConn : NewConnection(conn, cid, s.MsgHandler)cid//开启goroutine处理启动当前conngo dealConn.Start()}}()
}func (s *Server) Stop() {}func (s *Server) Serve() {s.Start()//阻塞一直读取客户端所发送过来的消息select {}
}func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.MsgHandler.AddRouter(msgId, router)
}zinx/znet/connection.go
package znetimport (fmtgithub.com/kataras/iris/v12/x/errorsiomyTest/zinx/utilnet
)type Connection struct {Conn *net.TCPConnConnID uint32isClosed boolmsgChannel chan []byte//告知当前的连接已经退出/停止由Reader告知writer退出ExitChan chan boolMsgHandler *MsgHandle
}func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c : Connection{Conn: conn,ConnID: connID,MsgHandler: msgHandle,isClosed: false,msgChannel: make(chan []byte),ExitChan: make(chan bool, 1),}return c
}func (c *Connection) StartWriter() {fmt.Println([Writer Goroutine is running])defer fmt.Println([conn Writer goroutine exit!], c.RemoteAddr().String())//不断的阻塞等待channel的消息然后将channel中的消息写给客户端for {select {case data : -c.msgChannel://有数据写给客户端if _, err : c.Conn.Write(data); err ! nil {fmt.Println(Send data error , , err)return}case -c.ExitChan://代表reader已经退出此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println(reader goroutine is running...)defer fmt.Println([Reader goroutine is exit] connID, c.ConnID, remote addr is , c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp : NewDataPack()//读取客户端的msg Head 二进制流 8字节headData : make([]byte, dp.GetHeadLen())if _, err : io.ReadFull(c.GetTCPConnection(), headData); err ! nil {fmt.Println(read msg head err , err)break}//拆包将读取到的headData封装为msgmsg, err : dp.UnPack(headData)if err ! nil {fmt.Println(unpack msg err , err)break}//根据dataLen再次读取Data放在msg.Data中var data []byte//如果数据包中有数据则读取if msg.GetMsgLen() 0 {data make([]byte, msg.GetMsgLen())//将切片data读满if _, err : io.ReadFull(c.GetTCPConnection(), data); err ! nil {fmt.Println(read msg data err , err)break}}msg.SetData(data)//封装请求改为router处理r : Request{conn: c,msg: msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize 0 {c.MsgHandler.SendMsgToTaskQueue(r)} else {go c.MsgHandler.DoMsgHandler(r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf(ConnID %d is Start..., c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()
}//停止连接
func (c *Connection) Stop() {fmt.Println(Connection Stop()...ConnectionID , c.ConnID)if c.isClosed {return}c.isClosed truec.Conn.Close()c.ExitChan - trueclose(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IPPort
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New(connection closed\n)}//将data进行封包dp : NewDataPack()binaryMsg, err : dp.Pack(NewMessage(msgId, data))if err ! nil {fmt.Println(Pack error msg id, msgId)return errors.New(pack error msg)}//将数据发送给客户端if _, err : c.Conn.Write(binaryMsg); err ! nil {fmt.Println(write msg id , msgId, error , err)return errors.New(conn write err )}return nil
}zinx/znet/msgHandler.go
package znetimport (fmtmyTest/zinx/utilmyTest/zinx/zifacestrconv
)type MsgHandle struct {//msgId与对应的router对应Api map[uint32]ziface.IRouter//负责worker取任务的消息队列TaskQueue []chan ziface.IRequest//业务工作worker池的goroutine数量WorkerPoolSize uint32
}func NewMsgHandle() *MsgHandle {return MsgHandle{Api: make(map[uint32]ziface.IRouter),TaskQueue: make([]chan ziface.IRequest, util.GlobalObject.WorkerPoolSize),WorkerPoolSize: util.GlobalObject.WorkerPoolSize,}
}func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {//判断是否有对应的routerif _, ok : mh.Api[request.GetMsgID()]; !ok {fmt.Println(msgId , request.GetMsgID(), does not exist handler, need to add router)return}//call handlerrouter : mh.Api[request.GetMsgID()]router.PreHandle(request)router.Handler(request)router.PostHandler(request)
}func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {if _, ok : mh.Api[msgId]; ok {//如果已经存在了对应的router则提示panic(repeat api, msgId strconv.Itoa(int(msgId)))}mh.Api[msgId] routerfmt.Println(msgId , msgId, Add router success )
}//启动一个worker工作池开启工作池的动作只能发生一次一个zinx框架只能有一个worker工作池
func (mh *MsgHandle) StartWorkerPool() {for i : 0; i int(mh.WorkerPoolSize); i {//开辟任务队列mh.TaskQueue[i] make(chan ziface.IRequest, util.GlobalObject.MaxWorkerTaskLen)//启动workergo mh.startOneWorker(i, mh.TaskQueue[i])}
}func (mh *MsgHandle) startOneWorker(workerId int, taskQueue chan ziface.IRequest) {fmt.Println(Worker ID, workerId, is started...)for {select {//从任务队列中取消息如果有消息过来出列的就是request然后执行该request所绑定的业务case request : -taskQueue:mh.DoMsgHandler(request)}}
}//将消息交给taskQueue由Worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {//通过取余数的方式来达到负载均衡workID : request.GetConnection().GetConnectionID() % util.GlobalObject.WorkerPoolSizefmt.Println(Add ConnID, request.GetConnection().GetConnectionID(), requestID, request.GetMsgID(), workID, workID)//将消息发送给对应worker的任务队列mh.TaskQueue[workID] - request
}zinx/ziface/imsgHandler.go
package zifacetype IMsgHandler interface {DoMsgHandler(request IRequest)AddRouter(msgId uint32, router IRouter)StartWorkerPool()SendMsgToTaskQueue(request IRequest)
}测试
myDemo/ZinxV0.8/Server.go 同myDemo/ZinxV0.7/Server.go修改一下NewServer时候所传的Zinx的名称即可 myDemo/ZinxV0.8/Client.go 同myDemo/ZinxV0.7/Client.go myDemo/ZinxV0.8/zinx.json
{Name: Zinx Server Application,Version: V0.8,Host: 0.0.0.0,TcpPort: 8092,MaxConn: 30,MaxPackageSize: 1024,WorkerPoolSize: 10
}3 连接管理器connManager[v0.9]
3.1 连接管理器(conn)的定义与实现 创建一个连接管理模块ConnManager 添加连接删除连接根据连接ID查找对应的连接总连接个数清理全部的连接 3.2 将连接管理模块集成到Zinx框架中 给server添加一个ConnMgr属性修改NewServer方法加入ConnMgr初始化判断当前连接数是否超出最大值MaxConn当server停止的时候调用server.Stop方法应该加入ConnMgr.ClearConn() 3.3 提供创建连接/销毁连之前所需的Hook函数 给我们自定义框架Zinx提供创建连接之后/销毁连接之前所要处理的一些业务。提供给用户能够注册的Hook函数 添加OnConnStart()添加OnConnStop() zinx/ziface/iserver.go
package zifacetype IServer interface {Start()Stop()Serve()AddRouter(msgId uint32, router IRouter)GetConnMgr() IConnManager//注册创OnConnStart钩子函数SetOnConnStart(func(conn IConnection))SetOnConnStop(func(conn IConnection))//调用OnConnStart钩子函数CallOnConnStart(conn IConnection)CallOnConnStop(conn IConnection)
}zinx/ziface/iconnmanager.go
package zifacetype IConnManager interface {Add(conn IConnection)Remove(conn IConnection)Get(connID uint32) (IConnection, error)Len() intClearConn()
}zinx/znet/connmanager.go
package znetimport (fmtgithub.com/kataras/iris/v12/x/errorsmyTest/zinx/utilmyTest/zinx/zifacesync
)type ConnManager struct {connections map[uint32]ziface.IConnection //管理的连接集合connLock sync.RWMutex //保护连接集合的读写锁
}func NewConnManager() *ConnManager {return ConnManager{connections: make(map[uint32]ziface.IConnection, util.GlobalObject.MaxConn),}
}
func (cm *ConnManager) Add(conn ziface.IConnection) {//添加写锁cm.connLock.Lock()defer cm.connLock.Unlock()cm.connections[conn.GetConnectionID()] connfmt.Println(connectionID, conn.GetConnectionID(), add to ConnManager success, conn num, cm.Len())
}func (cm *ConnManager) Remove(conn ziface.IConnection) {//保护共享资源mapcm.connLock.Lock()defer cm.connLock.Unlock()delete(cm.connections, conn.GetConnectionID())fmt.Println(connectionID, conn.GetConnectionID(), remote from ConnManager success, conn num, cm.Len())
}func (cm *ConnManager) Get(connID uint32) (ziface.IConnection, error) {cm.connLock.RLock()defer cm.connLock.RUnlock()if conn, ok : cm.connections[connID]; ok {return conn, nil} else {return nil, errors.New(connection NOT FOUND)}
}func (cm *ConnManager) Len() int {return len(cm.connections)
}func (cm *ConnManager) ClearConn() {cm.connLock.Lock()defer cm.connLock.Unlock()for connID, conn : range cm.connections {//停止连接conn.Stop()//删除连接delete(cm.connections, connID)}fmt.Println(Clear All connections success! conn num, cm.Len())
}zinx/znet/connection.go
package znetimport (fmtgithub.com/kataras/iris/v12/x/errorsiomyTest/zinx/utilmyTest/zinx/zifacenet
)type Connection struct {Conn *net.TCPConnConnID uint32isClosed boolmsgChannel chan []byte//告知当前的连接已经退出/停止由Reader告知writer退出ExitChan chan boolMsgHandler *MsgHandleTcpServer ziface.IServer
}func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c : Connection{Conn: conn,ConnID: connID,MsgHandler: msgHandle,isClosed: false,msgChannel: make(chan []byte),ExitChan: make(chan bool, 1),TcpServer: server,}//将conn添加到connMgr中c.TcpServer.GetConnMgr().Add(c)return c
}func (c *Connection) StartWriter() {fmt.Println([Writer Goroutine is running])defer fmt.Println([conn Writer goroutine exit!], c.RemoteAddr().String())//不断的阻塞等待channel的消息然后将channel中的消息写给客户端for {select {case data : -c.msgChannel://有数据写给客户端if _, err : c.Conn.Write(data); err ! nil {fmt.Println(Send data error , , err)return}case -c.ExitChan://代表reader已经退出此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println(reader goroutine is running...)defer fmt.Println([Reader goroutine is exit] connID, c.ConnID, remote addr is , c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp : NewDataPack()//读取客户端的msg Head 二进制流 8字节headData : make([]byte, dp.GetHeadLen())if _, err : io.ReadFull(c.GetTCPConnection(), headData); err ! nil {fmt.Println(read msg head err , err)break}//拆包将读取到的headData封装为msgmsg, err : dp.UnPack(headData)if err ! nil {fmt.Println(unpack msg err , err)break}//根据dataLen再次读取Data放在msg.Data中var data []byte//如果数据包中有数据则读取if msg.GetMsgLen() 0 {data make([]byte, msg.GetMsgLen())//将切片data读满if _, err : io.ReadFull(c.GetTCPConnection(), data); err ! nil {fmt.Println(read msg data err , err)break}}msg.SetData(data)//封装请求改为router处理r : Request{conn: c,msg: msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize 0 {c.MsgHandler.SendMsgToTaskQueue(r)} else {go c.MsgHandler.DoMsgHandler(r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf(ConnID %d is Start..., c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()//执行钩子函数c.TcpServer.CallOnConnStart(c)
}//停止连接
func (c *Connection) Stop() {fmt.Println(Connection Stop()...ConnectionID , c.ConnID)if c.isClosed {return}c.isClosed true//连接关闭之前执行hook关闭的钩子函数c.TcpServer.CallOnConnStop(c)c.Conn.Close()c.ExitChan - true//连接conn关闭时需要从连接管理模块中移除c.TcpServer.GetConnMgr().Remove(c)close(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IPPort
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New(connection closed\n)}//将data进行封包dp : NewDataPack()binaryMsg, err : dp.Pack(NewMessage(msgId, data))if err ! nil {fmt.Println(Pack error msg id, msgId)return errors.New(pack error msg)}//将数据发送给客户端if _, err : c.Conn.Write(binaryMsg); err ! nil {fmt.Println(write msg id , msgId, error , err)return errors.New(conn write err )}return nil
}zinx/znet/server.go
package znetimport (fmtmyTest/zinx/utilmyTest/zinx/zifacenet
)type Server struct {Name stringIPVersion stringIP stringPort intMsgHandler *MsgHandleConnMgr *ConnManager//创建连接之前的Hook函数OnConnStart func(conn ziface.IConnection)OnConnStop func(conn ziface.IConnection)
}func NewServer(name string) *Server {s : Server{Name: name,IPVersion: tcp4,IP: util.GlobalObject.Host,Port: util.GlobalObject.TcpPort,MsgHandler: NewMsgHandle(),ConnMgr: NewConnManager(),}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf([Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n, s.Name, s.IP, s.Port)fmt.Printf([Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n, util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)var cid uint32 0go func() {//0 开启消息队列及Worker工作池s.MsgHandler.StartWorkerPool()addr, err : net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf(%s:%d, s.IP, s.Port))if err ! nil {fmt.Printf(resolve tcp addr error %v\n, err)return}listener, err : net.ListenTCP(s.IPVersion, addr)if err ! nil {fmt.Println(listen , s.IPVersion, err , err)return}fmt.Println([start] Zinx server success , s.Name, Listening...)//阻塞连接处理业务for {conn, err : listener.AcceptTCP()if err ! nil {fmt.Println(Accept err , err)continue}//判断当前连接数是否超过最大连接数如果超过则关闭新创建的连接if s.ConnMgr.Len() util.GlobalObject.MaxConn {//TODO 给客户端返回一个超出最大连接的错误包fmt.Println(-----------------》 Tcp Conn exceed, conn num, util.GlobalObject.MaxConn)conn.Close()//关闭当前连接等待下一次连接【如果当前连接数小于最大连接数】continue}dealConn : NewConnection(s, conn, cid, s.MsgHandler)cid//开启goroutine处理启动当前conngo dealConn.Start()}}()
}func (s *Server) Stop() {//释放相关资源fmt.Println([STOP] Zinx server name , s.Name)s.ConnMgr.ClearConn()
}func (s *Server) Serve() {s.Start()//阻塞一直读取客户端所发送过来的消息select {}
}func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr
}func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.MsgHandler.AddRouter(msgId, router)
}//注册创OnConnStart钩子函数
func (s *Server) SetOnConnStart(hookFunc func(conn ziface.IConnection)) {s.OnConnStart hookFunc
}func (s *Server) SetOnConnStop(hookFunc func(conn ziface.IConnection)) {s.OnConnStop hookFunc
}//调用OnConnStart钩子函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.OnConnStart ! nil {fmt.Println(--------- call OnConnStart())s.OnConnStart(conn)}
}func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.OnConnStop ! nil {fmt.Println(---------- call OnConnStop())s.OnConnStop(conn)}
}测试
myDemo/ZinxV0.9/Server.go
package mainimport (fmtmyTest/zinx/zifacemyTest/zinx/znet
)//自定义一个Router测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println(call router handler...)//先读取客户端数据再回写ping...ping...ping...fmt.Println(receive from client msgId, request.GetMsgID(),data, string(request.GetData()))//回写pingerr : request.GetConnection().SendMsg(0, []byte(ping...ping...ping...))if err ! nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println(receive from client msgId, request.GetMsgID(),data, string(request.GetData()))err : request.GetConnection().SendMsg(1, []byte(hello zinx, Im the other handler))if err ! nil {fmt.Println(err)}
}//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {fmt.Println(Do Conn Begin...)if err : conn.SendMsg(202, []byte(do connection begin...)); err ! nil {fmt.Println(err)}
}//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {fmt.Println(Do Conn Lost...)fmt.Println(connID, conn.GetConnectionID(), is Lost....)
}func main() {s : znet.NewServer([Zinx v0.9])//添加自定义路由PingRouter和HelloRouterrouter0 : PingRouter{}s.AddRouter(0, router0)router1 : HelloRouter{}s.AddRouter(1, router1)//注册hook钩子函数s.SetOnConnStart(DoConnBegin)s.SetOnConnStop(DoConnLost)s.Serve()
}测试代码中的myDemo/ZinxV0.9/Client.go和myDemo/ZinxV0.8/Client.go一样。 为了方便测试超过最大连接数的报错信息我们可以修改配置文件 //将最大连接数设置为2然后我们复制Client.go可以多起几个Client来进行测试
{Name: Zinx Server Application,Version: V0.9,Host: 0.0.0.0,TcpPort: 8092,MaxConn: 2,MaxPackageSize: 1024,WorkerPoolSize: 10
}测试最大连接数与连接管理 测试钩子函数 4 添加连接属性并测试【v0.10】 通过map[string]interface{}来存储连接的属性值,通过RWLock来保证读写connection属性值安全 设置连接属性获取连接属性移除连接属性 zinx/ziface/iconnection.go
package zifaceimport nettype IConnection interface {//启动连接Start()//停止连接Stop()//获取当前连接的Conn对象GetTCPConnection() *net.TCPConn//获取当前连接模块的idGetConnectionID() uint32//获取远程客户端的TCP状态 IPPortRemoteAddr() net.Addr//发送数据SendMsg(msgId uint32, data []byte) errorSetProperty(key string, value interface{})GetProperty(key string) (interface{}, error)RemoveProperty(key string)
}//定义一个处理连接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) errorzinx/znet/connection.go
package znetimport (fmtgithub.com/kataras/iris/v12/x/errorsiomyTest/zinx/utilmyTest/zinx/zifacenetsync
)type Connection struct {Conn *net.TCPConnConnID uint32isClosed boolmsgChannel chan []byte//告知当前的连接已经退出/停止由Reader告知writer退出ExitChan chan boolMsgHandler *MsgHandleTcpServer ziface.IServerproperty map[string]interface{}propertyLock sync.RWMutex
}func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c : Connection{Conn: conn,ConnID: connID,MsgHandler: msgHandle,isClosed: false,msgChannel: make(chan []byte),ExitChan: make(chan bool, 1),TcpServer: server,property: make(map[string]interface{}),}//将conn添加到connMgr中c.TcpServer.GetConnMgr().Add(c)return c
}func (c *Connection) StartWriter() {fmt.Println([Writer Goroutine is running])defer fmt.Println([conn Writer goroutine exit!], c.RemoteAddr().String())//不断的阻塞等待channel的消息然后将channel中的消息写给客户端for {select {case data : -c.msgChannel://有数据写给客户端if _, err : c.Conn.Write(data); err ! nil {fmt.Println(Send data error , , err)return}case -c.ExitChan://代表reader已经退出此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println(reader goroutine is running...)defer fmt.Println([Reader goroutine is exit] connID, c.ConnID, remote addr is , c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp : NewDataPack()//读取客户端的msg Head 二进制流 8字节headData : make([]byte, dp.GetHeadLen())if _, err : io.ReadFull(c.GetTCPConnection(), headData); err ! nil {fmt.Println(read msg head err , err)break}//拆包将读取到的headData封装为msgmsg, err : dp.UnPack(headData)if err ! nil {fmt.Println(unpack msg err , err)break}//根据dataLen再次读取Data放在msg.Data中var data []byte//如果数据包中有数据则读取if msg.GetMsgLen() 0 {data make([]byte, msg.GetMsgLen())//将切片data读满if _, err : io.ReadFull(c.GetTCPConnection(), data); err ! nil {fmt.Println(read msg data err , err)break}}msg.SetData(data)//封装请求改为router处理r : Request{conn: c,msg: msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize 0 {c.MsgHandler.SendMsgToTaskQueue(r)} else {go c.MsgHandler.DoMsgHandler(r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf(ConnID %d is Start..., c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()//执行钩子函数c.TcpServer.CallOnConnStart(c)
}//停止连接
func (c *Connection) Stop() {fmt.Println(Connection Stop()...ConnectionID , c.ConnID)if c.isClosed {return}c.isClosed true//连接关闭之前执行hook关闭的钩子函数c.TcpServer.CallOnConnStop(c)c.Conn.Close()c.ExitChan - true//连接conn关闭时需要从连接管理模块中移除c.TcpServer.GetConnMgr().Remove(c)close(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IPPort
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New(connection closed\n)}//将data进行封包dp : NewDataPack()binaryMsg, err : dp.Pack(NewMessage(msgId, data))if err ! nil {fmt.Println(Pack error msg id, msgId)return errors.New(pack error msg)}//将数据发送给客户端if _, err : c.Conn.Write(binaryMsg); err ! nil {fmt.Println(write msg id , msgId, error , err)return errors.New(conn write err )}return nil
}func (c *Connection) SetProperty(key string, value interface{}) {c.propertyLock.Lock()defer c.propertyLock.Unlock()c.property[key] value
}func (c *Connection) GetProperty(key string) (interface{}, error) {c.propertyLock.RLock()defer c.propertyLock.RUnlock()if value, ok : c.property[key]; ok {return value, nil} else {return nil, errors.New(no property found)}
}func (c *Connection) RemoveProperty(key string) {c.propertyLock.Lock()defer c.propertyLock.Unlock()delete(c.property, key)
}测试
myDemo/ZinxV0.10/Server.go
package mainimport (fmtmyTest/zinx/zifacemyTest/zinx/znet
)//自定义一个Router测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println(call router handler...)//先读取客户端数据再回写ping...ping...ping...fmt.Println(receive from client msgId, request.GetMsgID(),data, string(request.GetData()))//回写pingerr : request.GetConnection().SendMsg(0, []byte(ping...ping...ping...))if err ! nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println(receive from client msgId, request.GetMsgID(),data, string(request.GetData()))err : request.GetConnection().SendMsg(1, []byte(hello zinx, Im the other handler))if err ! nil {fmt.Println(err)}
}//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {fmt.Println(Do Conn Begin...)if err : conn.SendMsg(202, []byte(do connection begin...)); err ! nil {fmt.Println(err)}//给conn设置属性conn.SetProperty(Name, ziyi)conn.SetProperty(士兵突击, https://www.bilibili.com/video/BV1Lk4y1N7tC/)
}//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {fmt.Println(Do Conn Lost...)fmt.Println(connID, conn.GetConnectionID(), is Lost....)//读取属性property, _ : conn.GetProperty(Name)fmt.Println(Get Property Name, property)property, _ conn.GetProperty(士兵突击)fmt.Println(Get Property 士兵突击, property)
}func main() {s : znet.NewServer([Zinx v0.10])//添加自定义路由PingRouter和HelloRouterrouter0 : PingRouter{}s.AddRouter(0, router0)router1 : HelloRouter{}s.AddRouter(1, router1)//注册hook钩子函数s.SetOnConnStart(DoConnBegin)s.SetOnConnStop(DoConnLost)s.Serve()
}myDemo/ZinxV0.10/Client.go
package mainimport (fmtiomyTest/zinx/znetnettime
)/*
模拟客户端
*/
func main() {fmt.Println(client start...)time.Sleep(time.Second * 1)//1 创建服务器连接conn, err : net.Dial(tcp, 127.0.0.1:8092)if err ! nil {fmt.Println(client start err , err)return}for {//发送封装后的数据包dp : znet.NewDataPack()binaryMsg, err : dp.Pack(znet.NewMessage(0, []byte(Zinx client0 test msg)))if err ! nil {fmt.Println(client pack msg err , err)return}if _, err : conn.Write(binaryMsg); err ! nil {fmt.Println(client write err , err)return}//服务器应该给我们回复一个message数据msgId为1内容为ping...ping...//1 先读取流中的head部分得到Id和dataLenbinaryHead : make([]byte, dp.GetHeadLen())if _, err : io.ReadFull(conn, binaryHead); err ! nil {fmt.Println(client read head err , err)break}//将二进制的head拆包到msg中msgHead, err : dp.UnPack(binaryHead)if err ! nil {fmt.Println(client unpack msgHead err , err)break}if msgHead.GetMsgLen() 0 {//2 有数据, 再根据dataLen进行二次读取将data读出来msg : msgHead.(*znet.Message)msg.Data make([]byte, msg.GetMsgLen())if _, err : io.ReadFull(conn, msg.Data); err ! nil {fmt.Println(read msg data error , err)return}fmt.Println(-------- Receive Server msg , ID, msg.Id, ,len, msg.DataLen, ,data, string(msg.Data))}//cpu阻塞让出cpu时间片避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}