1. 程式人生 > >Spark 底層網路模組

Spark 底層網路模組

對於分散式系統來說,網路是最基本的一環,其設計的好壞直接影響到整個分散式系統的穩定性及可用性。為此,Spark專門獨立出基礎網路模組spark-network,為上層RPC、Shuffle資料傳輸、RDD Block同步以及資原始檔傳輸等提供可靠的網路服務。在spark-1.6以前,RPC是單獨通過akka實現,資料以及檔案傳輸是通過netty實現,然而akka實質上底層也是採用netty實現,對於一個優雅的工程師來說,不會在系統中同時使用具有重複功能的框架,否則會使得系統越來越重,所以自spark-1.6開始,通過netty封裝了一套簡潔的類似於akka actor模式的RPC介面,逐步拋棄akka這個大框架。從spark-2.0起,所有的網路功能都是通過netty來實現。

系統抽象

在介紹spark網路模組前,我們先溫習下netty的基本工作流程。無論是伺服器還是客戶端都會關聯一個channel(socket),channel上會繫結一個pipeline,pipeline繫結若干個handler,用來專門用來處理和業務有關的東西,handler有DownHandler和UpHandler兩種,DownHandler用來處理髮包,UpHandler用來處理收包,大致過程如下圖所示。

spark-network-netty-overview

Spark的底層網路實現也是遵循上圖所示流程,其總體實現流程如下圖所示。客戶端傳送請求訊息,經過Encoder(一種DownHandler)編碼,加上包頭資訊,再通過網路發給服務端,服務端收到訊息後,首先經過TransportFrameDecoder(一種UpHandler)處理粘包拆包,得到訊息型別和訊息體,然後經過Decoder解析訊息型別,得到一個個具體的請求訊息,最後由TransportChannelHandler處理具體的請求訊息,並根據具體的訊息型別判斷是否返回一個響應。類似地,響應訊息傳給客戶端也是先經過Encoder編碼,客戶端先通過TransportFrameDecoder、Decoder解包訊息,再通過TransportChannelHandler處理具體的響應訊息。

spark-network-basic

整個網路模型非常清晰簡單,最核心的當屬訊息抽象以及如何定義訊息傳輸和處理,即上圖中的Message的定義以及編解碼傳輸等,下面詳細介紹spark網路模組的訊息抽象以及相關handler的定義。

訊息抽象

總結起來,Spark中定義三種類型的訊息:RPC訊息、ChunkFetch訊息以及Stream訊息。Message是這些訊息的抽象介面,它定義了三個關鍵介面,分別得到訊息型別、訊息體以及判斷訊息體是否編碼在header中,訊息體統一由ManagedBuffer表示,ManagedBuffer抽象了JAVA NIO ByteBuffer、Netty ByteBuf以及File Segment,所以無論是ByteBuffer、ByteBuf還是File Segment,都可以表示為ManagedBuffer。如下圖列出所有spark中涉及到的具體訊息,下面分別詳細闡述各種訊息。

spark-network-protocol

RPC訊息用於抽象所有spark中涉及到RPC操作時需要傳輸的訊息,通常這類訊息很小,一般都是些控制類訊息,在spark-1.6以前,RPC都是通過akka來實現的,自spark-1.6開始逐漸把akka剔除,通過netty實現,所以在spark-network公共模組中定義該類訊息,其包括RpcRequest、OneWayMessage、RpcResponse以及RpcFailure四種訊息。RpcRequest封裝RPC請求訊息,這類RPC請求是需要得到一個RPC響應的,RpcRequest除了訊息體外,還包括一個requestId欄位,用於唯一標識一個RPC請求,與RpcRequest對應地,有兩種RPC響應訊息,RpcResponse是RPC呼叫正常返回的響應訊息,RpcFailure是RPC呼叫異常返回的響應訊息,同樣地,它們除了訊息體外也包括requestId欄位,該欄位用於對應RpcRequest。OneWayMessage作為另一種RPC請求訊息,但是這類RPC請求是不需要響應的,所以它只包含訊息體,不需要諸如requestId等欄位來唯一標識,該訊息被髮送後可不用管它。

