Spark內建框架rpc通訊機制及RpcEnv基礎設施-Spark商業實戰
1. Spark 內建框架rpc通訊機制
TransportContext 內部握有建立TransPortClient和TransPortServer的方法實現,但卻屬於最底層的RPC通訊設施。為什麼呢?
因為成員變數RPCHandler是抽象的,所以TransportContext只能為最底層的通訊基礎。上層為NettyRPCEnv高層封裝,並持有TransportContext引用,在TransportContext中傳入NettyRpcHandler實體,來實現netty通訊回撥Handler處理。請參照第二部分詳解。
/* The TransportServer and TransportClientFactory both create a TransportChannelHandler for each * channel. As each TransportChannelHandler contains a TransportClient, this enables server * processes to send messages back to the client on an existing channel. */ public class TransportContext { private final Logger logger = LoggerFactory.getLogger(TransportContext.class); private final TransportConf conf; private final RpcHandler rpcHandler; private final boolean closeIdleConnections; private final MessageEncoder encoder; private final MessageDecoder decoder; public TransportContext(TransportConf conf, RpcHandler rpcHandler) { this(conf, rpcHandler, false); }
1.1 客戶端和服務端統一的訊息接收處理器 TransportChannelHandlerer
transportClient 和TransportServer 在配置Netty的pipeLine的handler處理器時,均採用TransportChannelHandler,
不過transportClient對應的是TransportResponseHander,TransportServer對應的的是TransportRequestHander。 如下圖所示,在進行訊息處理時,首先會經過TransportChannelHandler根據訊息型別進行處理器選擇。
1.2 transportClient對應的是ResponseMessage
客戶端一旦傳送訊息(均為Request訊息),就會在
private final Map<Long, RpcResponseCallback> outstandingRpcs;
private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches
中快取,用於回撥處理。
1.3 transportServer對應的是RequestMessage
服務端接收訊息型別(均為Request訊息)
- ChunkFetchRequest
- RpcRequest
- OneWayMessage
- StremRequest
服務端響應型別(均為Response訊息):
- ChunkFetchSucess
- ChunkFetchFailure
- RpcResponse
- RpcFailure
2. Spark RpcEnv基礎設施
2.1 上層建築NettyRPCEnv
上層建築NettyRPCEnv,持有TransportContext引用,在TransportContext中傳入NettyRpcHandler實體,來實現netty通訊回撥Handler處理
- Dispatcher
- TransportContext
- TransPortClientFactroy
- TransportServer
- TransportConf
2.2 RpcEndPoint 與 RPCEndPointRef 端點
- RpcEndPoint 為服務端
- RPCEndPointRef 為客戶端
2.2 Dispacher 與 Inbox 與 Outbox
- 一個端點對應一個Dispacher,一個Inbox , 多個OutBox
- RpcEndpoint:RPC端點 ,Spark針對於每個節點(Client/Master/Worker)都稱之一個Rpc端點 ,且都實現RpcEndpoint介面,內部根據不同端點的需求,設計不同的訊息和不同的業務處理,如果需要傳送(詢問)則呼叫Dispatcher
- RpcEnv:RPC上下文環境,每個Rpc端點執行時依賴的上下文環境稱之為RpcEnv
- Dispatcher:訊息分發器,針對於RPC端點需要傳送訊息或者從遠端RPC接收到的訊息,分發至對應的指令收件箱/發件箱。如果指令接收方是自己存入收件箱,如果指令接收方為非自身端點,則放入發件箱
- Inbox:指令訊息收件箱,一個本地端點對應一個收件箱,Dispatcher在每次向Inbox存入訊息時,都將對應EndpointData加入內部待Receiver Queue中,另外Dispatcher建立時會啟動一個單獨執行緒進行輪詢Receiver Queue,進行收件箱訊息消費
- OutBox:指令訊息發件箱,一個遠端端點對應一個發件箱,當訊息放入Outbox後,緊接著將訊息通過TransportClient傳送出去。訊息放入發件箱以及傳送過程是在同一個執行緒中進行,這樣做的主要原因是遠端訊息分為RpcOutboxMessage, OneWayOutboxMessage兩種訊息,而針對於需要應答的訊息直接傳送且需要得到結果進行處理
- TransportClient:Netty通訊客戶端,根據OutBox訊息的receiver資訊,請求對應遠端TransportServer
- TransportServer:Netty通訊服務端,一個RPC端點一個TransportServer,接受遠端訊息後呼叫Dispatcher分發訊息至對應收發件箱
Spark在Endpoint的設計上核心設計即為Inbox與Outbox,其中Inbox核心要點為:
- 內部的處理流程拆分為多個訊息指令(InboxMessage)存放入Inbox
- 當Dispatcher啟動最後,會啟動一個名為【dispatcher-event-loop】的執行緒掃描Inbox待處理InboxMessage,並呼叫Endpoint根據InboxMessage型別做相應處理
- 當Dispatcher啟動最後,預設會向Inbox存入OnStart型別的InboxMessage,Endpoint在根據OnStart指令做相關的額外啟動工作,三端啟動後所有的工作都是對OnStart指令處理衍生出來的,因此可以說OnStart指令是相互通訊的源頭
-
注意: 一個端點對應一個Dispacher,一個Inbox , 多個OutBox,可以看到 inbox在Dispacher 中且在EndPointData內部:
private final RpcHandler rpcHandler; /** * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s). */ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { private class EndpointData( val name: String, val endpoint: RpcEndpoint, val ref: NettyRpcEndpointRef) { val inbox = new Inbox(ref, endpoint) } private val endpoints = new ConcurrentHashMap[String, EndpointData] private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef] // Track the receivers whose inboxes may contain messages. private val receivers = new LinkedBlockingQueue[EndpointData]
-
注意: 一個端點對應一個Dispacher,一個Inbox , 多個OutBox,可以看到 OutBox在NettyRpcEnv內部:
private[netty] class NettyRpcEnv( val conf: SparkConf, javaSerializerInstance: JavaSerializerInstance, host: String, securityManager: SecurityManager) extends RpcEnv(conf) with Logging { private val dispatcher: Dispatcher = new Dispatcher(this) private val streamManager = new NettyStreamManager(this) private val transportContext = new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this, streamManager)) /** * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]], * we just put messages to its [[Outbox]] to implement a non-blocking `send` method. */ private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
3 結語
秦凱新 於深圳 2018-10-28