做网站的公司哪家好一点,做网站推广电话,网站单页在线,网站平台多少钱Spark的RPC通信一-初稿 文章目录 Spark的RPC通信一-初稿Spark的RPC顶层设计核心类NettyRpcEnv核心类RpcEndpoint核心类RpcEndpointRef Spark RPC消息的发送与接收实现核心类Inbox核心类Dispatcher核心类Outbox Spark的RPC顶层设计
在RpcEnv中定义了RPC通信框架的启动、停止和关…Spark的RPC通信一-初稿 文章目录 Spark的RPC通信一-初稿Spark的RPC顶层设计核心类NettyRpcEnv核心类RpcEndpoint核心类RpcEndpointRef Spark RPC消息的发送与接收实现核心类Inbox核心类Dispatcher核心类Outbox Spark的RPC顶层设计
在RpcEnv中定义了RPC通信框架的启动、停止和关闭等抽象方法表示RPC的顶层环境。唯一的子类NettyRpcEnv。
RpcEndpoints 需要向 RpcEnv 注册自己的名称以便接收信息。然后RpcEnv 将处理从 RpcEndpointRef 或远程节点发送的信息并将它们传送到相应的 RpcEndpoints。对于 RpcEnv 捕捉到的未捕获异常RpcEnv 会使用 RpcCallContext.sendFailure 将异常发回给发送者如果没有发送者或出现 NotSerializableException则记录异常。
RpcEnv 还提供了一些方法来检索给定名称或 uri 的 RpcEndpointRefs。 #mermaid-svg-d0CZExuMoO08sEMx {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-d0CZExuMoO08sEMx .error-icon{fill:#552222;}#mermaid-svg-d0CZExuMoO08sEMx .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-d0CZExuMoO08sEMx .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-d0CZExuMoO08sEMx .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-d0CZExuMoO08sEMx .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-d0CZExuMoO08sEMx .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-d0CZExuMoO08sEMx .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-d0CZExuMoO08sEMx .marker{fill:#333333;stroke:#333333;}#mermaid-svg-d0CZExuMoO08sEMx .marker.cross{stroke:#333333;}#mermaid-svg-d0CZExuMoO08sEMx svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup text{fill:#9370DB;fill:#131300;stroke:none;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:10px;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup text .title{font-weight:bolder;}#mermaid-svg-d0CZExuMoO08sEMx .nodeLabel,#mermaid-svg-d0CZExuMoO08sEMx .edgeLabel{color:#131300;}#mermaid-svg-d0CZExuMoO08sEMx .edgeLabel .label rect{fill:#ECECFF;}#mermaid-svg-d0CZExuMoO08sEMx .label text{fill:#131300;}#mermaid-svg-d0CZExuMoO08sEMx .edgeLabel .label span{background:#ECECFF;}#mermaid-svg-d0CZExuMoO08sEMx .classTitle{font-weight:bolder;}#mermaid-svg-d0CZExuMoO08sEMx .node rect,#mermaid-svg-d0CZExuMoO08sEMx .node circle,#mermaid-svg-d0CZExuMoO08sEMx .node ellipse,#mermaid-svg-d0CZExuMoO08sEMx .node polygon,#mermaid-svg-d0CZExuMoO08sEMx .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-d0CZExuMoO08sEMx .divider{stroke:#9370DB;stroke:1;}#mermaid-svg-d0CZExuMoO08sEMx g.clickable{cursor:pointer;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup rect{fill:#ECECFF;stroke:#9370DB;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup line{stroke:#9370DB;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx .classLabel .box{stroke:none;stroke-width:0;fill:#ECECFF;opacity:0.5;}#mermaid-svg-d0CZExuMoO08sEMx .classLabel .label{fill:#9370DB;font-size:10px;}#mermaid-svg-d0CZExuMoO08sEMx .relation{stroke:#333333;stroke-width:1;fill:none;}#mermaid-svg-d0CZExuMoO08sEMx .dashed-line{stroke-dasharray:3;}#mermaid-svg-d0CZExuMoO08sEMx #compositionStart,#mermaid-svg-d0CZExuMoO08sEMx .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #compositionEnd,#mermaid-svg-d0CZExuMoO08sEMx .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #dependencyStart,#mermaid-svg-d0CZExuMoO08sEMx .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #dependencyStart,#mermaid-svg-d0CZExuMoO08sEMx .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #extensionStart,#mermaid-svg-d0CZExuMoO08sEMx .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #extensionEnd,#mermaid-svg-d0CZExuMoO08sEMx .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #aggregationStart,#mermaid-svg-d0CZExuMoO08sEMx .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #aggregationEnd,#mermaid-svg-d0CZExuMoO08sEMx .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx .edgeTerminals{font-size:11px;}#mermaid-svg-d0CZExuMoO08sEMx :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 继承 继承 继承 继承 继承 继承 继承 继承 继承 继承 继承 Association Association Dispatcher -endpoints -endpointRefs -receivers -stopped -threadpool registerRpcEndpoint(name: String, endpoint: RpcEndpoint) getRpcEndpointRef(endpoint: RpcEndpoint) removeRpcEndpointRef(endpoint: RpcEndpoint) unregisterRpcEndpoint(name: String) postToAll(message: InboxMessage) postMessage(endpointName: String,message: InboxMessage,callbackIfStopped: (Exception) Unit) «trait» RpcEndpoint rpcEnv: RpcEnv self() receive() receiveAndReply(context: RpcCallContext) onError(cause: Throwable) onConnected(remoteAddress: RpcAddress) onDisconnected(remoteAddress: RpcAddress) onNetworkError(cause: Throwable, remoteAddress: RpcAddress) onStart() onStop() stop() «trait» ThreadSafeRpcEndpoint «trait» RpcEnvFactory create(config: RpcEnvConfig) NettyRpcEnvFactory create(config: RpcEnvConfig) «abstract» RpcEndpointRef «abstract» RpcEnv NettyRpcEnv -dispatcher: Dispatcher -streamManager:NettyStreamManager -transportContext:TransportContext -clientFactory:TransportClientFactory startServer(bindAddress: String, port: Int) setupEndpoint(name: String, endpoint: RpcEndpoint) send(message: RequestMessage) ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout) postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage) asyncSetupEndpointRefByURI(uri: String) DummyMaster ClientEndpoint DriverEndpoint Master Worker LocalEndpoint 其他 NettyRpcEndpointRef 在RpcEnvFactory中定义了创建RpcEnv的抽象方法在NettyRpcEnv和NettyRpcEnvFactory中使用Netty对继承的方式进行了实现。
在NettRpcEnv中启动终端点方法setEndpoint中会将RpcEndpoint和RpcEndpointRef相互以键值对的形式存储到ConcurrentHashMap中最后在RpcEnv的object类中通过反射方式实现创建RpcEnv的实例的静态方法。
核心类NettyRpcEnv
NettyRpcEnv的核心成员和核心方法
transportConfTransportConf的实例对象加载一些关于RPC的配置项dispatcherDispatcher的实例对象消息转发器将RPC消息路由到要该对此消息处理的RpcEndpoint。streamManagerNettyStreamManager的实例对象流的管理器为NettyRpcEnv提供流式服务。transportContextTransportContext的实例对象clientFactory 用于构造发送和接收响应的TransportClientfileDownloadFactory 用于文件下载的独立客户端工厂。这样可以避免使用与主 RPC 上下文相同的 RPC 处理程序从而将这些客户端引起的事件与主 RPC 流量隔离开来。它还允许对某些属性进行不同的配置例如每个对等节点的连接数。serverTransportServer提供高效的底层流媒体服务。ConcurrentHashMap[RpcAddress, Outbox] outboxes远程地址与Outbox的映射map。startServer(bindAddress: String, port: Int) 创建一个TransportServer向消息转发器中注册RpcEndpointVerifierRpcEndpointVerifier的注册名称为endpoint-verifier用来校验RpcEndpoint是否存在的RpcEndpoint服务 send(message: RequestMessage): Unit 发送消息时将本地消息交于InBox远程消息交于OutBox ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout) 若请求消息的接收者的地址与当前的NettyRpcEnv的地址相同将消息交通过dispatcher.postLocalMessage(message, p)方法处理p中是成功和失败的回调函数。若请求消息的的接收者的地址与当前的NettyRpcEnv的地址不同时将消息通过postToOutbox(message.receiver, rpcMessage)方法处理主要是将消息放入outbox然后传输到远程地址上。在方法的最后设定了一个定时器实现消息请求的超时机制。 postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage)将消息传到远程节点上 如果receiver.client不为空那么消息将直接通过TransportClient发送到远端节点如果receiver.client为空则获取远端结点地址对应的Outbox若没有则新建一个如果NettyRpcEnv已经停止移除该Outbox并停止否则调用Outbox.send()发送消息。
核心类RpcEndpoint
RpcEndpoint是对能够处理RPC请求给某一特定服务提供本地调用及跨节点调用的RPC组件的抽象所有运行于RPC框架之上的实体都应该继承RpcEndpoint。
RPC 的RpcEndpoint它定义了给定消息时要触发的函数。保证按调用顺序为 onStart、receive 和 onStop。RpcEndpoint的生命周期为constructor - onStart - receive* - onStop。receive 可以并发调用。如果希望接收是线程安全的则需要请使用 ThreadSafeRpcEndpoint。如果 RpcEndpoint 方法onError 除外抛出任何错误onError 将被调用并说明原因。如果 onError 抛出错误RpcEnv会将忽略。
ThreadSafeRpcEndpoint是继承自RpcEndpoint的特质需要 RpcEnv 以线程安全方式向其发送消息的特性。主要用于对消息的处理必须是线程安全的场景。ThreadSafeRpcEndpoint对消息的处理都是串行的即前一条消息处理完才能接着处理下一条消息。
核心类RpcEndpointRef
远程 RpcEndpoint 的引用。RpcEndpointRef 是线程安全的。用于消息发送方持有并发送消息。
核心成员
maxRetries最大尝试连接次数。可以通过spark.rpc.numRetries参数指定默认3次retryWaitMs每次尝试连接最大等待毫秒值。可以通过spark.rpc.retry.wait默认3秒defaultAskTimeoutRPC ask操作的超时时间。可以通过spark.rpc.askTimeout默认120秒address远程RpcEndpoint引用的地址name远程RpcEndpoint引用的名称
核心方法
send()发送单向异步信息。只管发送不管结果。ask()系列向远程的RpcEndpoint.receiveAndReply()方法发送消息并带有超时机制的Future。该类方法只发送一次消息从不重试。askSync()系列向相应的 RpcEndpoint.receiveAndReply 发送消息并在指定超时内获取结果如果失败则抛出异常。 这是一个阻塞操作可能会耗费大量时间因此不要在 RpcEndpoint 的消息循环中调用它。
NettyRpcEndpointRef是其唯一的继承类。重写了ask()和send()方法主要是消息封装成RequestMessage然后通过nettyEnv的ask和send方法将消息发送出去。
客户端发送请求简单示例图 #mermaid-svg-8JNaie07WjKgUzlw {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-8JNaie07WjKgUzlw .error-icon{fill:#552222;}#mermaid-svg-8JNaie07WjKgUzlw .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-8JNaie07WjKgUzlw .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-8JNaie07WjKgUzlw .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-8JNaie07WjKgUzlw .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-8JNaie07WjKgUzlw .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-8JNaie07WjKgUzlw .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-8JNaie07WjKgUzlw .marker{fill:#333333;stroke:#333333;}#mermaid-svg-8JNaie07WjKgUzlw .marker.cross{stroke:#333333;}#mermaid-svg-8JNaie07WjKgUzlw svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-8JNaie07WjKgUzlw .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-8JNaie07WjKgUzlw .cluster-label text{fill:#333;}#mermaid-svg-8JNaie07WjKgUzlw .cluster-label span{color:#333;}#mermaid-svg-8JNaie07WjKgUzlw .label text,#mermaid-svg-8JNaie07WjKgUzlw span{fill:#333;color:#333;}#mermaid-svg-8JNaie07WjKgUzlw .node rect,#mermaid-svg-8JNaie07WjKgUzlw .node circle,#mermaid-svg-8JNaie07WjKgUzlw .node ellipse,#mermaid-svg-8JNaie07WjKgUzlw .node polygon,#mermaid-svg-8JNaie07WjKgUzlw .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-8JNaie07WjKgUzlw .node .label{text-align:center;}#mermaid-svg-8JNaie07WjKgUzlw .node.clickable{cursor:pointer;}#mermaid-svg-8JNaie07WjKgUzlw .arrowheadPath{fill:#333333;}#mermaid-svg-8JNaie07WjKgUzlw .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-8JNaie07WjKgUzlw .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-8JNaie07WjKgUzlw .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-8JNaie07WjKgUzlw .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-8JNaie07WjKgUzlw .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-8JNaie07WjKgUzlw .cluster text{fill:#333;}#mermaid-svg-8JNaie07WjKgUzlw .cluster span{color:#333;}#mermaid-svg-8JNaie07WjKgUzlw div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-8JNaie07WjKgUzlw :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} Client Server RpcEnv outboxs outbox messages RpcEnv 5 3 4 1 2 Dispatcher receivers threadpool EndpointData messages 1.1 1.2 1.2 1.3 EndpointData MessageLoop InboxMessage Inbox RpcEndpoint NettyRpcEndpointRef InboxMessage EndpointData MessageLoop NettyRpcEndPointRef outbox outbox TransportClient OutboxMessage OutboxMessage RpcEndPoint Dispatcher 若是向本地节点的RpcEndpoint发送消息 通过调用NettyRpcEndpointRef的send()和ask()方法向本地节点的RpcEndpoint发送消息。由于是在同一节点所以直接调用Dispatcher的postLocalMessage()或postOneWayMessage()方法将消息放入EndpointData内部Inbox的messages中。InboxMessage放入后Inbox后Inbox所属的endPointData就会放入receivers一旦receivers中有数据原本阻塞的MessageLoop就可以取到数据MessageLoop将调用inbox.process()方法消息的处理。对不同的消息类型调用endpoint的不同回调函数即完成了消息的处理。 通过调用NettyRpcEndpointRef的send()和ask()方法向远端节点的RpcEndpoint发送消息。消息将首先被封装为OutboxMessage然后放入到远端RpcEndpoint的地址所对应的Outbox的messages中。每个Outbox的drainOutbox()方法通过循环不断从messages列表中取得OutboxMessage并通过TransportClient发送底层依赖Netty。TransportClient和远端NettyRpcEnv的TransportServer建立了连接后请求消息首先经过Netty管道的处理由TransportChannelHandler将消息分发给TransportRequestHandler最终会调用NettyRpcHandler或StreamManager处理。如果是RPC消息则会调用NettyRpcHandler.receive()方法之后与第一步所述一致调用Dispatcher的postRemoteMessage()或postOneWayMessage()方法。如果TransportRequestHandler处理的是RpcRequest那么server端的TransportRequestHandler处理消息时还会对client端进行响应依赖Netty将响应消息发送给client端。client端接收到消息时由TransportChannelHandler将消息分发给TransportResponseHandler处理。
Spark RPC消息的发送与接收实现
OutboxMessage在客户端使用是对外发送消息的封装。InboxMessage在服务端使用是对接收消息的封装。 #mermaid-svg-lBRXp7roQgEctFdG {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-lBRXp7roQgEctFdG .error-icon{fill:#552222;}#mermaid-svg-lBRXp7roQgEctFdG .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-lBRXp7roQgEctFdG .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-lBRXp7roQgEctFdG .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-lBRXp7roQgEctFdG .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-lBRXp7roQgEctFdG .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-lBRXp7roQgEctFdG .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-lBRXp7roQgEctFdG .marker{fill:#333333;stroke:#333333;}#mermaid-svg-lBRXp7roQgEctFdG .marker.cross{stroke:#333333;}#mermaid-svg-lBRXp7roQgEctFdG svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup text{fill:#9370DB;fill:#131300;stroke:none;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:10px;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup text .title{font-weight:bolder;}#mermaid-svg-lBRXp7roQgEctFdG .nodeLabel,#mermaid-svg-lBRXp7roQgEctFdG .edgeLabel{color:#131300;}#mermaid-svg-lBRXp7roQgEctFdG .edgeLabel .label rect{fill:#ECECFF;}#mermaid-svg-lBRXp7roQgEctFdG .label text{fill:#131300;}#mermaid-svg-lBRXp7roQgEctFdG .edgeLabel .label span{background:#ECECFF;}#mermaid-svg-lBRXp7roQgEctFdG .classTitle{font-weight:bolder;}#mermaid-svg-lBRXp7roQgEctFdG .node rect,#mermaid-svg-lBRXp7roQgEctFdG .node circle,#mermaid-svg-lBRXp7roQgEctFdG .node ellipse,#mermaid-svg-lBRXp7roQgEctFdG .node polygon,#mermaid-svg-lBRXp7roQgEctFdG .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-lBRXp7roQgEctFdG .divider{stroke:#9370DB;stroke:1;}#mermaid-svg-lBRXp7roQgEctFdG g.clickable{cursor:pointer;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup rect{fill:#ECECFF;stroke:#9370DB;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup line{stroke:#9370DB;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG .classLabel .box{stroke:none;stroke-width:0;fill:#ECECFF;opacity:0.5;}#mermaid-svg-lBRXp7roQgEctFdG .classLabel .label{fill:#9370DB;font-size:10px;}#mermaid-svg-lBRXp7roQgEctFdG .relation{stroke:#333333;stroke-width:1;fill:none;}#mermaid-svg-lBRXp7roQgEctFdG .dashed-line{stroke-dasharray:3;}#mermaid-svg-lBRXp7roQgEctFdG #compositionStart,#mermaid-svg-lBRXp7roQgEctFdG .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #compositionEnd,#mermaid-svg-lBRXp7roQgEctFdG .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #dependencyStart,#mermaid-svg-lBRXp7roQgEctFdG .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #dependencyStart,#mermaid-svg-lBRXp7roQgEctFdG .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #extensionStart,#mermaid-svg-lBRXp7roQgEctFdG .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #extensionEnd,#mermaid-svg-lBRXp7roQgEctFdG .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #aggregationStart,#mermaid-svg-lBRXp7roQgEctFdG .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #aggregationEnd,#mermaid-svg-lBRXp7roQgEctFdG .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG .edgeTerminals{font-size:11px;}#mermaid-svg-lBRXp7roQgEctFdG :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 继承 继承 继承 继承 继承 继承 继承 继承 继承 聚合 聚合 Dispatcher -endpoints -endpointRefs -receivers -stopped -threadpool registerRpcEndpoint(name: String, endpoint: RpcEndpoint) getRpcEndpointRef(endpoint: RpcEndpoint) removeRpcEndpointRef(endpoint: RpcEndpoint) unregisterRpcEndpoint(name: String) postToAll(message: InboxMessage) postMessage(endpointName: String,message: InboxMessage,callbackIfStopped: (Exception) Unit) EndpointData MessageLoop Inbox #messages -stopped -enableConcurrent -numActiveThreads process(dispatcher: Dispatcher) post(message: InboxMessage) stop() Outbox -messages -client -connectFuture -stopped -draining send(message: OutboxMessage) -drainOutbox() -launchConnectTask() -handleNetworkFailure(e: Throwable) -closeClient() stop() RpcMessage OnStart OnStop RemoteProcessConnected RemoteProcessDisconnected RemoteProcessConnectionError OneWayMessage «trait» InboxMessage OneWayOutboxMessage «trait» OutboxMessage sendWith(client: TransportClient) onFailure(e: Throwable) RpcOutboxMessage InboxMessage是一个scala特质类所有的RPC消息都继承自InboxMessage。下面是继承自InboxMessage的子类
OneWayMessageRpcEndpoint处理此类型的消息后不需要向客户端回复信息。RpcMessageRpcEndpoint处理完此消息后需要向客户端回复信息。OnStartInbox实例化后再通知与此Inbox相关联的RpcEndpoint启动。OnStopInbox停止后通知与此Inbox相关联的RpcEndpoint停止。RemoteProcessConnected告诉所有的RpcEndpoint有远端的进程已经与当前RPC服务建立了连接。RemoteProcessDisconnected告诉所有的RpcEndpoint有远端的进程已经与当前RPC服务断开了连接。RemoteProcessConnectionError告诉所有的RpcEndpoint与远端某个地址之间的连接发生了错误。
核心类Inbox
Inbox为RpcEndpoint存储了消息即InboxMessage并线程安全地发送给RpcEndPoint。
private[netty] class Inbox(val endpointRef: NettyRpcEndpointRef,val endpoint: RpcEndpoint)extends Logging {//相当于给this起了一个别名为inboxinbox }重要的属性
messages所有的消息以消息盒子的方式通过LinkedList链式存储enableConcurrent是否同时允许多线程同时处理消息numActiveThreadsInbox中正在处理消息的线程数
重要方法 post()将InboxMessage投递到box中从下面的代码可以看出使用了synchronized保证线程安全如果该box已经关闭消息将会丢弃。 def post(message: InboxMessage): Unit inbox.synchronized {if (stopped) {// 日志进行warning输出onDrop(message)} else {messages.add(message)false}
}process()处理存储在messages中的消息。 def process(dispatcher: Dispatcher): Unit {var message: InboxMessage null// 1.以synchronized进行并发检查开启并发则取消息numActiveThreads自增1。inbox.synchronized {if (!enableConcurrent numActiveThreads ! 0) {return}message messages.poll()if (message ! null) {numActiveThreads 1} else {return}}while (true) {// 安全回调处理异常的safelyCall(endpoint) {//对不同消息通过模式匹配进行通过不同的endpoint进行处理message match {case RpcMessage(_sender, content, context) try {endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg throw new SparkException(sUnsupported message $message from ${_sender})})} catch {case e: Throwable context.sendFailure(e)// Throw the exception -- this exception will be caught by the safelyCall function.// The endpoints onError function will be called.throw e}case OneWayMessage(_sender, content) endpoint.receive.applyOrElse[Any, Unit](content, { msg throw new SparkException(sUnsupported message $message from ${_sender})})case OnStart endpoint.onStart()if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {inbox.synchronized {if (!stopped) {enableConcurrent true}}}case OnStop val activeThreads inbox.synchronized { inbox.numActiveThreads }assert(activeThreads 1,sThere should be only a single active thread but found $activeThreads threads.)dispatcher.removeRpcEndpointRef(endpoint)endpoint.onStop()assert(isEmpty, OnStop should be the last message)case RemoteProcessConnected(remoteAddress) endpoint.onConnected(remoteAddress)case RemoteProcessDisconnected(remoteAddress) endpoint.onDisconnected(remoteAddress)case RemoteProcessConnectionError(cause, remoteAddress) endpoint.onNetworkError(cause, remoteAddress)}}inbox.synchronized {// 调用 onStop 后enableConcurrent 将被设置为 false所以需要每次都检查它。if (!enableConcurrent numActiveThreads ! 1) {// 此线程退出降低并发最终归于一个线程处理剩下的消息numActiveThreads - 1return}message messages.poll()// 没有消息之后退出当前循环if (message null) {numActiveThreads - 1return}}}}stop()enableConcurrent赋值为false保证当前是唯一活跃的线程。并在messages中添加onStop消息。 def stop(): Unit inbox.synchronized {// 优雅关闭是关闭并发只留一个线程处理消息。确保OnStop为最后一个消息这样RpcEndpoint.onStop 就可以安全地释放资源了。if (!stopped) {enableConcurrent falsestopped truemessages.add(OnStop)}
}核心类Dispatcher
Dispatcher负责将RPC消息路由到要该对此消息处理的RpcEndpoint。
内部类
EndpointData包装一个Inbox类。一个RpcEndpoint与NettyRpcEndpointRef映射关联在一起。即一个Inbox只为一个映射关系服务。MessageLoop用于转发信息的循环任务类从receivers中获取有消息的inbox进行处理。
重要属性
endpoints储存name和EndpointData的映射关系。EndpointData包含了nameRpcEndpoint, NettyRpcEndpointRef和Inbox采用ConcureentHashMap保证线程安全endpointRefs储存RpcEndpoint和RpcEndpointRef的映射关系。采用ConcureentHashMap保证线程安全receivers存储inbox中可能包含message的EndpointData。在MessageLoop中取出并处理消息。使用阻塞队列LinkedBlockingQueue存储。threadpool用于调度消息的线程池。根据spark.rpc.netty.dispatcher.numThreads创建固定大小的线程池启动与线程池大小相同个数的MessageLoop任务。
重要方法
registerRpcEndpoint()在调度器中注册endpoint。由name和RpcEndpoint构建NettyRpcEndpointRef并加入到endpoints, endpointRefs, receivers中postToAll()将message投递到在注册到该Dispatcher的所有RpcEndpoint。postMessage()将message投递到注册到该Dispatcher指定name的RpcEndpoint中并将EndpointData放入receivers中该方法中还传入了失败回调函数unregisterRpcEndpoint(), stop()注销所有已注册的RpcEndpoint从endpoints中移除并在inbox中增加了onstop消息。在receivers中插入哨兵等待receivers中的所有消息都处理完毕后关闭线程池。
Dispatcher中的消息处理流程。
postToAll()或者postxx()方法会调用postMessage()方法将InboxMessage放到对应endPointData里inbox的messages列表(调用inbox.post())InboxMessage放入后inbox后inbox所属的endPointData就会放入receivers一旦receivers中有数据原本阻塞的MessageLoop就可以取到数据因为receivers是一个阻塞队列MessageLoop将调用inbox.process()方法消息的处理。利用模式匹配对不同的消息类型调用endpoint的不同回调函数即完成了消息的处理。
核心类Outbox
OutboxMessage是一个特质内部只有未实现的SendWith方法和onFailure方法。OneWayOutboxMessage和RpcOutboxMessage都继承自OutboxMessage特质实现的SendWith通过调用TransportClient的sendRpc()方法发送信息其中RpcOutboxMessage还增加了超时和发送成功的回调方法。
Outbox的重要属性
messages: 保存要发送的OutboxMessage。LinkedList类型线程不安全client: TransportClientstopped: 当前Outbox是否停止的标识draining: 表示当前Outbox内正有线程在处理messages中消息的状态
重要方法 send()将要发送的OutboxMessage首先保存到成员变量链表messages中若Outbox未停止则调用drainOutbox()方法处理messages中的信息。因为messages是LinkedList类型线程不安全所以在添加和删除时使用了同步机制。之后调用了私有的drainOutbox()方法发送消息。发送信息。如果没有活动连接则缓存并启动新连接。如果[[发件箱]]被停止发送者将收到[[SparkException]]通知。 def send(message: OutboxMessage): Unit {val dropped synchronized {if (stopped) {true} else {messages.add(message)false}}if (dropped) {message.onFailure(new SparkException(Message is dropped because Outbox is stopped))} else {drainOutbox()}}drainOutbox()先判断是否已停止client是否空等前置条件。取出一条消息并将draining置为true接下来将messages中所有消息调用sendWith()方法发送。耗尽消息队列。如果有其他线程正在排空则直接退出。如果尚未建立连接则在 nettyEnv.clientConnectionExecutor 中启动一个任务来建立连接。 launchConnectTask() 初始化client stop()停止Outbox 将Outbox的停止状态stopped置为true关闭TransportClient清空messages中的消息
之所以要使用这种机制来发消息是保证并发发送消息时所有消息依次添加到Outbox中并依次传输同时不会阻塞send()方法