ChunkFetch訊息用於抽象所有spark中涉及到資料拉取操作時需要傳輸的訊息,它用於shuffle資料以及RDD Block資料傳輸。在shuffle階段,reduce task會去拉取map task結果中的對應partition資料,這需要發起一個ChunkFetch;另外,當RDD被快取後,如果節點上沒有所需的RDD Block,則會發起一個ChunkFetch拉取其他節點上的RDD Block。ChunkFetch訊息包括ChunkFetchRequest、ChunkFetchSuccess以及ChunkFetchFailure三種訊息。ChunkFetchRequest封裝ChunkFetch請求訊息,其只包括StreamChunkId欄位,沒有訊息體,StreamChunkId包括streamId和chunkIndex兩個欄位,streamId標識這次chunk fetch,chunkIndex標識fetch的資料塊,通常一次fetch可能會fetch多個chunk。ChunkFetchSuccess是成功Fetch後的響應訊息,ChunkFetchFailure是Fetch失敗的響應訊息,它們包含了Fetch的訊息體外,還包括StreamChunkId,以對應ChunkFetchRequest。

Stream訊息很簡單,主要用於driver到executor傳輸jar、file檔案等。Stream訊息包括StreamRequest、StreamResponse以及StreamFailure三種訊息,其中StreamRequest表示Stream請求訊息,只包含一個streamId,標識這個請求。StreamResponse表示Stream成功響應訊息,包含streamId以及響應的位元組數,並後面跟資料內容,實際使用時,客戶端會根據響應中的位元組數進一步獲取實際內容。StreamFailure表示Stream失敗的響應訊息,包含streamId以及異常資訊。executor需要獲取相關jar包或file檔案時,會發起一個StreamRequest訊息給driver,driver會返回一個StreamResponse,executor根據響應中的位元組數來進一步去截獲後續資料內容。

Handler定義

一個message從被髮送到被接收需要經過”MessageEncoder->網路->TransportFrameDecoder->MessageDecoder”,下面按照這一過程詳細闡述各handler的作用。

一個message進入網路前,需要經過MessageEncoder編碼,加上包頭資訊,以便後續收包時正確粘包拆包。頭資訊主要是包括三部分:

  • 整個包的長度
  • 訊息型別
  • 除訊息體外的訊息元資料,例如RpcRequest訊息的元資料資訊包括requestId和訊息長度bodysize

message-with-header

經過上述編碼後,一個個Message被編碼成一個個MessageWithHeader傳送到網路中,接收端收到資料後,首先通過TransportFrameDecoder和MessageDecoder來解碼出一個個具體的Message,這裡就涉及到粘包拆包問題,這也是為什麼在編碼階段在頭部加上frame length的原因。TransportFrameDecoder在解碼過程中,首先讀取8位元組的frame length(Long型),用frame length減去8就是除frame length外其他部分的長度,即為message type、message meta、message body三部分的長度,迴圈讀取直到這個長度,把讀到的Bytebuf交給MessagerDecoder,MessagerDecoder首先解析出message type,根據message type去反序列化(例項化)出具體的Message,例如message type如果是RpcRequest,那麼則繼續解析requestId和body size,根據body size解析後續位元組得到body,並構造出RpcRequest物件。反序列化得到的message物件會交給TransportChannelHandler,TransportChannelHandler裡封裝了TransportRequestHandler和TransportResponseHandler,分別處理RequestMessage和ResponseMessage,在服務端,TransportChannelHandler一般處理RequestMessage,在客戶端,TransportChannelHandler一般處理ResponseMessage。

spark-network-handler

系統訊息流程

根據上述系統抽象可以看出,spark-network將RPC、ChunkFetch以及Stream統一抽象出來,其中任意一種功能都依賴於spark-network的實現,下面分別詳細闡述這三種功能的一般使用流程。

RPC訊息處理

客戶端傳送一個RPC請求訊息(RpcRequest或OneWayMessage),經過編碼到網路,解碼到服務端的TransportChannelHandler,RPC請求會被交給TransportRequestHandler處理,而TransportRequestHandler中包括了一個RpcHandler專門用來處理RPC請求訊息,RpcHandler中有兩個關鍵receive介面,帶callback和不帶callback引數分別處理RpcRequest和OneWayMessage。

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class RpcHandler {

  public abstract void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);

  public void receive(TransportClient client, ByteBuffer message) {
    receive(client, message, ONE_WAY_CALLBACK);
  }

  ...
}

