当前位置: 首页 > news >正文

苏州智能网站开发360网页

苏州智能网站开发,360网页,seo职位是什么意思,自己创建的网站简介#xff1a; Java 的世界里#xff0c;大家广泛使用的一个高性能网络通信框架 netty#xff0c;很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里#xff0c;getty 也是一个类似 netty 的高性能网络通信库。getty 最初由 dubbogo 项目负责人于雨开发#xf…简介 Java 的世界里大家广泛使用的一个高性能网络通信框架 netty很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里getty 也是一个类似 netty 的高性能网络通信库。getty 最初由 dubbogo 项目负责人于雨开发作为底层通信库在 dubbo-go 中使用。随着 dubbo-go 捐献给 apache 基金会在社区小伙伴的共同努力下getty 也最终进入到 apache 这个大家庭并改名 dubbo-getty 。 作者 | 刘晓敏 于雨 一、简介 Java 的世界里大家广泛使用的一个高性能网络通信框架 netty很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里getty 也是一个类似 netty 的高性能网络通信库。getty 最初由 dubbogo 项目负责人于雨开发作为底层通信库在 dubbo-go 中使用。随着 dubbo-go 捐献给 apache 基金会在社区小伙伴的共同努力下getty 也最终进入到 apache 这个大家庭并改名 dubbo-getty 。 18 年的时候我在公司里实践微服务当时遇到最大的问题就是分布式事务问题。同年阿里在社区开源他们的分布式事务解决方案我也很快关注到这个项目起初还叫 fescar后来更名 seata。由于我对开源技术很感兴趣加了很多社区群当时也很关注 dubbo-go 这个项目在里面默默潜水。随着对 seata 的了解逐渐萌生了做一个 go 版本的分布式事务框架的想法。 要做一个 golang 版的分布式事务框架首要的一个问题就是如何实现 RPC 通信。dubbo-go 就是很好的一个例子摆在眼前遂开始研究 dubbo-go 的底层 getty。 二、如何基于 getty 实现 RPC 通信 getty 框架的整体模型图如下 下面结合相关代码详述 seata-golang 的 RPC 通信过程。 1. 建立连接 实现 RPC 通信首先要建立网络连接吧我们从 client.go 开始看起。 func (c *client) connect() {var (err errorss Session)for {// 建立一个 session 连接ss c.dial()if ss nil {// client has been closedbreak}err c.newSession(ss)if err nil {// 收发报文ss.(*session).run()// 此处省略部分代码break}// dont distinguish between tcp connection and websocket connection. Because// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()ss.Conn().Close()} } connect() 方法通过 dial() 方法得到了一个 session 连接进入 dial() 方法 func (c *client) dial() Session {switch c.endPointType {case TCP_CLIENT:return c.dialTCP()case UDP_CLIENT:return c.dialUDP()case WS_CLIENT:return c.dialWS()case WSS_CLIENT:return c.dialWSS()}return nil } 我们关注的是 TCP 连接所以继续进入 c.dialTCP() 方法 func (c *client) dialTCP() Session {var (err errorconn net.Conn)for {if c.IsClosed() {return nil}if c.sslEnabled {if sslConfig, err : c.tlsConfigBuilder.BuildTlsConfig(); err nil sslConfig ! nil {d : net.Dialer{Timeout: connectTimeout}// 建立加密连接conn, err tls.DialWithDialer(d, tcp, c.addr, sslConfig)}} else {// 建立 tcp 连接conn, err net.DialTimeout(tcp, c.addr, connectTimeout)}if err nil gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {conn.Close()err errSelfConnect}if err nil {// 返回一个 TCPSessionreturn newTCPSession(conn, c)}log.Infof(net.DialTimeout(addr:%s, timeout:%v) error:%v, c.addr, connectTimeout, perrors.WithStack(err))-wheel.After(connectInterval)} } 至此我们知道了 getty 如何建立 TCP 连接并返回 TCPSession。 2. 收发报文 那它是怎么收发报文的呢我们回到 connection 方法接着往下看有这样一行 ss.(*session).run()在这行代码之后代码都是很简单的操作我们猜测这行代码运行的逻辑里面一定包含收发报文的逻辑接着进入 run() 方法 func (s *session) run() {// 省略部分代码go s.handleLoop()go s.handlePackage() } 这里起了两个 goroutinehandleLoop 和 handlePackage看字面意思符合我们的猜想进入 handleLoop() 方法 func (s *session) handleLoop() {// 省略部分代码for {// A select blocks until one of its cases is ready to run.// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.select {// 省略部分代码case outPkg, ok -s.wQ:// 省略部分代码iovec iovec[:0]for idx : 0; idx maxIovecNum; idx {// 通过 s.writer 将 interface{} 类型的 outPkg 编码成二进制的比特pkgBytes, err s.writer.Write(s, outPkg)// 省略部分代码iovec append(iovec, pkgBytes)//省略部分代码}// 将这些二进制比特发送出去err s.WriteBytesArray(iovec[:]...)if err ! nil {log.Errorf(%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) error:%v,s.sessionToken(), len(iovec), perrors.WithStack(err))s.stop()// break LOOPflag false}case -wheel.After(s.period):if flag {if wsFlag {err : wsConn.writePing()if err ! nil {log.Warnf(wsConn.writePing() error:%v, perrors.WithStack(err))}}// 定时执行的逻辑心跳等s.listener.OnCron(s)}}} } 通过上面的代码我们不难发现handleLoop() 方法处理的是发送报文的逻辑RPC 需要发送的消息首先由 s.writer 编码成二进制比特然后通过建立的 TCP 连接发送出去。这个 s.writer 对应的 Writer 接口是 RPC 框架必须要实现的一个接口。 继续看 handlePackage() 方法 func (s *session) handlePackage() {// 省略部分代码if _, ok : s.Connection.(*gettyTCPConn); ok {if s.reader nil {errStr : fmt.Sprintf(session{name:%s, conn:%#v, reader:%#v}, s.name, s.Connection, s.reader)log.Error(errStr)panic(errStr)}err s.handleTCPPackage()} else if _, ok : s.Connection.(*gettyWSConn); ok {err s.handleWSPackage()} else if _, ok : s.Connection.(*gettyUDPConn); ok {err s.handleUDPPackage()} else {panic(fmt.Sprintf(unknown type session{%#v}, s))} } 进入 handleTCPPackage() 方法 func (s *session) handleTCPPackage() error {// 省略部分代码conn s.Connection.(*gettyTCPConn)for {// 省略部分代码bufLen 0for {// for clause for the network timeout condition check// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))// 从 TCP 连接中收到报文bufLen, err conn.recv(buf)// 省略部分代码break}// 省略部分代码// 将收到的报文二进制比特写入 pkgBufpktBuf.Write(buf[:bufLen])for {if pktBuf.Len() 0 {break}// 通过 s.reader 将收到的报文解码成 RPC 消息pkg, pkgLen, err s.reader.Read(s, pktBuf.Bytes())// 省略部分代码s.UpdateActive()// 将收到的消息放入 TaskQueue 供 RPC 消费端消费s.addTask(pkg)pktBuf.Next(pkgLen)// continue to handle case 5}if exit {break}}return perrors.WithStack(err) } 从上面的代码逻辑我们分析出RPC 消费端需要将从 TCP 连接收到的二进制比特报文解码成 RPC 能消费的消息这个工作由 s.reader 实现所以我们要构建 RPC 通信层也需要实现 s.reader 对应的 Reader 接口。 3. 底层处理网络报文的逻辑如何与业务逻辑解耦 我们都知道netty 通过 boss 线程和 worker 线程实现了底层网络逻辑和业务逻辑的解耦。那么getty 是如何实现的呢 在 handlePackage() 方法最后我们看到收到的消息被放入了 s.addTask(pkg) 这个方法接着往下分析 func (s *session) addTask(pkg interface{}) {f : func() {s.listener.OnMessage(s, pkg)s.incReadPkgNum()}if taskPool : s.EndPoint().GetTaskPool(); taskPool ! nil {taskPool.AddTaskAlways(f)return}f() } pkg 参数传递到了一个匿名方法这个方法最终放入了 taskPool。这个方法很关键在我后来写 seata-golang 代码的时候就遇到了一个坑这个坑后面分析。 接着我们看一下 taskPool 的定义 // NewTaskPoolSimple build a simple task pool func NewTaskPoolSimple(size int) GenericTaskPool {if size 1 {size runtime.NumCPU() * 100}return taskPoolSimple{work: make(chan task),sem: make(chan struct{}, size),done: make(chan struct{}),} } 构建了一个缓冲大小为 size 默认为  runtime.NumCPU() * 100 的 channel sem。再看方法 AddTaskAlways(t task) func (p *taskPoolSimple) AddTaskAlways(t task) {select {case -p.done:returndefault:}select {case p.work - t:returndefault:}select {case p.work - t:case p.sem - struct{}{}:p.wg.Add(1)go p.worker(t)default:goSafely(t)} } 加入的任务会先由 len(p.sem) 个 goroutine 去消费如果没有 goroutine 空闲则会启动一个临时的 goroutine 去运行 t()。相当于有  len(p.sem) 个 goroutine 组成了 goroutine poolpool 中的 goroutine 去处理业务逻辑而不是由处理网络报文的 goroutine 去运行业务逻辑从而实现了解耦。写 seata-golang 时遇到的一个坑就是忘记设置 taskPool 造成了处理业务逻辑和处理底层网络报文逻辑的 goroutine 是同一个我在业务逻辑中阻塞等待一个任务完成时阻塞了整个 goroutine使得阻塞期间收不到任何报文。 4. 具体实现 下面的代码见 getty.go // Reader is used to unmarshal a complete pkg from buffer type Reader interface {Read(Session, []byte) (interface{}, int, error) }// Writer is used to marshal pkg and write to session type Writer interface {// if Session is udpGettySession, the second parameter is UDPContext.Write(Session, interface{}) ([]byte, error) }// ReadWriter interface use for handle application packages type ReadWriter interface {ReaderWriter } // EventListener is used to process pkg that received from remote session type EventListener interface {// invoked when session opened// If the return error is not nil, Session will be closed.OnOpen(Session) error// invoked when session closed.OnClose(Session)// invoked when got error.OnError(Session, error)// invoked periodically, its period can be set by (Session)SetCronPeriodOnCron(Session)// invoked when getty received a package. Pls attention that do not handle long time// logic processing in this func. Youd better set the packages maximum length.// If the messages length is greater than it, u should should return err in// Reader{Read} and getty will close this connection soon.//// If ur logic processing in this func will take a long time, u should start a goroutine// pool(like working thread pool in cpp) to handle the processing asynchronously. Or u// can do the logic processing in other asynchronous way.// !!!In short, ur OnMessage callback func should return asap.//// If this is a udp event listener, the second parameter type is UDPContext.OnMessage(Session, interface{}) } 通过对整个 getty 代码的分析我们只要实现  ReadWriter 来对 RPC  消息编解码再实现 EventListener 来处理 RPC 消息的对应的具体逻辑将 ReadWriter 实现和 EventLister 实现注入到 RPC 的 Client 和 Server 端则可实现 RPC 通信。 4.1 编解码协议实现 下面是 seata 协议的定义 在 ReadWriter 接口的实现 RpcPackageHandler 中调用 Codec 方法对消息体按照上面的格式编解码 // 消息编码为二进制比特 func MessageEncoder(codecType byte, in interface{}) []byte {switch codecType {case SEATA:return SeataEncoder(in)default:log.Errorf(not support codecType, %s, codecType)return nil} }// 二进制比特解码为消息体 func MessageDecoder(codecType byte, in []byte) (interface{}, int) {switch codecType {case SEATA:return SeataDecoder(in)default:log.Errorf(not support codecType, %s, codecType)return nil, 0} } 4.2 Client 端实现 再来看 client 端 EventListener 的实现 RpcRemotingClient func (client *RpcRemoteClient) OnOpen(session getty.Session) error {go func() request : protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{ApplicationId: client.conf.ApplicationId,TransactionServiceGroup: client.conf.TransactionServiceGroup,}}// 建立连接后向 Transaction Coordinator 发起注册 TransactionManager 的请求_, err : client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)if err nil {// 将与 Transaction Coordinator 建立的连接保存在连接池供后续使用clientSessionManager.RegisterGettySession(session)client.GettySessionOnOpenChannel - session.RemoteAddr()}}()return nil }// OnError ... func (client *RpcRemoteClient) OnError(session getty.Session, err error) {clientSessionManager.ReleaseGettySession(session) }// OnClose ... func (client *RpcRemoteClient) OnClose(session getty.Session) {clientSessionManager.ReleaseGettySession(session) }// OnMessage ... func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {log.Info(received message:{%v}, pkg)rpcMessage, ok : pkg.(protocal.RpcMessage)if ok {heartBeat, isHeartBeat : rpcMessage.Body.(protocal.HeartBeatMessage)if isHeartBeat heartBeat protocal.HeartBeatMessagePong {log.Debugf(received PONG from %s, session.RemoteAddr())}}if rpcMessage.MessageType protocal.MSGTYPE_RESQUEST ||rpcMessage.MessageType protocal.MSGTYPE_RESQUEST_ONEWAY {log.Debugf(msgId:%s, body:%v, rpcMessage.Id, rpcMessage.Body)// 处理事务消息提交 or 回滚client.onMessage(rpcMessage, session.RemoteAddr())} else {resp, loaded : client.futures.Load(rpcMessage.Id)if loaded {response : resp.(*getty2.MessageFuture)response.Response rpcMessage.Bodyresponse.Done - trueclient.futures.Delete(rpcMessage.Id)}} }// OnCron ... func (client *RpcRemoteClient) OnCron(session getty.Session) {// 发送心跳client.defaultSendRequest(session, protocal.HeartBeatMessagePing) } clientSessionManager.RegisterGettySession(session) 的逻辑将在下文中分析。 4.3 Server 端 Transaction Coordinator 实现 代码见 DefaultCoordinator func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {log.Infof(got getty_session:%s, session.Stat())return nil }func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {// 释放 TCP 连接SessionManager.ReleaseGettySession(session)session.Close()log.Errorf(getty_session{%s} got error{%v}, will be closed., session.Stat(), err) }func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {log.Info(getty_session{%s} is closing......, session.Stat()) }func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {log.Debugf(received message:{%v}, pkg)rpcMessage, ok : pkg.(protocal.RpcMessage)if ok {_, isRegTM : rpcMessage.Body.(protocal.RegisterTMRequest)if isRegTM {// 将 TransactionManager 信息和 TCP 连接建立映射关系coordinator.OnRegTmMessage(rpcMessage, session)return}heartBeat, isHeartBeat : rpcMessage.Body.(protocal.HeartBeatMessage)if isHeartBeat heartBeat protocal.HeartBeatMessagePing {coordinator.OnCheckMessage(rpcMessage, session)return}if rpcMessage.MessageType protocal.MSGTYPE_RESQUEST ||rpcMessage.MessageType protocal.MSGTYPE_RESQUEST_ONEWAY {log.Debugf(msgId:%s, body:%v, rpcMessage.Id, rpcMessage.Body)_, isRegRM : rpcMessage.Body.(protocal.RegisterRMRequest)if isRegRM {// 将 ResourceManager 信息和 TCP 连接建立映射关系coordinator.OnRegRmMessage(rpcMessage, session)} else {if SessionManager.IsRegistered(session) {defer func() {if err : recover(); err ! nil {log.Errorf(Catch Exception while do RPC, request: %v,err: %w, rpcMessage, err)}}()// 处理事务消息全局事务注册、分支事务注册、分支事务提交、全局事务回滚等coordinator.OnTrxMessage(rpcMessage, session)} else {session.Close()log.Infof(close a unhandled connection! [%v], session)}}} else {resp, loaded : coordinator.futures.Load(rpcMessage.Id)if loaded {response : resp.(*getty2.MessageFuture)response.Response rpcMessage.Bodyresponse.Done - truecoordinator.futures.Delete(rpcMessage.Id)}}} }func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {} coordinator.OnRegTmMessage(rpcMessage, session) 注册 Transaction Managercoordinator.OnRegRmMessage(rpcMessage, session) 注册 Resource Manager。具体逻辑分析见下文。 消息进入 coordinator.OnTrxMessage(rpcMessage, session) 方法将按照消息的类型码路由到具体的逻辑当中 switch msg.GetTypeCode() {case protocal.TypeGlobalBegin:req : msg.(protocal.GlobalBeginRequest)resp : coordinator.doGlobalBegin(req, ctx)return respcase protocal.TypeGlobalStatus:req : msg.(protocal.GlobalStatusRequest)resp : coordinator.doGlobalStatus(req, ctx)return respcase protocal.TypeGlobalReport:req : msg.(protocal.GlobalReportRequest)resp : coordinator.doGlobalReport(req, ctx)return respcase protocal.TypeGlobalCommit:req : msg.(protocal.GlobalCommitRequest)resp : coordinator.doGlobalCommit(req, ctx)return respcase protocal.TypeGlobalRollback:req : msg.(protocal.GlobalRollbackRequest)resp : coordinator.doGlobalRollback(req, ctx)return respcase protocal.TypeBranchRegister:req : msg.(protocal.BranchRegisterRequest)resp : coordinator.doBranchRegister(req, ctx)return respcase protocal.TypeBranchStatusReport:req : msg.(protocal.BranchReportRequest)resp : coordinator.doBranchReport(req, ctx)return respdefault:return nil} 4.4 session manager 分析 Client 端同 Transaction Coordinator 建立连接起连接后通过 clientSessionManager.RegisterGettySession(session) 将连接保存在 serverSessions sync.Map{} 这个 map 中。map 的 key 为从 session 中获取的 RemoteAddress 即 Transaction Coordinator 的地址value 为 session。这样Client 端就可以通过 map 中的一个 session 来向 Transaction Coordinator 注册 Transaction Manager 和 Resource Manager 了。具体代码见 getty_client_session_manager.go。 Transaction Manager 和 Resource Manager 注册到 Transaction Coordinator 后一个连接既有可能用来发送 TM 消息也有可能用来发送 RM 消息。我们通过 RpcContext 来标识一个连接信息 type RpcContext struct {Version stringTransactionServiceGroup stringClientRole meta.TransactionRoleApplicationId stringClientId stringResourceSets *model.SetSession getty.Session } 当收到事务消息时我们需要构造这样一个 RpcContext 供后续事务处理逻辑使用。所以我们会构造下列 map 来缓存映射关系 var (// session - transactionRole// TM will register before RM, if a session is not the TM registered,// it will be the RM registeredsession_transactionroles sync.Map{}// session - applicationIdidentified_sessions sync.Map{}// applicationId - ip - port - sessionclient_sessions sync.Map{}// applicationId - resourceIdsclient_resources sync.Map{} ) 这样Transaction Manager 和 Resource Manager 分别通过 coordinator.OnRegTmMessage(rpcMessage, session)和 coordinator.OnRegRmMessage(rpcMessage, session) 注册到 Transaction Coordinator 时会在上述 client_sessions map 中缓存 applicationId、ip、port 与 session 的关系在 client_resources map 中缓存 applicationId 与 resourceIds一个应用可能存在多个 Resource Manager 的关系。在需要时我们就可以通过上述映射关系构造一个 RpcContext。这部分的实现和 java 版 seata 有很大的不同感兴趣的可以深入了解一下。具体代码见 getty_session_manager.go。 至此我们就分析完了 seata-golang 整个 RPC 通信模型的机制。 三、seata-golang 的未来 seata-golang  从今年 4 月份开始开发到 8 月份基本实现和 java 版 seata 1.2 协议的互通对 mysql 数据库实现了 AT 模式自动协调分布式事务的提交回滚实现了 TCC 模式TC 端使用 mysql 存储数据使 TC 变成一个无状态应用支持高可用部署。下图展示了 AT 模式的原理 后续还有许多工作可以做比如对注册中心的支持、对配置中心的支持、和 java 版 seata 1.4 的协议互通、其他数据库的支持、raft transaction coordinator 的实现等希望对分布式事务问题感兴趣的开发者可以加入进来一起来打造一个完善的 golang 的分布式事务框架。如果你有任何疑问欢迎加入交流群【钉钉群号 33069364】。 另外欢迎对 dubbogo 感兴趣的朋友到 dubbogo 社区钉钉群钉钉群号 31363295沟通 dubbogo 技术问题。 原文链接 本文为阿里云原创内容未经允许不得转载。
http://www.zqtcl.cn/news/29932/

相关文章:

  • 关于做公司官方网站域名申请google推广技巧
  • 最简单网站建设网站标题psd
  • 有没有专门做游戏辅助的网站系统安装wordpress
  • 有没有做软件的网站2022年app拉新推广项目
  • 网站建设 博采中国最新军事新闻直播
  • 介休做网站全屋家具定制价格表
  • 求个没封的a站2022国都建设集团网站
  • 伍佰亿网站建设污水处理厂网站建设
  • 佛山网站建设佛山网站制作wordpress 微信 同步
  • 网站开发是什么意思啊wordpress 主题预览
  • 重庆公司建站网站建设使用软件
  • 怎么建设自己导购网站合肥网
  • asp化妆品网站 后台wordpress做商城网站吗
  • 虚拟主机怎么设计网站吗手机微信小程序免费制作平台
  • 网站备案有电话来dw个人网站建立教学
  • 网站开发 前端 外包中国国家培训网官网
  • 广州知名网站设计青岛seo优化
  • 贵阳市小程序网站开发公司怎么做微信小程序
  • 餐厅装修设计公司网站wordpress移动底部导航菜单
  • 网站建设题库及答案国内小程序最好的公司
  • 网站建设注意哪些内容网站做排名
  • wdcp 网站迁移网站建设扌金手指六六
  • 网站导航怎么做外链好看的网站地图样式
  • 设备建设网站做外贸哪个英文网站好
  • 运城市做网站制作公司内部网站
  • 国外做科研的网站园林效果图网站
  • 东莞网站营销推广公司专业制作公众号公司
  • 专业建站提供商wordpress积分系统
  • 天津个人网站制作广州企业网站建设
  • 上海企业网站制作哪家专业视频投票网站怎么做的