1. 程式人生 > >Spark1.6之後為何使用Netty通信框架替代Akka

Spark1.6之後為何使用Netty通信框架替代Akka

chan try 線程池大小 -- 核心概念 事情 ike 新的 inux


解決方案:

一直以來,基於Akka實現的RPC通信框架是Spark引以為豪的主要特性,也是與Hadoop等分布式計算框架對比過程中一大亮點。

但是時代和技術都在演化,從Spark1.3.1版本開始,為了解決大塊數據(如Shuffle)的傳輸問題,Spark引入了Netty通信框架,到了1.6.0版本,Netty居然完成取代了Akka,承擔Spark內部所有的RPC通信以及數據流傳輸。

網絡IO掃盲貼

在Linux操作系統層面,網絡操作即為IO操作,總共有:阻塞式非阻塞式復用模型信號驅動異步五種IO模型。其中

  • 阻塞式IO操作請求發起以後,從網卡等待/讀取數據,內核/到用戶態的拷貝,整個IO過程中,用戶的線程都是處於阻塞狀態。
  • 非阻塞與阻塞的區別在於應用層不會等待網卡接收數據,即在內核數據未準備好之前,IO將返回EWOULDBLOCK,用戶端通過主動輪詢,直到內核態數據準備好,然後再主動發起內核態數據到用戶態的拷貝讀操作(阻塞)。
  • 在非阻塞IO中,每個IO的應用層代碼都需要主動地去內核態輪詢直到數據OK,在IO復用模型中,將“輪詢/事件驅動”的工作交給一個單獨的select/epoll的IO句柄去做,即所謂的IO復用。
  • 信號驅動IO是向內核註冊信號回調函數,在數據OK的時候自動觸發回調函數,進而可以在回調函數中啟用數據的讀取,即由內核告訴我們何時可以開始IO操作。
  • 異步IO是將IO數據讀取到用戶態內存的函數註冊到系統中,在內核數據OK的時候,自動完成內核態到用戶態的拷貝,並通知應用態數據已經讀取完成,即由內核告訴我們何時IO操作已經完成;

JAVA IO也經歷來上面幾次演化,從最早的BIO(阻塞式/非阻塞IO),到1.4版本的NIO(IO復用),到1.7版本的NIO2.0/AIO(異步IO);

基於早期BIO來實現高並發網絡服務器都是依賴多線程來實現,但是線程開銷較大,BIO的瓶頸明顯,NIO的出現解決了這一大難題,基於IO復用解決了IO高並發;但是NIO有也有幾個缺點:

  • API可用性較低(拿ByteBuffer來說,共用一個curent指針,讀寫切換需要進行flip和rewind,相當麻煩);
  • 僅僅是API,如果想在NIO上實現一個網絡模型,還需要自己寫很多比如線程池,解碼,半包/粘包,限流等邏輯;
  • 最後就是著名的NIO-Epoll死循環的BUG 因為這幾個原因,促使了很多JAVA-IO通信框架的出現,Netty就是其中一員,它也因為高度的穩定性,功能性,性能等特性,成為Javaer們的首選。


那麽Netty和JDK-NIO之間到底是什麽關系?是JDK-NIO的封裝還是重寫?

  首先是NIO的上層封裝,Netty提供了NioEventLoopGroup/NioSocketChannel/NioServerSocketChannel的組合來完成實際IO操作,繼而在此之上實現數據流Pipeline以及EventLoop線程池等功能。

另外它又重寫了NIO,JDK-NIO底層是基於Epoll的LT模式來實現,而Netty是基於Epoll的ET模式實現的一組IO操作EpollEventLoopGroup/EpollSocketChannel/EpollServerSocketChannel;

Netty對兩種實現進行完美的封裝,可以根據業務的需求來選擇不同的實現(Epoll的ET和LT模式真的有很大的性能差別嗎?單從Epoll的角度來看,ET肯定是比LT要性能好那麽一點。

但是如果為了編碼簡潔性,LT還是首選,ET如果用戶層邏輯實現不夠優美,相比ET還會帶來更大大性能開銷;不過Netty這麽大的開源團隊,相信ET模式應該實現的不錯吧!!純屬猜測!!)。

那麽Akka又是什麽東西?

  從Akka出現背景來說,它是基於Actor的RPC通信系統,它的核心概念也是Message,它是基於協程的,性能不容置疑;基於scala的偏函數,易用性也沒有話說。

但是它畢竟只是RPC通信,無法適用大的package/stream的數據傳輸,這也是Spark早期引入Netty的原因。