當收到RpcRequest時,處理後會在callback中傳送響應訊息,成功則傳送RpcResponse,失敗則傳送RpcFailure。當收到OneWayMessage時,處理後則直接不用管,客戶端也不用關心是否被處理了。

類似地,服務端傳送RPC響應訊息(RpcResponse或RpcFailure),也經過編碼到網路,解碼到客戶端的TransportChannelHandler,RPC響應會被交給TransportResponseHandler處理,在客戶端傳送RpcRequest的時候,會註冊一個RpcResponseCallback,通過requestId來標識,這樣在收到響應訊息的時候,根據響應訊息中的requestId就可以取出對應的RpcResponseCallback對響應訊息進行處理。

ChunkFetch訊息處理

對於ChunkFetch請求,客戶端一般需要首先發送一個RPC請求,告訴服務端需要拉取哪些資料,服務端收到這個RPC請求後,會為客戶端準備好需要的資料。上一節也提到,RPC請求會通過RpcHandler來處理,當RpcHandler接收到ChunkFetch的RPC請求訊息時,則會為客戶端準備好它需要的資料,這些即將要被fetch的資料是通過一個StreamManager來管理的,所以RpcHandler中有一個介面專門獲取StreamManager,StreamManager為後續到來的ChunkFetchRequest服務。

1
2
3
4
5
6
7
public abstract class RpcHandler {
  ...

  public abstract StreamManager getStreamManager();

  ...
}

RPC請求成功後,服務端表示資料準備好,客戶端傳送ChunkFetchRequest訊息,服務端收到該訊息後,最後會交給TransportRequestHandler處理,TransportRequestHandler則根據請求訊息中的StreamChunkId,從前面準備好的StreamManager中拿到對應的資料,封裝成ChunkFetchSuccess返回給客戶端,如果出錯或找不到對應的資料,則返回ChunkFetchFailure。

1
2
3
4
5
6
7
public abstract class StreamManager {
  ...

  public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);

  ...
}

響應訊息到達客戶端後,最後會被交給TransportResponseHandler處理,在客戶端傳送ChunkFetchRequest的時候,會註冊一個ChunkReceivedCallback,通過StreamChunkId來標識,這樣在收到響應訊息的時候,根據響應訊息中的StreamChunkId就可以取出對應的ChunkReceivedCallback對響應訊息進行處理。

Stream訊息處理

Stream類似於ChunkFetch,主要用於檔案服務。客戶端一般也需要首先發送一個RPC請求,告訴服務端需要開啟一個stream,服務端收到這個RPC請求後,會為客戶端開啟所需的檔案流。

RPC請求成功後,服務端表示資料準備好,客戶端傳送StreamRequest訊息,服務端收到該訊息後,最後會交給TransportRequestHandler處理,TransportRequestHandler則根據請求訊息中的streamId,從準備好的StreamManager中開啟對應的檔案流,同時返回StreamResponse給客戶端,如果出錯或找不到對應的流,則返回ChunkFetchFailure。

1
2
3
4
5
6
7
public abstract class StreamManager {
  ...

  public ManagedBuffer openStream(String streamId);

  ...
}

響應訊息到達客戶端後,最後會被交給TransportResponseHandler處理,在客戶端傳送StreamRequest的時候,會註冊一個StreamCallback,同時維護一個StreamCallback的佇列,這樣在收到響應訊息的時候,就會從佇列中取出StreamCallback去處理截獲的資料。注意這裡說的是截獲的資料,這塊有點不一樣的是,收到響應訊息後,會根據響應訊息中資料大小,在TransportFrameDecoder物件中設定截獲器Interceptor物件,TransportFrameDecoder在接收資料的時候會被這個截獲器Interceptor擷取它想要的資料。雖然程式碼看懂了,但是這裡卻不知道為啥通過截獲的方式去拉取檔案流資料。

小結

本文主要闡述spark-network公共模組,詳細分析spark底層網路編解碼以及訊息處理的抽象,在後續文章中會更加詳細地介紹具體spark中的RPC、ShuffleService、BlockTransformService以及FileServer的實現,這些功能服務都是基於spark-network公共模組來實現的。

文章出處: