
Spark 1.6 Source Code Reading: The BlockManager Shuffle Service and Client

Spark is deployed as a distributed system: each Task ultimately runs on a different node. A map task writes its output directly to the storage system of the machine it ran on, while the corresponding reduce task is very likely running on a different machine and therefore has to download the map task's intermediate output remotely. That is why the storage system also contains a ShuffleClient.
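
Which ShuffleClient is used depends on whether the external shuffle service is enabled. A simplified sketch of the corresponding field in BlockManager, abridged from the Spark 1.6 source (names such as numUsableCores may differ slightly):

  // Simplified sketch of BlockManager.shuffleClient (Spark 1.6): with the external
  // shuffle service enabled, a separate ExternalShuffleClient is used; otherwise
  // the NettyBlockTransferService itself doubles as the shuffle client.
  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager,
      securityManager.isAuthenticationEnabled(), securityManager.isSaslEncryptionEnabled())
  } else {
    blockTransferService
  }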

The shuffleClient is initialized in BlockManager's initialize method (line 176):

  def initialize(appId: String): Unit = {
    blockTransferService.init(this)
    shuffleClient.init(appId)
    // ... the rest of initialize (building the BlockManagerId and registering with the
    // BlockManagerMaster) is omitted here

The default BlockTransferService is NettyBlockTransferService. Its init method, at line 53, goes through the following steps:

Create the RpcServer (NettyBlockRpcServer)

Construct the TransportContext

Create the RPC client factory TransportClientFactory

Create the Netty server TransportServer

  override def init(blockDataManager: BlockDataManager): Unit = {
    // 1. create the RpcHandler (NettyBlockRpcServer) that serves block upload/download requests
    val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
    var serverBootstrap: Option[TransportServerBootstrap] = None
    var clientBootstrap: Option[TransportClientBootstrap] = None
    if (authEnabled) {
      // wrap both sides with SASL bootstraps when authentication is enabled
      serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
      clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
        securityManager.isSaslEncryptionEnabled()))
    }
    // 2. construct the TransportContext
    transportContext = new TransportContext(transportConf, rpcHandler)
    // 3. create the RPC client factory
    clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
    // 4. create the Netty server
    server = createServer(serverBootstrap.toList)
    appId = conf.getAppId
    logInfo("Server created on " + server.getPort)
  }

First, the RpcServer. The receive method of NettyBlockRpcServer (line 48) is shown below.

As it shows, this RPC handler serves both block downloads and uploads:

  override def receive(
      client: TransportClient,
      rpcMessage: ByteBuffer,
      responseContext: RpcResponseCallback): Unit = {
    val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
    logTrace(s"Received request: $message")

    message match {
      // OpenBlocks: serve a block download request
      case openBlocks: OpenBlocks =>
        val blocks: Seq[ManagedBuffer] =
          openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
        logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
        responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
      // UploadBlock: handle a block upload request
      case uploadBlock: UploadBlock =>
        // StorageLevel is serialized as bytes using our JavaSerializer.
        val level: StorageLevel =
          serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
        val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
        blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level)
        responseContext.onSuccess(ByteBuffer.allocate(0))
    }
  }
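
The OpenBlocks branch answers with a StreamHandle (a streamId plus the number of chunks). On the fetching side it is OneForOneBlockFetcher that consumes this handle. The following is a simplified Scala-style sketch of what that (Java) class does, not the literal source; chunkCallback stands for its internal ChunkReceivedCallback that forwards each result to the caller's BlockFetchingListener:

  // Sketch of OneForOneBlockFetcher.start(): send OpenBlocks, then fetch each
  // block as chunk i of the stream the server registered above.
  client.sendRpc(new OpenBlocks(appId, execId, blockIds).toByteBuffer,
    new RpcResponseCallback {
      override def onSuccess(response: ByteBuffer): Unit = {
        val handle = BlockTransferMessage.Decoder.fromByteBuffer(response)
          .asInstanceOf[StreamHandle]
        for (i <- 0 until handle.numChunks) {
          // every fetched chunk is forwarded to the BlockFetchingListener
          client.fetchChunk(handle.streamId, i, chunkCallback)
        }
      }
      override def onFailure(e: Throwable): Unit =
        blockIds.foreach(listener.onBlockFetchFailure(_, e))
    })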

NettyBlockTransferService creates the TransportContext at line 62:

    transportContext = new TransportContext(transportConf, rpcHandler)

The relevant constructor is in TransportContext, at line 71 (the two-argument call above goes through an overload that simply delegates here with closeIdleConnections = false):

TransportContext can create both the Netty server and the Netty client. It is built from the following parts:

TransportConf: mainly controls the number of client and server threads for the shuffle I/O provided by the Netty framework.

RpcHandler: on the shuffle I/O server side, handles block upload and download once an RPC request arrives from a client; here it is the NettyBlockRpcServer above.

decoder: lets the shuffle I/O server decode the ByteBuf sent by the client.

encoder: lets the shuffle I/O client encode the messages it sends.

  public TransportContext(
      TransportConf conf,
      RpcHandler rpcHandler,
      boolean closeIdleConnections) {
    this.conf = conf;
    this.rpcHandler = rpcHandler;
    this.encoder = new MessageEncoder();
    this.decoder = new MessageDecoder();
    this.closeIdleConnections = closeIdleConnections;
  }

NettyBlockTransferService creates the RPC client factory at line 63.

A TransportClient produced by this factory is used to send RPC requests to the Netty server.

    clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)

The Netty server TransportServer is created at line 64.

TransportServer is the Netty-based server side that serves the upload and download RPCs (a sketch of the createServer helper follows the call below).

    server = createServer(serverBootstrap.toList)
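
For completeness, the createServer helper called here roughly does the following (abridged from the Spark 1.6 source): it asks the TransportContext for a server bound to spark.blockManager.port (0 means a random port) and retries on successive ports through Utils.startServiceOnPort.

  // Abridged sketch of NettyBlockTransferService.createServer (Spark 1.6):
  // create and bind a TransportServer, possibly trying several ports.
  private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
    def startService(port: Int): (TransportServer, Int) = {
      val server = transportContext.createServer(port, bootstraps.asJava)
      (server, server.getPort)
    }
    val portToTry = conf.getInt("spark.blockManager.port", 0)
    Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1
  }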

Fetching remote shuffle files

NettyBlockTransferService, line 80: the data is fetched by creating a TransportClient that connects to the remote Netty server.

  override def fetchBlocks(
      host: String,
      port: Int,
      execId: String,
      blockIds: Array[String],
      listener: BlockFetchingListener): Unit = {
    logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
    try {
      val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
        override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
          // create a client connected to the remote Netty server
          val client = clientFactory.createClient(host, port)
          // fetch the requested blocks over that connection
          new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
        }
      }

      val maxRetries = transportConf.maxIORetries()
      if (maxRetries > 0) {
        // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
        // a bug in this code. We should remove the if statement once we're sure of the stability.
        new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
      } else {
        // no retries configured: invoke createAndStart above directly
        blockFetchStarter.createAndStart(blockIds, listener)
      }
    } catch {
      case e: Exception =>
        logError("Exception while beginning fetchBlocks", e)
        blockIds.foreach(listener.onBlockFetchFailure(_, e))
    }
  }
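
Callers only have to supply a BlockFetchingListener. A minimal, hypothetical caller could look like the sketch below; in Spark itself this role is played by ShuffleBlockFetcherIterator (and fetchBlockSync for single blocks), and the host, port, executor id and block ids here are placeholders:

  // Hypothetical caller: fetch two shuffle blocks from a remote executor and
  // report the outcome through a BlockFetchingListener.
  val listener = new BlockFetchingListener {
    override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
      println(s"fetched $blockId (${data.size()} bytes)")
    override def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
      println(s"failed to fetch $blockId: $e")
  }
  blockTransferService.fetchBlocks("remote-host", 7337, "exec-1",
    Array("shuffle_0_1_2", "shuffle_0_3_2"), listener)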

Uploading shuffle files

NettyBlockTransferService, line 114:

Create a Netty client.

Serialize the block's storage level.

Convert the block's ByteBuffer into a byte array so that it can be serialized.

Wrap all of this in an UploadBlock message and serialize it.

Send it with the client's sendRpc method.

  override def uploadBlock(
      hostname: String,
      port: Int,
      execId: String,
      blockId: BlockId,
      blockData: ManagedBuffer,
      level: StorageLevel): Future[Unit] = {
    val result = Promise[Unit]()
    val client = clientFactory.createClient(hostname, port)

    // StorageLevel is serialized as bytes using our JavaSerializer. Everything else is encoded
    // using our binary protocol.
    val levelBytes = serializer.newInstance().serialize(level).array()

    // Convert or copy nio buffer into array in order to serialize it.
    val nioBuffer = blockData.nioByteBuffer()
    val array = if (nioBuffer.hasArray) {
      nioBuffer.array()
    } else {
      val data = new Array[Byte](nioBuffer.remaining())
      nioBuffer.get(data)
      data
    }

    client.sendRpc(new UploadBlock(appId, execId, blockId.toString, levelBytes, array).toByteBuffer,
      new RpcResponseCallback {
        override def onSuccess(response: ByteBuffer): Unit = {
          logTrace(s"Successfully uploaded block $blockId")
          result.success((): Unit)
        }
        override def onFailure(e: Throwable): Unit = {
          logError(s"Error while uploading block $blockId", e)
          result.failure(e)
        }
      })

    result.future
  }
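
uploadBlock returns a Future[Unit], so a caller that needs a blocking upload (block replication, for example) simply waits on that future. Below is a minimal sketch of such a caller, essentially what BlockTransferService.uploadBlockSync does; the host, port and executor id are placeholders:

  // Hypothetical blocking caller: start the upload and wait until the RPC completes.
  import scala.concurrent.Await
  import scala.concurrent.duration.Duration

  val upload = blockTransferService.uploadBlock(
    "remote-host", 7337, "exec-1", blockId, blockData, StorageLevel.MEMORY_AND_DISK_SER)
  Await.result(upload, Duration.Inf)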