1. 程式人生 > >Spark內建框架rpc通訊機制及RpcEnv基礎設施-Spark商業實戰

Spark內建框架rpc通訊機制及RpcEnv基礎設施-Spark商業實戰

1. Spark 內建框架rpc通訊機制

TransportContext 內部握有建立TransPortClient和TransPortServer的方法實現,但卻屬於最底層的RPC通訊設施。為什麼呢?

因為成員變數RPCHandler是抽象的,所以TransportContext只能為最底層的通訊基礎。上層為NettyRPCEnv高層封裝,並持有TransportContext引用,在TransportContext中傳入NettyRpcHandler實體,來實現netty通訊回撥Handler處理。請參照第二部分詳解。

 /* The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
 * channel. As each TransportChannelHandler contains a TransportClient, this enables server
 * processes to send messages back to the client on an existing channel.
 */
 
 
  public class TransportContext {
  private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
  private final TransportConf conf;
  private final RpcHandler rpcHandler;
  private final boolean closeIdleConnections;

  private final MessageEncoder encoder;
  private final MessageDecoder decoder;

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }

1.1 客戶端和服務端統一的訊息接收處理器 TransportChannelHandlerer

transportClient 和TransportServer 在配置Netty的pipeLine的handler處理器時,均採用TransportChannelHandler,

不過transportClient對應的是TransportResponseHander,TransportServer對應的的是TransportRequestHander。 如下圖所示,在進行訊息處理時,首先會經過TransportChannelHandler根據訊息型別進行處理器選擇。

1.2 transportClient對應的是ResponseMessage

客戶端一旦傳送訊息(均為Request訊息),就會在

private final Map<Long, RpcResponseCallback> outstandingRpcs;

private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches

中快取,用於回撥處理。

1.3 transportServer對應的是RequestMessage

服務端接收訊息型別(均為Request訊息)

  • ChunkFetchRequest
  • RpcRequest
  • OneWayMessage
  • StremRequest

服務端響應型別(均為Response訊息):

  • ChunkFetchSucess
  • ChunkFetchFailure
  • RpcResponse
  • RpcFailure

2. Spark RpcEnv基礎設施

2.1 上層建築NettyRPCEnv

上層建築NettyRPCEnv,持有TransportContext引用,在TransportContext中傳入NettyRpcHandler實體,來實現netty通訊回撥Handler處理

  • Dispatcher
  • TransportContext
  • TransPortClientFactroy
  • TransportServer
  • TransportConf

2.2 RpcEndPoint 與 RPCEndPointRef 端點

  • RpcEndPoint 為服務端
  • RPCEndPointRef 為客戶端

2.2 Dispacher 與 Inbox 與 Outbox

  • 一個端點對應一個Dispacher,一個Inbox , 多個OutBox
  1. RpcEndpoint:RPC端點 ,Spark針對於每個節點(Client/Master/Worker)都稱之一個Rpc端點 ,且都實現RpcEndpoint介面,內部根據不同端點的需求,設計不同的訊息和不同的業務處理,如果需要傳送(詢問)則呼叫Dispatcher
  2. RpcEnv:RPC上下文環境,每個Rpc端點執行時依賴的上下文環境稱之為RpcEnv
  3. Dispatcher:訊息分發器,針對於RPC端點需要傳送訊息或者從遠端RPC接收到的訊息,分發至對應的指令收件箱/發件箱。如果指令接收方是自己存入收件箱,如果指令接收方為非自身端點,則放入發件箱
  4. Inbox:指令訊息收件箱,一個本地端點對應一個收件箱,Dispatcher在每次向Inbox存入訊息時,都將對應EndpointData加入內部待Receiver Queue中,另外Dispatcher建立時會啟動一個單獨執行緒進行輪詢Receiver Queue,進行收件箱訊息消費
  5. OutBox:指令訊息發件箱,一個遠端端點對應一個發件箱,當訊息放入Outbox後,緊接著將訊息通過TransportClient傳送出去。訊息放入發件箱以及傳送過程是在同一個執行緒中進行,這樣做的主要原因是遠端訊息分為RpcOutboxMessage, OneWayOutboxMessage兩種訊息,而針對於需要應答的訊息直接傳送且需要得到結果進行處理
  6. TransportClient:Netty通訊客戶端,根據OutBox訊息的receiver資訊,請求對應遠端TransportServer
  7. TransportServer:Netty通訊服務端,一個RPC端點一個TransportServer,接受遠端訊息後呼叫Dispatcher分發訊息至對應收發件箱

Spark在Endpoint的設計上核心設計即為Inbox與Outbox,其中Inbox核心要點為:

  1. 內部的處理流程拆分為多個訊息指令(InboxMessage)存放入Inbox
  2. 當Dispatcher啟動最後,會啟動一個名為【dispatcher-event-loop】的執行緒掃描Inbox待處理InboxMessage,並呼叫Endpoint根據InboxMessage型別做相應處理
  3. 當Dispatcher啟動最後,預設會向Inbox存入OnStart型別的InboxMessage,Endpoint在根據OnStart指令做相關的額外啟動工作,三端啟動後所有的工作都是對OnStart指令處理衍生出來的,因此可以說OnStart指令是相互通訊的源頭
  • 注意: 一個端點對應一個Dispacher,一個Inbox , 多個OutBox,可以看到 inbox在Dispacher 中且在EndPointData內部:

     private final RpcHandler rpcHandler;
    /**
    * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
    */
     private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     private class EndpointData(
        val name: String,
        val endpoint: RpcEndpoint,
        val ref: NettyRpcEndpointRef) {
      val inbox = new Inbox(ref, endpoint)
    }
    private val endpoints = new ConcurrentHashMap[String, EndpointData]
    private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
    
    // Track the receivers whose inboxes may contain messages.
    private val receivers = new LinkedBlockingQueue[EndpointData]
    

  • 注意: 一個端點對應一個Dispacher,一個Inbox , 多個OutBox,可以看到 OutBox在NettyRpcEnv內部:

    private[netty] class NettyRpcEnv(
      val conf: SparkConf,
      javaSerializerInstance: JavaSerializerInstance,
      host: String,
      securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
      
      private val dispatcher: Dispatcher = new Dispatcher(this)
      
      private val streamManager = new NettyStreamManager(this)
      private val transportContext = new TransportContext(transportConf,
      new NettyRpcHandler(dispatcher, this, streamManager))
      
    /**
     * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
     * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
     */
    private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
    

3 結語

秦凱新 於深圳 2018-10-28