1. 程式人生 > >Spark為何使用Netty通訊框架替代Akka

Spark為何使用Netty通訊框架替代Akka

轉自:http://www.aboutyun.com/thread-21115-1-1.html

問題導讀:


1. spark 如何在1.6.0之後使用Netty替代了Akka?
2. Spark Network Common怎麼實現?
3. BlockTransfer 與 Shuffle 之間的聯絡?
4. Akka 實現原理是什麼?





解決方案:

一直以來,基於Akka實現的RPC通訊框架是Spark引以為豪的主要特性,也是與Hadoop等分散式計算框架對比過程中一大亮點,但是時代和技術都在演化,從Spark1.3.1版本開始,為了解決大塊資料(如Shuffle)的傳輸問題,Spark引入了Netty通訊框架,到了1.6.0版本,Netty居然完成取代了Akka,承擔Spark內部所有的RPC通訊以及資料流傳輸。


網路IO掃盲貼

在Linux作業系統層面,網路操作即為IO操作,總共有:阻塞式,非阻塞式,複用模型,訊號驅動和非同步五種IO模型。其中

  • 阻塞式IO操作請求發起以後,從網絡卡等待/讀取資料,核心/到使用者態的拷貝,整個IO過程中,使用者的執行緒都是處於阻塞狀態。
  • 非阻塞與阻塞的區別在於應用層不會等待網絡卡接收資料,即在核心資料未準備好之前,IO將返回EWOULDBLOCK,使用者端通過主動輪詢,直到核心態資料準備好,然後再主動發起核心態資料到使用者態的拷貝讀操作(阻塞)。
  • 在非阻塞IO中,每個IO的應用層程式碼都需要主動地去核心態輪詢直到資料OK,在IO複用模型中,將“輪詢/事件驅動”的工作交給一個單獨的select/epoll的IO控制代碼去做,即所謂的IO複用。
  • 訊號驅動IO是向核心註冊訊號回撥函式,在資料OK的時候自動觸發回撥函式,進而可以在回撥函式中啟用資料的讀取,即由核心告訴我們何時可以開始IO操作。
  • 非同步IO是將IO資料讀取到使用者態記憶體的函式註冊到系統中,在核心資料OK的時候,自動完成核心態到使用者態的拷貝,並通知應用態資料已經讀取完成,即由核心告訴我們何時IO操作已經完成;

JAVA IO也經歷來上面幾次演化,從最早的BIO(阻塞式/非阻塞IO),到1.4版本的NIO(IO複用),到1.7版本的NIO2.0/AIO(非同步IO);基於早期BIO來實現高併發網路伺服器都是依賴多執行緒來實現,但是執行緒開銷較大,BIO的瓶頸明顯,NIO的出現解決了這一大難題,基於IO複用解決了IO高併發;但是NIO有也有幾個缺點:


  • API可用性較低(拿ByteBuffer來說,共用一個curent指標,讀寫切換需要進行flip和rewind,相當麻煩);
  • 僅僅是API,如果想在NIO上實現一個網路模型,還需要自己寫很多比如執行緒池,解碼,半包/粘包,限流等邏輯;
  • 最後就是著名的NIO-Epoll死迴圈的BUG 因為這幾個原因,促使了很多JAVA-IO通訊框架的出現,Netty就是其中一員,它也因為高度的穩定性,功能性,效能等特性,成為Javaer們的首選。

那麼Netty和JDK-NIO之間到底是什麼關係?是JDK-NIO的封裝還是重寫?首先是NIO的上層封裝,Netty提供了NioEventLoopGroup/NioSocketChannel/NioServerSocketChannel的組合來完成實際IO操作,繼而在此之上實現資料流Pipeline以及EventLoop執行緒池等功能。另外它又重寫了NIO,JDK-NIO底層是基於Epoll的LT模式來實現,而Netty是基於Epoll的ET模式實現的一組IO操作EpollEventLoopGroup/EpollSocketChannel/EpollServerSocketChannel;Netty對兩種實現進行完美的封裝,可以根據業務的需求來選擇不同的實現(Epoll的ET和LT模式真的有很大的效能差別嗎?單從Epoll的角度來看,ET肯定是比LT要效能好那麼一點。但是如果為了編碼簡潔性,LT還是首選,ET如果使用者層邏輯實現不夠優美,相比ET還會帶來更大大效能開銷;不過Netty這麼大的開源團隊,相信ET模式應該實現的不錯吧!!純屬猜測!!)。

那麼Akka又是什麼東西?從Akka出現背景來說,它是基於Actor的RPC通訊系統,它的核心概念也是Message,它是基於協程的,效能不容置疑;基於scala的偏函式,易用性也沒有話說,但是它畢竟只是RPC通訊,無法適用大的package/stream的資料傳輸,這也是Spark早期引入Netty的原因。

