Spark 1.6 Source Code Walkthrough: The BlockManager's Shuffle Service and Client
Spark is deployed in a distributed fashion: each Task ultimately runs on some machine in the cluster. A map task writes its output to the storage system of the machine it runs on, while the reduce task that consumes that output will very likely run on a different machine, so the intermediate map output has to be fetched remotely. For this reason the storage system also includes a ShuffleClient.
The shuffleClient is initialized in BlockManager's initialize method (line 176):
def initialize(appId: String): Unit = {
  blockTransferService.init(this)
  shuffleClient.init(appId)
  // ... (the rest of initialize sets up the BlockManagerId and registers with the master)
}
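Which ShuffleClient the BlockManager actually uses depends on whether the external shuffle service is enabled; otherwise the BlockTransferService itself doubles as the shuffle client. The field definition in BlockManager looks roughly like the following (paraphrased from memory; verify against BlockManager.scala):

// shuffleClient is the ExternalShuffleClient when the external shuffle
// service is enabled; otherwise the BlockTransferService itself is used.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager,
    securityManager.isAuthenticationEnabled(), securityManager.isSaslEncryptionEnabled())
} else {
  blockTransferService
}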
The default BlockTransferService is NettyBlockTransferService. Its init method (line 53) performs the following steps:
1. Create the RpcServer (NettyBlockRpcServer).
2. Construct the TransportContext.
3. Create the RPC client factory, TransportClientFactory.
4. Create the Netty server, TransportServer.
override def init(blockDataManager: BlockDataManager): Unit = {
  val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
  var serverBootstrap: Option[TransportServerBootstrap] = None
  var clientBootstrap: Option[TransportClientBootstrap] = None
  if (authEnabled) {
    serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
    clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId,
      securityManager, securityManager.isSaslEncryptionEnabled()))
  }
  transportContext = new TransportContext(transportConf, rpcHandler)
  clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
  server = createServer(serverBootstrap.toList)
  appId = conf.getAppId
  logInfo("Server created on " + server.getPort)
}
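One detail worth noting in the code above: authEnabled comes from securityManager.isAuthenticationEnabled(), so the SASL bootstraps are attached to the server and client only when Spark authentication is configured. A minimal configuration sketch (the secret value is made up):

import org.apache.spark.SparkConf

// Turning on SASL authentication for the Netty transport (Spark 1.6).
val conf = new SparkConf()
  .set("spark.authenticate", "true")           // makes authEnabled true
  .set("spark.authenticate.secret", "s3cr3t")  // shared secret; made-up value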
First, consider the RpcServer. NettyBlockRpcServer's receive method (line 48) shows that this RPC handler serves both block uploads and downloads:
override def receive(
    client: TransportClient,
    rpcMessage: ByteBuffer,
    responseContext: RpcResponseCallback): Unit = {
  val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
  logTrace(s"Received request: $message")

  message match {
    // Download (open) blocks
    case openBlocks: OpenBlocks =>
      val blocks: Seq[ManagedBuffer] =
        openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
      val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
      logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
      responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)

    // Upload a block
    case uploadBlock: UploadBlock =>
      // StorageLevel is serialized as bytes using our JavaSerializer.
      val level: StorageLevel =
        serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
      val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
      blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level)
      responseContext.onSuccess(ByteBuffer.allocate(0))
  }
}
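The messages matched here belong to the BlockTransferMessage protocol from the spark-network-shuffle module. As a small sketch (the app, executor, and block ids are invented), an OpenBlocks request can be round-tripped through the same encoding that receive decodes:

import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, OpenBlocks}

// Encode an OpenBlocks request, then decode it exactly as receive() does.
val open = new OpenBlocks("app-20160101", "exec-1",
  Array("shuffle_0_0_0", "shuffle_0_1_0"))
val decoded = BlockTransferMessage.Decoder.fromByteBuffer(open.toByteBuffer)
assert(decoded.isInstanceOf[OpenBlocks])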
Back in NettyBlockTransferService, line 62 creates the TransportContext:
transportContext = new TransportContext(transportConf, rpcHandler)
Looking at TransportContext (line 71): it can create both the Netty server and the Netty client, and it is composed of the following parts:
TransportConf: mainly controls the number of client and server threads used for the Netty-based shuffle I/O.
RpcHandler: handles block upload and download requests once the shuffle I/O server receives an RPC from a client; here it is the NettyBlockRpcServer shown above.
decoder: used on the shuffle I/O server side to decode the client's ByteBuf.
encoder: used on the shuffle I/O client side to encode messages.
(The two-argument constructor called at line 62 delegates to the constructor below with closeIdleConnections = false.)
public TransportContext(
TransportConf conf,
RpcHandler rpcHandler,
boolean closeIdleConnections) {
this.conf = conf;
this.rpcHandler = rpcHandler;
this.encoder = new MessageEncoder();
this.decoder = new MessageDecoder();
this.closeIdleConnections = closeIdleConnections;
}
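To make this dual role concrete, here is a self-contained sketch, not taken from the Spark source: a trivial echo RpcHandler stands in for NettyBlockRpcServer, and a single TransportContext produces both the server and a connected client. The "shuffle" module name mirrors what NettyBlockTransferService passes to SparkTransportConf; treat the exact signatures as assumptions to check against your 1.6 checkout.

import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.network.TransportContext
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}

// A trivial echo handler standing in for NettyBlockRpcServer.
val echoHandler = new RpcHandler {
  private val streams = new OneForOneStreamManager()
  override def receive(
      client: TransportClient,
      message: ByteBuffer,
      callback: RpcResponseCallback): Unit = {
    callback.onSuccess(message) // echo the request back unchanged
  }
  override def getStreamManager(): StreamManager = streams
}

val transportConf = SparkTransportConf.fromSparkConf(new SparkConf(), "shuffle")
val context = new TransportContext(transportConf, echoHandler)
val server = context.createServer()                 // the Netty server side
val clientFactory = context.createClientFactory()   // the Netty client side
val client = clientFactory.createClient("localhost", server.getPort)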
Line 63 of NettyBlockTransferService creates the RPC client factory. TransportClients created from this factory are used to send RPC requests to the Netty server:
clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
Line 64 creates the Netty server, TransportServer, which serves the upload and download RPCs. (createServer picks its port from spark.blockManager.port; the default of 0 means a random port.)
server = createServer(serverBootstrap.toList)
Fetching remote shuffle files
NettyBlockTransferService, line 80: data is fetched by creating a TransportClient that connects to the remote Netty server:
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        // Create a client connected to the remote Netty server
        val client = clientFactory.createClient(host, port)
        // Fetch the data
        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
      }
    }

    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      // Invoke createAndStart (defined above) directly, without retries
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}
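From the caller's side this is fully asynchronous: results arrive through the BlockFetchingListener. A minimal usage sketch follows (the host, port, executor id, and block id are made up; blockTransferService is assumed to be an already-initialized NettyBlockTransferService). Note also that the retry count checked above comes from spark.shuffle.io.maxRetries, which defaults to 3.

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.BlockFetchingListener

// Fetch one shuffle block and react asynchronously through the listener.
blockTransferService.fetchBlocks("worker-1", 7337, "exec-3",
  Array("shuffle_0_0_0"),
  new BlockFetchingListener {
    override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
      println(s"fetched $blockId (${data.size()} bytes)")
    override def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
      e.printStackTrace()
  })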
Uploading shuffle files
NettyBlockTransferService, line 114. The upload path works as follows:
1. Create a Netty client.
2. Serialize the block's StorageLevel.
3. Copy the block's ByteBuffer into a byte array so it can be serialized.
4. Wrap all of this in an UploadBlock message and serialize it.
5. Send it with the client's sendRpc method.
override def uploadBlock(
    hostname: String,
    port: Int,
    execId: String,
    blockId: BlockId,
    blockData: ManagedBuffer,
    level: StorageLevel): Future[Unit] = {
  val result = Promise[Unit]()
  val client = clientFactory.createClient(hostname, port)

  // StorageLevel is serialized as bytes using our JavaSerializer. Everything else is encoded
  // using our binary protocol.
  val levelBytes = serializer.newInstance().serialize(level).array()

  // Convert or copy nio buffer into array in order to serialize it.
  val nioBuffer = blockData.nioByteBuffer()
  val array = if (nioBuffer.hasArray) {
    nioBuffer.array()
  } else {
    val data = new Array[Byte](nioBuffer.remaining())
    nioBuffer.get(data)
    data
  }

  client.sendRpc(new UploadBlock(appId, execId, blockId.toString, levelBytes, array).toByteBuffer,
    new RpcResponseCallback {
      override def onSuccess(response: ByteBuffer): Unit = {
        logTrace(s"Successfully uploaded block $blockId")
        result.success((): Unit)
      }
      override def onFailure(e: Throwable): Unit = {
        logError(s"Error while uploading block $blockId", e)
        result.failure(e)
      }
    })

  result.future
}
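Because uploadBlock returns a Future[Unit], the caller decides whether to block on it. A usage sketch (the target host, port, executor id, and payload are invented; TestBlockId is the BlockId helper Spark uses in its own tests):

import java.nio.ByteBuffer
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.spark.network.buffer.NioManagedBuffer
import org.apache.spark.storage.{StorageLevel, TestBlockId}

// Upload three bytes as a test block and wait for the remote put to finish.
val payload = new NioManagedBuffer(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
val upload = blockTransferService.uploadBlock("worker-1", 7337, "exec-3",
  TestBlockId("upload-demo"), payload, StorageLevel.MEMORY_ONLY)
Await.result(upload, 10.seconds)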