Spark's Built-in RPC Communication Mechanism and RpcEnv Infrastructure - Spark in Production
This blog series distills and shares cases drawn from real production environments, along with Spark source-code walkthroughs and hands-on production guidance. Please stay tuned. Copyright notice: this Spark source-code analysis and production practice series belongs to its author (秦凱新). Reproduction is prohibited; you are welcome to learn from it.
Spark in Production and Advanced Tuning Series
- Spark in Production: Spark's Built-in RPC Communication Mechanism and RpcEnv Infrastructure
- Spark in Production: The Event Listener Bus Workflow
- Spark in Production: The Underlying Architecture of the Storage Subsystem
- Spark in Production: How the Underlying MessageLoop Threads Execute
- Spark in Production: Stage Partitioning and Task Scheduling Details in the Two-Level Scheduling System
1. Spark's Built-in RPC Communication Mechanism
TransportContext holds the methods that create the TransportClient and the TransportServer, and yet it belongs to the lowest layer of the RPC communication stack. Why?
Because its RpcHandler member is abstract and carries no concrete message handling, and TransportContext's role is limited to creating the TransportClient (via its factory) and the TransportServer. The class comment puts it this way:
Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
setup Netty Channel pipelines with a
{@link org.apache.spark.network.server.TransportChannelHandler}.
So TransportContext can only serve as the lowest-level communication foundation. Above it sits the higher-level NettyRpcEnv, which holds a reference to a TransportContext and passes a NettyRpcHandler instance into it, so that Netty's communication callbacks are handled by that handler. A TransportContext code snippet:
/* The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
* channel. As each TransportChannelHandler contains a TransportClient, this enables server
* processes to send messages back to the client on an existing channel.
*/
public class TransportContext {
private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
private final TransportConf conf;
private final RpcHandler rpcHandler;
private final boolean closeIdleConnections;
private final MessageEncoder encoder;
private final MessageDecoder decoder;
public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
this(conf, rpcHandler, false);
}
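To make TransportContext's role concrete, here is a minimal sketch of wiring a concrete RpcHandler into a TransportContext and creating a server and a client factory from it, roughly what NettyRpcEnv does internally. EchoRpcHandler, the port, and the object name are illustrative assumptions, not Spark code:

```scala
import java.nio.ByteBuffer
import java.util.Collections

import org.apache.spark.network.TransportContext
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager, TransportServerBootstrap}
import org.apache.spark.network.util.{MapConfigProvider, TransportConf}

// A trivial RpcHandler that echoes every RPC back to the caller (illustrative only).
class EchoRpcHandler extends RpcHandler {
  private val streamManager = new OneForOneStreamManager()
  override def receive(
      client: TransportClient,
      message: ByteBuffer,
      callback: RpcResponseCallback): Unit = {
    callback.onSuccess(message) // reply with the request bytes unchanged
  }
  override def getStreamManager: StreamManager = streamManager
}

object TransportContextSketch {
  def main(args: Array[String]): Unit = {
    val conf = new TransportConf("rpc", new MapConfigProvider(Collections.emptyMap[String, String]()))
    // The concrete RpcHandler is supplied here; TransportContext itself has no message logic.
    val context = new TransportContext(conf, new EchoRpcHandler)
    val server = context.createServer(12345, Collections.emptyList[TransportServerBootstrap]())
    val clientFactory = context.createClientFactory()
    val client = clientFactory.createClient("localhost", 12345)
    // ... client.sendRpc(...) ...; finally close client and server.
    client.close()
    server.close()
  }
}
```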
1.1 TransportChannelHandler: the unified message handler for client and server
Both TransportClient and TransportServer install a TransportChannelHandler when they configure the handler in the Netty pipeline, so that all received messages pass through a single entry point. TransportChannelHandler then branches on the message type; code snippet:
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
if (request instanceof RequestMessage) {
requestHandler.handle((RequestMessage) request);
} else if (request instanceof ResponseMessage) {
responseHandler.handle((ResponseMessage) request);
} else {
ctx.fireChannelRead(request);
}
}
TransportContext's pipeline initialization code snippet:
public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel,
channelRpcHandler);
channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0,
conf.connectionTimeoutMs() / 1000))
.addLast("handler", channelHandler);
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
throw e;
}
The unified message handler TransportChannelHandler is produced by createChannelHandler(channel, channelRpcHandler), which is what unifies Netty's message reception; code snippet:
/**
* Creates the server- and client-side handler which is used to handle both RequestMessages and
* ResponseMessages. The channel is expected to have been successfully created, though certain
* properties (such as the remoteAddress()) may not be available yet.
*/
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new
TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
}
Note that TransportClient pairs with TransportResponseHandler, while TransportServer pairs with TransportRequestHandler. When a message comes in, TransportChannelHandler first selects the appropriate handler based on the message type; it also manages the Netty channel lifecycle events:
- exceptionCaught
- channelActive
- channelInactive
- channelRead
- userEventTriggered
1.2 TransportClient handles ResponseMessages
Once the client sends a message (always a Request message), it registers a callback in one of the following maps:
private final Map<Long, RpcResponseCallback> outstandingRpcs;
private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches;
When the corresponding response arrives, these entries are used to invoke the callback.
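The pattern is: assign a request id, remember the callback under that id, ship the id with the request, and fire the callback when the matching response comes back. A minimal self-contained Scala sketch of that bookkeeping (just the idea behind outstandingRpcs, not Spark's actual Java implementation):

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Illustrative only: mirrors the role of TransportClient's outstandingRpcs map.
class CallbackRegistry[T] {
  private val nextId = new AtomicLong(0)
  private val outstanding = new ConcurrentHashMap[Long, T => Unit]()

  // Called before sending: remember the callback, return the id to send with the request.
  def register(callback: T => Unit): Long = {
    val requestId = nextId.incrementAndGet()
    outstanding.put(requestId, callback)
    requestId
  }

  // Called by the response handler when a response carrying this id arrives.
  def complete(requestId: Long, response: T): Unit = {
    val callback = outstanding.remove(requestId)
    if (callback != null) callback(response)
  }
}
```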
1.3 TransportServer handles RequestMessages
Message types the server receives (all Request messages):
- ChunkFetchRequest
- RpcRequest
- OneWayMessage
- StreamRequest
Response types the server sends back (all Response messages):
- ChunkFetchSuccess
- ChunkFetchFailure
- RpcResponse
- RpcFailure
2. The Spark RpcEnv Infrastructure
2.1 The upper layer: NettyRpcEnv
The upper-layer NettyRpcEnv holds a reference to a TransportContext and passes a NettyRpcHandler instance into it, so that Netty's communication callbacks are handled there. Its main members are:
- Dispatcher
- TransportContext
- TransportClientFactory
- TransportServer
- TransportConf
2.2 RpcEndpoint and RpcEndpointRef
- RpcEndpoint is the server side (the endpoint that receives and handles messages)
- RpcEndpointRef is the client side (the reference through which messages are sent to that endpoint)
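A small example of this split, with EchoEndpoint and the "echo" name being illustrative assumptions rather than Spark code (and note the RPC API is private[spark], so this only compiles inside Spark's own source tree):

```scala
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

case class Echo(text: String)

// Server side: an RpcEndpoint that answers Echo messages.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Echo(text) => context.reply(s"echo: $text")
  }
}

// Client side: registering the endpoint yields an RpcEndpointRef, the handle callers use.
// val ref: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
// val answer = ref.askSync[String](Echo("hi"))   // askSync in Spark 2.2+; ask returns a Future
```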
2.3 Dispatcher, Inbox, and Outbox
- Each RpcEnv has a single Dispatcher; each local endpoint has one Inbox; and there is one Outbox per remote endpoint address
- RpcEndpoint: an RPC endpoint. Spark treats every node (Client/Master/Worker) as an RPC endpoint. Each implements the RpcEndpoint interface and defines its own messages and business logic according to its needs; when it needs to send (or ask), it goes through the Dispatcher.
- RpcEnv: the RPC context. The runtime context that each RPC endpoint depends on is called its RpcEnv.
- Dispatcher: the message dispatcher. It routes messages that an RPC endpoint wants to send, or messages received from remote RPC endpoints, to the corresponding inbox or outbox: if the receiver is the local endpoint the message goes into an Inbox, otherwise into an Outbox.
- Inbox: the inbox for instruction messages. Each local endpoint has one Inbox. Whenever the Dispatcher puts a message into an Inbox it also adds the corresponding EndpointData to its internal receivers queue; when the Dispatcher is created it starts dedicated threads that poll that queue and consume the inbox messages.
- Outbox: the outbox for instruction messages. Each remote endpoint has one Outbox. Once a message is put into an Outbox it is sent out through a TransportClient. Putting the message into the outbox and sending it happen on the same thread, mainly because remote messages come in two kinds, RpcOutboxMessage and OneWayOutboxMessage, and messages that require an answer are sent directly so their result can be processed.
- TransportClient: the Netty communication client. Based on the receiver information of an Outbox message, it requests the corresponding remote TransportServer.
- TransportServer: the Netty communication server. There is one TransportServer per RPC endpoint (per RpcEnv); after receiving a remote message it calls the Dispatcher to route the message to the corresponding inbox or outbox.
The core of Spark's endpoint design is the Inbox and the Outbox. The key points of the Inbox are:
- The internal processing flow is broken into instruction messages (InboxMessage) that are stored in the Inbox.
- After the Dispatcher starts, it launches threads named dispatcher-event-loop that scan the Inbox for pending InboxMessages and call the Endpoint to handle each one according to its type.
- By default an OnStart InboxMessage is put into the Inbox as soon as it is created, and the Endpoint does its extra startup work when handling OnStart. Everything the endpoint does afterwards derives from handling OnStart, so OnStart can be called the origin of all the communication.
- Note: there is one Dispatcher per RpcEnv, one Inbox per local endpoint, and one Outbox per remote address. As the snippet below shows, the Inbox sits inside EndpointData, which lives inside the Dispatcher:
/**
 * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

  private val endpoints = new ConcurrentHashMap[String, EndpointData]
  private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

  // Track the receivers whose inboxes may contain messages.
  private val receivers = new LinkedBlockingQueue[EndpointData]
- Note: the Outboxes live inside NettyRpcEnv, one per remote RpcAddress, as the snippet below shows:
private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager) extends RpcEnv(conf) with Logging {

  private val dispatcher: Dispatcher = new Dispatcher(this)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  /**
   * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
   * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
   */
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
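For the remote path, messages reach the per-address Outbox through NettyRpcEnv.postToOutbox; abbreviated from the Spark source (the shutdown branch is omitted, and details vary slightly across versions):

```scala
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
  if (receiver.client != null) {
    // An active TransportClient already exists for this receiver: send directly on it.
    message.sendWith(receiver.client)
  } else {
    require(receiver.address != null,
      "Cannot send message to client endpoint with no listen address.")
    // Look up, or lazily create, the Outbox for this remote RpcAddress.
    val targetOutbox = {
      val outbox = outboxes.get(receiver.address)
      if (outbox == null) {
        val newOutbox = new Outbox(this, receiver.address)
        val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
        if (oldOutbox == null) newOutbox else oldOutbox
      } else {
        outbox
      }
    }
    targetOutbox.send(message)
  }
}
```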
2.3.1 Dispatcher's postMessage and the OnStart message
The Dispatcher code snippet below contains the core message-posting logic: posting a message to an endpoint means placing it in that endpoint's Inbox and, at the same time, offering the EndpointData to the Dispatcher's receivers queue. This is the high-level path that endpoints such as Master and Worker use when they send messages through the Dispatcher in NettyRpcEnv. Inside the Dispatcher, threads called MessageLoop process these messages promptly.
/**
* Posts a message to a specific endpoint.
*
* @param endpointName name of the endpoint.
* @param message the message to post
* @param callbackIfStopped callback function if the endpoint is stopped.
*/
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
val data = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (data == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
data.inbox.post(message)
receivers.offer(data)
None
}
}
Note: the first message an endpoint processes is always OnStart. Why? Look at the `new EndpointData(name, endpoint, endpointRef)` in registerRpcEndpoint below:
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
receivers.offer(data) // for the OnStart message
}
endpointRef
}
Note that EndpointData contains the inbox, so OnStart is enqueued when the Inbox is initialized:
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
val inbox = new Inbox(ref, endpoint)
}
OnStart therefore appears the moment the Inbox is initialized. Remember that each endpoint has exactly one Inbox; the master node, for example, has one.
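The relevant lines sit in the class body of Inbox.scala (abbreviated from the Spark source): an Inbox enqueues OnStart the moment it is constructed, which is why OnStart is always the first message an endpoint handles.

```scala
// Inside the Inbox class body:
// OnStart should be the first message to process
inbox.synchronized {
  messages.add(OnStart)
}
```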
2.4 Two ways a message reaches the Inbox: an endpoint (Master) posts to its own local Inbox, or the endpoint (Master) receives a remote message that TransportRequestHandler processes and drops into the Inbox
2.4.1 The endpoint (Master) posts a message to its own local Inbox
- endpoint(Master) -> NettyRpcEnv-> Dispatcher -> postMessage -> MessageLoop(Dispatcher) -> inbox -> process -> endpoint.receiveAndReply
Explanation: the endpoint posts a message into its own Inbox through its own RpcEnv, and the Dispatcher then handles it, which ends with a call to the endpoint's own receiveAndReply method.
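The local-versus-remote branch lives in NettyRpcEnv.send; abbreviated from the Spark source (details vary by version): a message addressed to the local RpcEnv goes straight to the Dispatcher, anything else goes to an Outbox.

```scala
private[netty] def send(message: RequestMessage): Unit = {
  val remoteAddr = message.receiver.address
  if (remoteAddr == address) {
    // Message to a local RPC endpoint: hand it to the Dispatcher (-> Inbox).
    try {
      dispatcher.postOneWayMessage(message)
    } catch {
      case e: RpcEnvStoppedException => logWarning(e.getMessage)
    }
  } else {
    // Message to a remote RPC endpoint: serialize it and put it into that address's Outbox.
    postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
  }
}
```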
- A key point is when the MessageLoop starts. As the Dispatcher code below shows, it starts as soon as the Dispatcher is constructed, because the thread pool is a member value:
private val threadpool: ThreadPoolExecutor = {
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, Runtime.getRuntime.availableProcessors()))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}
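Each pool thread runs a MessageLoop, which blocks on the receivers queue and calls inbox.process for whichever endpoint has pending messages; abbreviated from the Spark source:

```scala
/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that the other MessageLoops see it and exit too.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case _: InterruptedException => // exit
    }
  }
}
```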
- Next: when is NettyRpcEnv initialized, and when is the Dispatcher initialized?
When the Master initializes its RpcEnv it calls NettyRpcEnvFactory().create(config), which constructs the NettyRpcEnv; that initializes its Dispatcher member, whose threadpool member in turn starts the MessageLoops, which then begin processing messages: one link leads straight to the next. Below is how the Master endpoint initializes its RpcEnv.
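For reference, Master.startRpcEnvAndEndpoint looks roughly like this (abbreviated from the Spark source; the ask call is askSync in Spark 2.2+, askWithRetry in earlier versions):

```scala
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  // Creates the NettyRpcEnv via NettyRpcEnvFactory: the Dispatcher is constructed
  // (its threadpool starts the MessageLoops) and the TransportServer is started.
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  // Registering the Master endpoint puts OnStart into its Inbox.
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
```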
In NettyRpcEnv, NettyRpcEnvFactory's create method calls nettyRpcEnv.startServer, shown in the snippet below, which then calls the low-level transportContext.createServer to create the server and initialize the Netty pipeline:
server = transportContext.createServer(host, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
From then on, the endpoint simply keeps posting messages into its own Inbox; the code is the postMessage method again:
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
val data = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (data == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
data.inbox.post(message)
receivers.offer(data)
None
}
}
2.4.2 The endpoint (Master) receives a remote message, which TransportRequestHandler processes and drops into the Inbox
- endpointRef(Worker) -> TransportChannelHandler -> channelRead0 -> TransportRequestHandler -> handle -> processRpcRequest -> NettyRpcHandler (inside NettyRpcEnv) -> receive -> internalReceive -> dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress)) (connection notification) -> dispatcher.postRemoteMessage(messageToDispatch, callback) (puts the remote message into the inbox) -> postMessage -> inbox -> process
The code below shows how TransportChannelHandler receives a message and routes it to the request or response handler:
@Override
public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
if (request instanceof RequestMessage) {
requestHandler.handle((RequestMessage) request);
} else {
responseHandler.handle((ResponseMessage) request);
}
}
TransportRequestHandler then matches on the request type (ChunkFetchRequest, RpcRequest, OneWayMessage, StreamRequest) and processes it accordingly.
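For an RpcRequest, processRpcRequest hands the message body to NettyRpcHandler.receive, which deserializes it and posts it to the Dispatcher; abbreviated from the Spark source:

```scala
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  // Turn the raw bytes into a RequestMessage addressed to a local endpoint.
  val messageToDispatch = internalReceive(client, message)
  // Post it into the target endpoint's Inbox; the callback is later used to send the reply.
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}
```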
The message finally reaches the Inbox's process method, where it is actually handled by the endpoint's receiveAndReply(context):
/**
* Process stored messages.
*/
def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
inbox.synchronized {
if (!enableConcurrent && numActiveThreads != 0) {
return
}
message = messages.poll()
if (message != null) {
numActiveThreads += 1
} else {
return
}
}
while (true) {
safelyCall(endpoint) {
message match {
case RpcMessage(_sender, content, context) =>
try {
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case NonFatal(e) =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
throw e
}
case OneWayMessage(_sender, content) =>
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
case OnStart =>
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
case OnStop =>
val activeThreads = inbox.synchronized { inbox.numActiveThreads }
assert(activeThreads == 1,
s"There should be only a single active thread but found $activeThreads threads.")
dispatcher.removeRpcEndpointRef(endpoint)
endpoint.onStop()
assert(isEmpty, "OnStop should be the last message")
case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)
case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)
case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
}
inbox.synchronized {
// "enableConcurrent" will be set to false after `onStop` is called, so we should check it
// every time.
if (!enableConcurrent && numActiveThreads != 1) {
// If we are not the only one worker, exit
numActiveThreads -= 1
return
}
message = messages.poll()
if (message == null) {
numActiveThreads -= 1
return
}
}
}
}
3. Conclusion
This article took nearly two days of work to dissect how Spark's RPC layer operates; it was not easy. The key question is: did it become clear to you? Comments are welcome.
秦凱新