那麼Netty為什麼可以取代Akka?首先不容置疑的是Akka可以做到的,Netty也可以做到,但是Netty可以做到,Akka卻無法做到,原因是啥?在軟體棧中,Akka相比Netty要Higher一點,它專門針對RPC做了很多事情,而Netty相比更加基礎一點,可以為不同的應用層通訊協議(RPC,FTP,HTTP等)提供支援,在早期的Akka版本,底層的NIO通訊就是用的Netty;其次一個優雅的工程師是不會允許一個系統中容納兩套通訊框架,噁心!最後,雖然Netty沒有Akka協程級的效能優勢,但是Netty內部高效的Reactor執行緒模型,無鎖化的序列設計,高效的序列化,零拷貝,記憶體池等特性也保證了Netty不會存在效能問題。

那麼Spark是怎麼用Netty來取代Akka呢?一句話,利用偏函式的特性,基於Netty“仿造”出一個簡約版本的Actor模型!!

Spark Network Common的實現

Byte的表示

對於Network通訊,不管傳輸的是序列化後的物件還是檔案,在網路上表現的都是位元組流。在傳統IO中,位元組流表示為Stream;在NIO中,位元組流表示為ByteBuffer;在Netty中位元組流表示為ByteBuff或FileRegion;在Spark中,針對Byte也做了一層包裝,支援對Byte和檔案流進行處理,即ManagedBuffer;

ManagedBuffer包含了三個函式createInputStream(),nioByteBuffer(),convertToNetty()來對Buffer進行“型別轉換”,分別獲取stream,ByteBuffer,ByteBuff或FileRegion;NioManagedBuffer/NettyManagedBuffer/FileSegmentManagedBuffer也是針對這ByteBuffer,ByteBuff或FileRegion提供了具體的實現。

更好的理解ManagedBuffer:比如Shuffle BlockManager模組需要在記憶體中維護本地executor生成的shuffle-map輸出的檔案引用,從而可以提供給shuffleFetch進行遠端讀取,此時檔案表示為FileSegmentManagedBuffer,shuffleFetch遠端呼叫FileSegmentManagedBuffer.nioByteBuffer/createInputStream函式從檔案中讀取為Bytes,並進行後面的網路傳輸。如果已經在記憶體中bytes就更好理解了,比如將一個字元陣列表示為NettyManagedBuffer。

Protocol的表示

協議是應用層通訊的基礎,它提供了應用層通訊的資料表示,以及編碼和解碼的能力。在Spark Network Common中,繼承AKKA中的定義,將協議命名為Message,它繼承Encodable,提供了encode的能力。


 

Message根據請求響應可以劃分為RequestMessage和ResponseMessage兩種;對於Response,根據處理結果,可以劃分為Failure和Success兩種型別;根據功能的不同,z主要劃分為Stream,ChunkFetch,Rpc。

Stream訊息就是上面提到的ManagedBuffer中的Stream流,在Spark內部,比如SparkContext.addFile操作會在Driver中針對每一個add進來的file/jar會分配唯一的StreamID(file/[]filename],jars/[filename]);worker通過該StreamID向Driver發起一個StreamRequest的請求,Driver將檔案轉換為FileSegmentManagedBuffer返回給Worker,這就是StreamMessage的用途之一;

ChunkFetch也有一個類似Stream的概念,ChunkFetch的物件是“一個記憶體中的Iterator[ManagedBuffer]”,即一組Buffer,每一個Buffer對應一個chunkIndex,整個Iterator[ManagedBuffer]由一個StreamID標識。Client每次的ChunkFetch請求是由(streamId,chunkIndex)組成的唯一的StreamChunkId,Server端根據StreamChunkId獲取為一個Buffer並返回給Client; 不管是Stream還是ChunkFetch,在Server的記憶體中都需要管理一組由StreamID與資源之間對映,即StreamManager類,它提供了getChunk和openStream兩個介面來分別響應ChunkFetch與Stream兩種操作,並且針對Server的ChunkFetch提供一個registerStream介面來註冊一組Buffer,比如可以將BlockManager中一組BlockID對應的Iterator[ManagedBuffer]註冊到StreamManager,從而支援遠端Block Fetch操作。

Case:對於ExternalShuffleService(一種單獨shuffle服務程序,對其他計算節點提供本節點上面的所有shuffle map輸出),它為遠端Executor提供了一種OpenBlocks的RPC介面,即根據請求的appid,executorid,blockid(appid+executor對應本地一組目錄,blockid拆封出)從本地磁碟中載入一組FileSegmentManagedBuffer到記憶體,並返回載入後的streamId返回給客戶端,從而支援後續的ChunkFetch的操作。

RPC是第三種核心的Message,和Stream/ChunkFetch的Message不同,每次通訊的Body是型別是確定的,在rpcHandler可以根據每種Body的型別進行相應的處理。 在Spark1.6.*版本中,也正式使用基於Netty的RPC框架來替代Akka。

Server的結構

Server構建在Netty之上,它提供兩種模型NIO和Epoll,可以通過引數(spark.[module].io.mode)進行配置,最基礎的module就是shuffle,不同的IOMode選型,對應了Netty底層不同的實現,Server的Init過程中,最重要的步驟就是根據不同的IOModel完成EventLoop和Pipeline的構造,如下所示:

//根據IO模型的不同,構造不同的EventLoop/ClientChannel/ServerChannel
[Scala] 純文字檢視 複製程式碼 ?
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) { switch (mode) { case NIO