註:

Akka is a concurrency framework built around the notion of actors and composable futures, Akka was inspired by Erlang which was built from the ground up around the Actor paradigm. It would usually be used to replace blocking locks such as synchronized, read write locks and the like with higher level asynchronous abstractions.

Akka是一個建立在Actors概念和可組合Futures之上的並發框架,,Akka設計靈感來源於Erlang,Erlang是基於Actor模型構建的。它通常被用來取代阻塞鎖如同步、讀寫鎖及類似的更高級別的異步抽象。

Netty is an asynchronous network library used to make Java NIO easier to use.

Netty是一個異步網絡庫,使JAVA NIO的功能更好用。

Notice that they both embrace asynchronous approaches, and that one could use the two together, or entirely separately.

註意:它們兩個都提供了異步方法,你可以使用其中一個,或兩個都用

Where there is an overlap is that Akka has an IO abstraction too, and Akka can be used to create computing clusters that pass messages between actors on different machines. From this point of view, Akka is a higher level abstraction that could (and does) make use of Netty under the hood

Akka針對IO操作有一個抽象,這和netty是一樣的。使用Akka可以用來創建計算集群,Actor在不同的機器之間傳遞消息。從這個角度來看,Akka相對於Netty來說,是一個更高層次的抽象

那麽Netty為什麽可以取代Akka?首先不容置疑的是Akka可以做到的,Netty也可以做到,但是Netty可以做到,Akka卻無法做到,原因是啥?

在軟件棧中,Akka相比Netty要Higher一點,它專門針對RPC做了很多事情,而Netty相比更加基礎一點,可以為不同的應用層通信協議(RPC,FTP,HTTP等)提供支持。

在早期的Akka版本,底層的NIO通信就是用的Netty;其次一個優雅的工程師是不會允許一個系統中容納兩套通信框架,惡心!

最後,雖然Netty沒有Akka協程級的性能優勢,但是Netty內部高效的Reactor線程模型,無鎖化的串行設計,高效的序列化,零拷貝,內存池等特性也保證了Netty不會存在性能問題。

那麽Spark是怎麽用Netty來取代Akka呢?一句話,利用偏函數的特性,基於Netty“仿造”出一個簡約版本的Actor模型!!


Spark Network Common的實現

Byte的表示

對於Network通信,不管傳輸的是序列化後的對象還是文件,在網絡上表現的都是字節流。在傳統IO中,字節流表示為Stream;在NIO中,字節流表示為ByteBuffer;在Netty中字節流表示為ByteBuff或FileRegion;在Spark中,針對Byte也做了一層包裝,支持對Byte和文件流進行處理,即ManagedBuffer;

ManagedBuffer包含了三個函數createInputStream(),nioByteBuffer(),convertToNetty()來對Buffer進行“類型轉換”,分別獲取stream,ByteBuffer,ByteBuff或FileRegion;NioManagedBuffer/NettyManagedBuffer/FileSegmentManagedBuffer也是針對這ByteBuffer,ByteBuff或FileRegion提供了具體的實現。

更好的理解ManagedBuffer:比如Shuffle BlockManager模塊需要在內存中維護本地executor生成的shuffle-map輸出的文件引用,從而可以提供給shuffleFetch進行遠程讀取,此時文件表示為FileSegmentManagedBuffer,shuffleFetch遠程調用FileSegmentManagedBuffer.nioByteBuffer/createInputStream函數從文件中讀取為Bytes,並進行後面的網絡傳輸。如果已經在內存中bytes就更好理解了,比如將一個字符數組表示為NettyManagedBuffer。


Protocol的表示

協議是應用層通信的基礎,它提供了應用層通信的數據表示,以及編碼和解碼的能力。在Spark Network Common中,繼承AKKA中的定義,將協議命名為Message,它繼承Encodable,提供了encode的能力。


<ignore_js_op>技術分享圖片

Message根據請求響應可以劃分為RequestMessage和ResponseMessage兩種;對於Response,根據處理結果,可以劃分為Failure和Success兩種類型;根據功能的不同,z主要劃分為Stream,ChunkFetch,Rpc。

Stream消息就是上面提到的ManagedBuffer中的Stream流,在Spark內部,比如SparkContext.addFile操作會在Driver中針對每一個add進來的file/jar會分配唯一的StreamID(file/[]filename],jars/[filename]);worker通過該StreamID向Driver發起一個StreamRequest的請求,Driver將文件轉換為FileSegmentManagedBuffer返回給Worker,這就是StreamMessage的用途之一;

ChunkFetch也有一個類似Stream的概念,ChunkFetch的對象是“一個內存中的Iterator[ManagedBuffer]”,即一組Buffer,每一個Buffer對應一個chunkIndex,整個Iterator[ManagedBuffer]由一個StreamID標識。Client每次的ChunkFetch請求是由(streamId,chunkIndex)組成的唯一的StreamChunkId,Server端根據StreamChunkId獲取為一個Buffer並返回給Client; 不管是Stream還是ChunkFetch,在Server的內存中都需要管理一組由StreamID與資源之間映射,即StreamManager類,它提供了getChunk和openStream兩個接口來分別響應ChunkFetch與Stream兩種操作,並且針對Server的ChunkFetch提供一個registerStream接口來註冊一組Buffer,比如可以將BlockManager中一組BlockID對應的Iterator[ManagedBuffer]註冊到StreamManager,從而支持遠程Block Fetch操作。

Case:對於ExternalShuffleService(一種單獨shuffle服務進程,對其他計算節點提供本節點上面的所有shuffle map輸出),它為遠程Executor提供了一種OpenBlocks的RPC接口,即根據請求的appid,executorid,blockid(appid+executor對應本地一組目錄,blockid拆封出)從本地磁盤中加載一組FileSegmentManagedBuffer到內存,並返回加載後的streamId返回給客戶端,從而支持後續的ChunkFetch的操作。

RPC是第三種核心的Message,和Stream/ChunkFetch的Message不同,每次通信的Body是類型是確定的,在rpcHandler可以根據每種Body的類型進行相應的處理。 在Spark1.6.*版本中,也正式使用基於Netty的RPC框架來替代Akka。

Server的結構

Server構建在Netty之上,它提供兩種模型NIO和Epoll,可以通過參數(spark.[module].io.mode)進行配置,最基礎的module就是shuffle,不同的IOMode選型,對應了Netty底層不同的實現,Server的Init過程中,最重要的步驟就是根據不同的IOModel完成EventLoop和Pipeline的構造,如下所示:

//根據IO模型的不同,構造不同的EventLoop/ClientChannel/ServerChannel

EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
    switch (mode) {
    case NIO:
        return new NioEventLoopGroup(numThreads, threadFactory);
    case EPOLL:
        return new EpollEventLoopGroup(numThreads, threadFactory);
    }
}
 
Class<? extends Channel> getClientChannelClass(IOMode mode) {
    switch (mode) {
    case NIO:
        return NioSocketChannel.class;
    case EPOLL:
        return EpollSocketChannel.class;
    }
}
 
Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
    switch (mode) {
    case NIO:
        return NioServerSocketChannel.class;
    case EPOLL:
        return EpollServerSocketChannel.class;
    }
}
 
//構造pipelet
responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
requestHandler = new TransportRequestHandler(channel, client,rpcHandler);
channelHandler = new TransportChannelHandler(client, responseHandler, requestHandler,
                                 conf.connectionTimeoutMs(), closeIdleConnections);
channel.pipeline()
   .addLast("encoder", encoder)
   .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
   .addLast("decoder", decoder)
   .addLast("idleStateHandler", new IdleStateHandler())
   .addLast("handler", channelHandler);

其中,MessageEncoder/Decoder針對網絡包到Message的編碼和解碼,而最為核心就TransportRequestHandler,它封裝了對所有請求/響應的處理;TransportChannelHandler內部實現也很簡單,它封裝了responseHandler和requestHandler,當從Netty中讀取一條Message以後,根據判斷路由給相應的responseHandler和requestHandler。

public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
    if (request instanceof RequestMessage) {
        requestHandler.handle((RequestMessage) request);
    } else {
        responseHandler.handle((ResponseMessage) request);
    }
}

Sever提供的RPC,ChunkFecth,Stream的功能都是依賴TransportRequestHandler來實現的;從原理上來說,RPC與ChunkFecth/Stream還是有很大不同的,其中RPC對於TransportRequestHandler來說是功能依賴,而ChunkFecth/Stream對於TransportRequestHandler來說只是數據依賴。

怎麽理解?即TransportRequestHandler已經提供了ChunkFecth/Stream的實現,只需要在構造的時候,向TransportRequestHandler提供一個streamManager,告訴RequestHandler從哪裏可以讀取到Chunk或者Stream。

而RPC需要向TransportRequestHandler註冊一個rpcHandler,針對每個RPC接口進行功能實現,同時RPC與ChunkFecth/Stream都會有同一個streamManager的依賴,因此註入到TransportRequestHandler中的streamManager也是依賴rpcHandler來實現,即rpcHandler中提供了RPC功能實現和streamManager的數據依賴。

//參考TransportRequestHandler的構造函數
public TransportRequestHandler(RpcHandler rpcHandler) {
    this.rpcHandler = rpcHandler;//****註入功能****
    this.streamManager = rpcHandler.getStreamManager();//****註入streamManager****
}
//實現ChunkFecth的功能
private void processFetchRequest(final ChunkFetchRequest req) {
    buf = streamManager.getChunk(req.streamId, req.chunkIndex);
    respond(new ChunkFetchSuccess(req.streamChunkId, buf));
}
//實現Stream的功能
private void processStreamRequest(final StreamRequest req) {
    buf = streamManager.openStream(req.streamId);
    respond(new StreamResponse(req.streamId, buf.size(), buf));
}
//實現RPC的功能
private void processRpcRequest(final RpcRequest req) {
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer(),
        new RpcResponseCallback() {
            public void onSuccess(ByteBuffer response) {
            respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
            }
    });
}

Client的結構

Server是通過監聽一個端口,註入rpcHandler和streamManager從而對外提供RPC,ChunkFecth,Stream的服務,而Client即為一個客戶端類,通過該類,可以將一個streamId/chunkIndex對應的ChunkFetch請求,streamId對應的Stream請求,以及一個RPC數據包對應的RPC請求發送到服務端,並監聽和處理來自服務端的響應;其中最重要的兩個類即為TransportClient和TransportResponseHandler分別為上述的“客戶端類”和“監聽和處理來自服務端的響應"。

那麽TransportClient和TransportResponseHandler是怎麽配合一起完成Client的工作呢?


<ignore_js_op>技術分享圖片

如上所示,由TransportClient將用戶的RPC,ChunkFecth,Stream的請求進行打包並發送到Server端,同時將用戶提供的回調函數註冊到TransportResponseHandler,在上面一節中說過,TransportResponseHandler是TransportChannelHandler的一部分,在TransportChannelHandler接收到數據包,並判斷為響應包以後,將包數據路由到TransportResponseHandler中,在TransportResponseHandler中通過註冊的回調函數,將響應包的數據返回給客戶端

//以TransportResponseHandler中處理ChunkFetchSuccess響應包的處理邏輯
public void handle(ResponseMessage message) throws Exception {
    String remoteAddress = NettyUtils.getRemoteAddress(channel);
    if (message instanceof ChunkFetchSuccess) {
        resp = (ChunkFetchSuccess) message;
        listener = outstandingFetches.get(resp.streamChunkId);
        if (listener == null) {
            //沒有監聽的回調函數
        } else {
            outstandingFetches.remove(resp.streamChunkId);
            //回調函數,並把resp.body()對應的chunk數據返回給listener
            listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
            resp.body().release();
        }
    }
}
//ChunkFetchFailure/RpcResponse/RpcFailure/StreamResponse/StreamFailure處理的方法是一致的


Spark Network的功能應用--BlockTransfer&&Shuffle

無論是BlockTransfer還是ShuffleFetch都需要跨executor的數據傳輸,在每一個executor裏面都需要運行一個Server線程(後面也會分析到,對於Shuffle也可能是一個獨立的ShuffleServer進程存在)來提供對Block數據的遠程讀寫服務。

在每個Executor裏面,都有一個BlockManager模塊,它提供了對當前Executor所有的Block的“本地管理”,並對進程內其他模塊暴露getBlockData(blockId: BlockId): ManagedBuffer的Block讀取接口,但是這裏GetBlockData僅僅是提供本地的管理功能,對於跨遠程的Block傳輸,則由NettyBlockTransferService提供服務。

NettyBlockTransferService本身即是Server,為其他其他遠程Executor提供Block的讀取功能,同時它即為Client,為本地其他模塊暴露fetchBlocks的接口,支持通過host/port拉取任何Executor上的一組的Blocks。


NettyBlockTransferService作為一個Server

NettyBlockTransferService作為一個Server,與Executor或Driver裏面其他的服務一樣,在進程啟動時,由SparkEnv初始化構造並啟動服務,在整個運行時的一部分。

val blockTransferService =
    new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
 
val envInstance = new SparkEnv(executorId,rpcEnv,serializer, closureSerializer,
    blockTransferService,//為SparkEnv的一個組成
    ....,conf)


在上文,我們談到,一個Server的構造依賴RpcHandler提供RPC的功能註入以及提供streamManager的數據註入。對於NettyBlockTransferService,該RpcHandler即為NettyBlockRpcServer,在構造的過程中,需要與本地的BlockManager進行管理,從而支持對外提供本地BlockMananger中管理的數據

"RpcHandler提供RPC的功能註入"在這裏還是屬於比較“簡陋的”,畢竟他是屬於數據傳輸模塊,Server中提供的chunkFetch和stream已經足夠滿足他的功能需要,那現在問題就是怎麽從streamManager中讀取數據來提供給chunkFetch和stream進行使用呢?

就是NettyBlockRpcServer作為RpcHandler提供的一個Rpc接口之一:OpenBlocks,它接受由Client提供一個Blockids列表,Server根據該BlockIds從BlockManager獲取到相應的數據並註冊到streamManager中,同時返回一個StreamID,後續Client即可以使用該StreamID發起ChunkFetch的操作。

//case openBlocks: OpenBlocks =>
val blocks: Seq[ManagedBuffer] =
    openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)


NettyBlockTransferService作為一個Client

從NettyBlockTransferService作為一個Server,我們基本可以推測NettyBlockTransferService作為一個Client支持fetchBlocks的功能的基本方法:

  • Client將一組Blockid表示為一個openMessage請求,發送到服務端,服務針對該組Blockid返回一個唯一的streamId
  • Client針對該streamId發起size(blockids)個fetchChunk操作。


核心代碼如下:

//發出openMessage請求
client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        streamHandle = (StreamHandle)response;//獲取streamId
        //針對streamid發出一組fetchChunk
        for (int i = 0; i < streamHandle.numChunks; i++) {
            client.fetchChunk(streamHandle.streamId, i, chunkCallback);
        }
    }
});


同時,為了提高服務端穩定性,針對fetchBlocks操作NettyBlockTransferService提供了非重試版本和重試版本的BlockFetcher,分別為OneForOneBlockFetcher和RetryingBlockFetcher,通過參數(spark.[module].io.maxRetries)進行配置,默認是重試3次,除非你蛋疼,你不重試!!!

在Spark,Block有各種類型,可以是ShuffleBlock,也可以是BroadcastBlock等等,對於ShuffleBlock的Fetch,除了由Executor內部的NettyBlockTransferService提供服務以外,也可以由外部的ShuffleService來充當Server的功能,並由專門的ExternalShuffleClient來與其進行交互,從而獲取到相應Block數據。功能的原理和實現,基本一致,但是問題來了?為什麽需要一個專門的ShuffleService服務呢?主要原因還是為了做到任務隔離,即減輕因為fetch帶來對Executor的壓力,讓其專心的進行數據的計算。

其實外部的ShuffleService最終是來自Hadoop的AuxiliaryService概念,AuxiliaryService為計算節點NodeManager常駐的服務線程,早期的MapReduce是進程級別的調度,ShuffleMap完成shuffle文件的輸出以後,即立即退出,在ShuffleReduce過程中由誰來提供文件的讀取服務呢?即AuxiliaryService,每一個ShuffleMap都會將自己在本地的輸出,註冊到AuxiliaryService,由AuxiliaryService提供本地數據的清理以及外部讀取的功能。

在目前Spark中,也提供了這樣的一個AuxiliaryService:YarnShuffleService,但是對於Spark不是必須的,如果你考慮到需要“通過減輕因為fetch帶來對Executor的壓力”,那麽就可以嘗試嘗試。

同時,如果啟用了外部的ShuffleService,對於shuffleClient也不是使用上面的NettyBlockTransferService,而是專門的ExternalShuffleClient,功能邏輯基本一致!


Spark Network的功能應用--新的RPC框架

Akka的通信模型是基於Actor,一個Actor可以理解為一個Service服務對象,它可以針對相應的RPC請求進行處理,如下所示,定義了一個最為基本的Actor:

class HelloActor extends Actor {
    def receive = {
        case "hello" => println("world")
        case _       => println("huh?")
    }
}
//
Receive = PartialFunction[Any, Unit]


Actor內部只有唯一一個變量(當然也可以理解為函數了),即Receive,它為一個偏函數,通過case語句可以針對Any信息可以進行相應的處理,這裏Any消息在實際項目中就是消息包。

另外一個很重要的概念就是ActorSystem,它是一個Actor的容器,多個Actor可以通過name->Actor的註冊到Actor中,在ActorSystem中可以根據請求不同將請求路由給相應的Actor。ActorSystem和一組Actor構成一個完整的Server端,此時客戶端通過host:port與ActorSystem建立連接,通過指定name就可以相應的Actor進行通信,這裏客戶端就是ActorRef。所有Akka整個RPC通信系列是由Actor,ActorRef,ActorSystem組成。

Spark基於這個思想在上述的Network的基礎上實現一套自己的RPC Actor模型,從而取代Akka。其中RpcEndpoint對於Actor,RpcEndpointRef對應ActorRef,RpcEnv即對應了ActorSystem。

下面我們具體進行分析它的實現原理。

private[spark] trait RpcEndpoint {
    def receive: PartialFunction[Any, Unit] = {
        case _ => throw new SparkException()
    }
    def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case _ => context.sendFailure(new SparkException())
    }
    //onStart(),onStop()
}

RpcEndpoint與Actor一樣,不同RPC Server可以根據業務需要指定相應receive/receiveAndReply的實現,在Spark內部現在有N多個這樣的Actor,比如Executor就是一個Actor,它處理來自Driver的LaunchTask/KillTask等消息。

RpcEnv相對於ActorSystem:

  • 首先它作為一個Server,它通過NettyRpcHandler來提供了Server的服務能力,
  • 其次它作為RpcEndpoint的容器,它提供了setupEndpoint(name,endpoint)接口,從而實現將一個RpcEndpoint以一個Name對應關系註冊到容器中,從而通過Server對外提供Service
  • 最後它作為Client的適配器,它提供了setupEndpointRef/setupEndpointRefByURI接口,通過指定Server端的Host和PORT,並指定RpcEndpointName,從而獲取一個與指定Endpoint通信的引用。


RpcEndpointRef即為與相應Endpoint通信的引用,它對外暴露了send/ask等接口,實現將一個Message發送到Endpoint中。

這就是新版本的RPC框架的基本功能,它的實現基本上與Akka無縫對接,業務的遷移的功能很小,目前基本上都全部遷移完了。

RpcEnv內部實現原理

RpcEnv不僅從外部接口與Akka基本一致,在內部的實現上,也基本差不多,都是按照MailBox的設計思路來實現的;


<ignore_js_op>技術分享圖片

與上圖所示,RpcEnv即充當著Server,同時也為Client內部實現。 當As Server,RpcEnv會初始化一個Server,並註冊NettyRpcHandler,在前面描述過,RpcHandler的receive接口負責對每一個請求進行處理,一般情況下,簡單業務可以在RpcHandler直接完成請求的處理,但是考慮一個RpcEnv的Server上會掛載了很多個RpcEndpoint,每個RpcEndpoint的RPC請求頻率不可控,因此需要對一定的分發機制和隊列來維護這些請求,其中Dispatcher為分發器,InBox即為請求隊列;

在將RpcEndpoint註冊到RpcEnv過程中,也間接的將RpcEnv註冊到Dispatcher分發器中,Dispatcher針對每個RpcEndpoint維護一個InBox,在Dispatcher維持一個線程池(線程池大小默認為系統可用的核數,當然也可以通過spark.rpc.netty.dispatcher.numThreads進行配置),線程針對每個InBox裏面的請求進行處理。當然實際的處理過程是由RpcEndpoint來完成。

這就是RpcEnv As Server的基本過程!

其次RpcEnv也完成Client的功能實現,RpcEndpointRef是以RpcEndpoint為單位,即如果一個進程需要和遠程機器上N個RpcEndpoint服務進行通信,就對應N個RpcEndpointRef(後端的實際的網絡連接是公用,這個是TransportClient內部提供了連接池來實現的),當調用一個RpcEndpointRef的ask/send等接口時候,會將把“消息內容+RpcEndpointRef+本地地址”一起打包為一個RequestMessage,交由RpcEnv進行發送。註意這裏打包的消息裏面包括RpcEndpointRef本身是很重要的,從而可以由Server端識別出這個消息對應的是哪一個RpcEndpoint。

和發送端一樣,在RpcEnv中,針對每個remote端的host:port維護一個隊列,即OutBox,RpcEnv的發送僅僅是把消息放入到相應的隊列中,但是和發送端不一樣的是:在OutBox中沒有維護一個所謂的線程池來定時清理OutBox,而是通過一堆synchronized來實現的,這點值得商討。

Spark1.6之後為何使用Netty通信框架替代Akka