
[spark] Shuffle Read Explained (Sort Based Shuffle)

This article covers the reduce side of shuffle. The first RDD of the stage downstream of a shuffle is a ShuffledRDD, whose compute method obtains an iterator over the data that the upstream stage's Shuffle Write spilled to disk files:

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

It fetches the shuffleManager (here a SortShuffleManager) from SparkEnv, obtains a Reader from the manager, and calls its read method to get the iterator.

override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }

The getReader method instantiates a BlockStoreShuffleReader, whose parameters include the partition range [startPartition, endPartition) to fetch. Let's look at its read method:

override def read(): Iterator[Product2[K, C]] = {
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      // Get the metadata describing where the data is stored
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      // Maximum size transferred per remote request
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))

    // Wrap the streams for compression and encryption
    val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
      serializerManager.wrapStream(blockId, inputStream)
    }

    val serializerInstance = dep.serializer.newInstance()

    // Create a key/value iterator for each stream
    val recordIter = wrappedStreams.flatMap { wrappedStream =>
      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
    }

    // Update task metrics after each record is read
    val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
    // Build the complete iterator
    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
      recordIter.map { record =>
        readMetrics.incRecordsRead(1)
        record
      },
      context.taskMetrics().mergeShuffleReadMetrics())

    // An interruptible iterator must be used here in order to support task cancellation
    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // Already combined once on the map side
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else {
        // Only aggregate on the reduce side
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
    }

    // Sort globally if required
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        val sorter =
          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
        sorter.insertAll(aggregatedIter)
        context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
        context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
  }

First, a ShuffleBlockFetcherIterator is instantiated. One of its arguments is:

mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)

This method obtains the metadata describing where the reduce-side data comes from. It returns a Seq[(BlockManagerId, Seq[(BlockId, Long)])], i.e. which blocks on which nodes hold the data and how large each block is. Let's see how getMapSizesByExecutorId is implemented:

def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
    logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
    // Get the metadata
    val statuses = getStatuses(shuffleId)
    // Convert the format and get the metadata for the specified partitions
    statuses.synchronized {
      return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
    }
  }
  • Pass in the shuffleId to get all metadata for that shuffle
  • Convert the format and get the metadata for the specified partitions

Stepping into getStatuses:

private def getStatuses(shuffleId: Int): Array[MapStatus] = {
    // Try to get the statuses directly from mapStatuses
    val statuses = mapStatuses.get(shuffleId).orNull
    if (statuses == null) {
      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
      val startTime = System.currentTimeMillis
      var fetchedStatuses: Array[MapStatus] = null
      ......
      if (fetchedStatuses == null) {
        // We won the race to fetch the statuses; do so
        logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
        // This try-finally prevents hangs due to timeouts:
        try {
          // Fetch the metadata from the remote tracker
          val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
          // Deserialize
          fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
          logInfo("Got the output locations")
          // Add to mapStatuses
          mapStatuses.put(shuffleId, fetchedStatuses)
        } finally {
          fetching.synchronized {
            fetching -= shuffleId
            fetching.notifyAll()
          }
        }
      } 
     .....
      }
    } else {
      return statuses
    }
  }

If the statuses can be found in mapStatuses they are returned directly; otherwise a GetMapOutputStatuses message is sent to the mapOutputTrackerMaster to fetch the metadata.

Recall that each Executor corresponds to one CoarseGrainedExecutorBackend. When a CoarseGrainedExecutorBackend is constructed it creates a SparkEnv, and creating the SparkEnv creates a mapOutputTracker. In other words, mapOutputTracker and Executor are in one-to-one correspondence: every Executor has its own mapOutputTracker to maintain metadata.

The mapStatuses seen here is where the mapOutputTracker stores that metadata. Metadata for Shuffle Writes completed on this Executor is stored in its mapStatuses, and metadata fetched remotely for Shuffle Writes completed on other Executors is also cached in the current mapStatuses.

On an Executor the tracker is a MapOutputTrackerWorker, while on the Driver it is a MapOutputTrackerMaster; both are created when SparkEnv is instantiated. The result of every shuffle task completed on an Executor is registered with the driver-side MapOutputTrackerMaster, so the master's mapStatuses holds all of the metadata. Therefore, when a task on an Executor needs the output of a shuffle, it first looks in its own mapStatuses and, if nothing is found, communicates with the MapOutputTrackerMaster to fetch the metadata.

The handling logic on the MapOutputTrackerMaster side when the message is received:

case GetMapOutputStatuses(shuffleId: Int) =>
      val hostPort = context.senderAddress.hostPort
      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
      val mapOutputStatuses = tracker.post(new GetMapOutputMessage(shuffleId, context))

It calls the tracker's post method:

 def post(message: GetMapOutputMessage): Unit = {
    mapOutputRequests.offer(message)
  }

The message is added to mapOutputRequests, which is a LinkedBlockingQueue. When the MapOutputTrackerMaster is initialized, a dedicated thread pool is started to process these requests:

private val threadpool: ThreadPoolExecutor = {
    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)
    }
    pool
  }

Let's look at how the run method of the worker class MessageLoop is defined:

private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            // Take one GetMapOutputMessage from the queue
            val data = mapOutputRequests.take()
             if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              mapOutputRequests.offer(PoisonPill)
              return
            }
            val context = data.context
            val shuffleId = data.shuffleId
            val hostPort = context.senderAddress.hostPort
            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
              " to " + hostPort)
            // Get the serialized metadata for this shuffleId
            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
            // Send the data back
            context.reply(mapOutputStatuses)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

It looks up the serialized metadata for the shuffleId and returns it. Let's look at the implementation of getSerializedMapOutputStatuses:

def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
    var statuses: Array[MapStatus] = null
    var retBytes: Array[Byte] = null
    var epochGotten: Long = -1

    // Look up the cached serialized statuses; fall back to mapStatuses if not cached
    def checkCachedStatuses(): Boolean = {
      epochLock.synchronized {
        if (epoch > cacheEpoch) {
          cachedSerializedStatuses.clear()
          clearCachedBroadcast()
          cacheEpoch = epoch
        }
        cachedSerializedStatuses.get(shuffleId) match {
          case Some(bytes) =>
            retBytes = bytes
            true
          case None =>
            logDebug("cached status not found for : " + shuffleId)
            statuses = mapStatuses.getOrElse(shuffleId, Array.empty[MapStatus])
            epochGotten = epoch
            false
        }
      }
    }

    if (checkCachedStatuses()) return retBytes
    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    if (null == shuffleIdLock) {
      val newLock = new Object()
      // in general, this condition should be false - but good to be paranoid
      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
      shuffleIdLock = if (null != prevLock) prevLock else newLock
    }
    // synchronize so we only serialize/broadcast it once since multiple threads call
    // in parallel
    shuffleIdLock.synchronized {
      if (checkCachedStatuses()) return retBytes

      // Serialize the statuses
      val (bytes, bcast) = MapOutputTracker.serializeMapStatuses(statuses, broadcastManager,
        isLocal, minSizeForBroadcast)
      logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
      // Add them into the table only if the epoch hasn't changed while we were working
      epochLock.synchronized {
        if (epoch == epochGotten) {
          cachedSerializedStatuses(shuffleId) = bytes
          if (null != bcast) cachedSerializedBroadcast(shuffleId) = bcast
        } else {
          logInfo("Epoch changed, not caching!")
          removeBroadcast(bcast)
        }
      }
      bytes
    }
  }

The general idea is to first try the cache of serialized metadata; if it is there, it is returned directly, otherwise the MapStatuses are read from mapStatuses, serialized and returned. The reply goes back to the MapOutputTrackerWorker that sent the request, which deserializes the metadata and adds it to the mapStatuses of the current Executor.

Back in getMapSizesByExecutorId: once getStatuses has returned all metadata for the shuffleId, convertMapStatuses turns it into location information of the form Seq[(BlockManagerId, Seq[(BlockId, Long)])], which is used to read the specified partitions:

private def convertMapStatuses(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
    assert (statuses != null)
    // Holds the metadata for the specified partitions
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
    for ((status, mapId) <- statuses.zipWithIndex) {
      if (status == null) {
        val errorMessage = s"Missing an output location for shuffle $shuffleId"
        logError(errorMessage)
        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
      } else {
        for (part <- startPartition until endPartition) {
          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
        }
      }
    }

    splitsByAddress.toSeq
  }

The parameter statuses: Array[MapStatus] is the metadata of all shuffle write files of the upstream stage obtained above, ordered by the map-side partitionId. zipWithIndex pairs each element with its index in the array, and that index is exactly the map-side partitionId. A ShuffleBlockId is then built from shuffleId, mapPartitionId and reducePartitionId. (When a ShuffleBlockId is built on the map side, the reducePartitionId is always 0, because one ShuffleMapTask produces a single block; the real reducePartitionId filled in here is needed later when the index file is consulted for the offsets of the corresponding reduce partition.) The estimated size of the corresponding data is also recorded, because remote fetches have to be limited in size later. Finally, the location information is returned.
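As a quick, standalone illustration of how the index from zipWithIndex becomes the mapId in the block name (the shuffleId and partition numbers below are made up; only the ShuffleBlockId naming scheme comes from Spark):

import org.apache.spark.storage.ShuffleBlockId

// statuses is indexed by map-side partitionId, so zipWithIndex recovers the mapId.
// Suppose shuffleId = 0, two map outputs, and we are reading reduce partition 3.
val blockIds = (0 until 2).map(mapId => ShuffleBlockId(shuffleId = 0, mapId = mapId, reduceId = 3))
// A ShuffleBlockId's name is "shuffle_<shuffleId>_<mapId>_<reduceId>", so:
// blockIds.map(_.name) == Seq("shuffle_0_0_3", "shuffle_0_1_3")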

At this point mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition) is done, and the location metadata (derived from the MapStatuses) for the specified partitions has been returned.
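For intuition, the returned location info might look like the hypothetical value below (executor ids, hosts, ports and sizes are all made up):

import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}

// Hypothetical shape of the result: reduce partition 3 of shuffle 0 was written by two
// map tasks whose output files live on two different executors; sizes are in bytes.
val mapSizes: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = Seq(
  BlockManagerId("1", "host-a", 46234) -> Seq((ShuffleBlockId(0, 0, 3), 1024L)),
  BlockManagerId("2", "host-b", 46234) -> Seq((ShuffleBlockId(0, 1, 3), 4096L))
)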

When the ShuffleBlockFetcherIterator object is constructed, its initialize() method is called:

private[this] def initialize(): Unit = {
    // Add a task completion callback (called in both success case and failure case) to cleanup.
    context.addTaskCompletionListener(_ => cleanup())

    // Split local blocks from remote blocks and return the remote FetchRequests
    val remoteRequests = splitLocalRemoteBlocks()
    // Add the remote requests to the fetchRequests queue in random order
    fetchRequests ++= Utils.randomize(remoteRequests)
    assert ((0 == reqsInFlight) == (0 == bytesInFlight),
      "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
      ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight)

    // Dequeue remote requests from fetchRequests and send them via sendRequest
    fetchUpToMaxBytes()

    val numFetches = remoteRequests.size - fetchRequests.size
    logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

    // Fetch the local blocks
    fetchLocalBlocks()
    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
  }
  • Split local blocks from remote blocks, and add the returned remote FetchRequests to the fetchRequests queue
  • Dequeue remote requests from fetchRequests and send them with sendRequest to fetch the remote data
  • Fetch the local blocks

First, let's see how local blocks and remote blocks are distinguished:

private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
    // Divide the max bytes in flight by 5 to increase parallelism: up to 5 requests in parallel
    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)

    // Buffer holding the remote requests
    val remoteRequests = new ArrayBuffer[FetchRequest]

    // Tracks total number of blocks (including zero sized blocks)
    var totalBlocks = 0
    for ((address, blockInfos) <- blocksByAddress) {
      totalBlocks += blockInfos.size
      // Blocks whose executor is the current executor are local; everything else is remote
      if (address.executorId == blockManager.blockManagerId.executorId) {
        // Filter out zero-sized blocks
        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
        numBlocksToFetch += localBlocks.size
      } else {
        val iterator = blockInfos.iterator
        var curRequestSize = 0L
        var curBlocks = new ArrayBuffer[(BlockId, Long)]
        while (iterator.hasNext) {
          val (blockId, size) = iterator.next()
          // Skip empty blocks
          if (size > 0) {
            curBlocks += ((blockId, size))
            remoteBlocks += blockId
            numBlocksToFetch += 1
            curRequestSize += size
          } else if (size < 0) {
            throw new BlockException(blockId, "Negative block size " + size)
          }
          // Once the accumulated size reaches the limit, build a FetchRequest and add it to remoteRequests
          if (curRequestSize >= targetRequestSize) {
            // Add this FetchRequest
            remoteRequests += new FetchRequest(address, curBlocks)
            curBlocks = new ArrayBuffer[(BlockId, Long)]
            logDebug(s"Creating fetch request of $curRequestSize at $address")
            curRequestSize = 0
          }
        }
        // Build one last FetchRequest from the remaining blocks and add it to remoteRequests
        if (curBlocks.nonEmpty) {
          remoteRequests += new FetchRequest(address, curBlocks)
        }
      }
    }
    logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
    remoteRequests
  }
  • To increase the parallelism of remote fetches, the per-request size limit is the overall in-flight limit divided by 5, i.e. up to 5 requests can be fetching from up to 5 nodes at the same time (see the arithmetic sketch after this list)
  • A block is considered local if the executor it lives on is the same as the current executor
  • The blocks of each remote node (Executor) are iterated, and whenever the accumulated request size for one node exceeds the limit a FetchRequest is built and added to remoteRequests; finally remoteRequests is returned. A FetchRequest wraps the data of one request: the address plus the blockIds and their sizes
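To make the arithmetic concrete: with the default spark.reducer.maxSizeInFlight of 48m, targetRequestSize is about 48MB / 5 ≈ 9.6MB. Below is a minimal standalone sketch of the grouping idea (not the Spark implementation itself; block ids and sizes are made up):

// Pack the (blockId, size) pairs of one remote executor into requests whose accumulated
// size reaches targetRequestSize, mirroring the loop in splitLocalRemoteBlocks.
def groupIntoRequests(blocks: Seq[(String, Long)], targetRequestSize: Long): Seq[Seq[(String, Long)]] = {
  val requests = scala.collection.mutable.ArrayBuffer[Seq[(String, Long)]]()
  var curBlocks = scala.collection.mutable.ArrayBuffer[(String, Long)]()
  var curRequestSize = 0L
  for ((id, size) <- blocks if size > 0) {   // skip empty blocks, as Spark does
    curBlocks += ((id, size))
    curRequestSize += size
    if (curRequestSize >= targetRequestSize) {
      requests += curBlocks.toSeq
      curBlocks = scala.collection.mutable.ArrayBuffer[(String, Long)]()
      curRequestSize = 0L
    }
  }
  if (curBlocks.nonEmpty) requests += curBlocks.toSeq
  requests.toSeq
}

val targetRequestSize = 48L * 1024 * 1024 / 5          // ≈ 9.6MB with the 48m default
val blocks = Seq(("shuffle_0_0_3", 4L * 1024 * 1024),
                 ("shuffle_0_1_3", 6L * 1024 * 1024),
                 ("shuffle_0_2_3", 3L * 1024 * 1024))
groupIntoRequests(blocks, targetRequestSize)           // two requests: (4MB + 6MB) and (3MB)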

After local and remote blocks are split, the remote requests are added to the fetchRequests queue, and fetchUpToMaxBytes() is called to fetch the remote data:

private def fetchUpToMaxBytes(): Unit = {
    // Send fetch requests up to maxBytesInFlight
    while (fetchRequests.nonEmpty &&
      (bytesInFlight == 0 ||
        (reqsInFlight + 1 <= maxReqsInFlight &&
          bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
      sendRequest(fetchRequests.dequeue())
    }
  }

FetchRequests are dequeued from fetchRequests and passed to the sendRequest method:

 private[this] def sendRequest(req: FetchRequest) {
    logDebug("Sending request for %d blocks (%s) from %s".format(
      req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
    bytesInFlight += req.size
    reqsInFlight += 1

    // Convert to a Map[blockId, size]
    val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
    val remainingBlocks = new HashSet[String]() ++= sizeMap.keys
    val blockIds = req.blocks.map(_._1.toString)

    val address = req.address
    // Fetch the blocks on the remote node via shuffleClient.fetchBlocks
    shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
      new BlockFetchingListener {
        // Store the result in results
        override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
          // Only add the buffer to results queue if the iterator is not zombie,
          // i.e. cleanup() has not been called yet.
          ShuffleBlockFetcherIterator.this.synchronized {
            if (!isZombie) {
              // Increment the ref count because we need to pass this to a different thread.
              // This needs to be released after use.
              buf.retain()
              remainingBlocks -= blockId
              results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
                remainingBlocks.isEmpty))
              logDebug("remainingBlocks: " + remainingBlocks)
            }
          }
          logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
        }

        override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
          logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
          results.put(new FailureFetchResult(BlockId(blockId), address, e))
        }
      }
    )
  }

The data on the remote node is fetched through shuffleClient.fetchBlocks, which by default is implemented by NettyBlockTransferService's fetchBlocks method. Whether a fetch succeeds or fails, a SuccessFetchResult or FailureFetchResult is built and put into results.

After the remote fetches have been issued, fetchLocalBlocks() is used to fetch the local blocks:

private[this] def fetchLocalBlocks() {
    val iter = localBlocks.iterator
    while (iter.hasNext) {
      val blockId = iter.next()
      try {
        val buf = blockManager.getBlockData(blockId)
        shuffleMetrics.incLocalBlocksFetched(1)
        shuffleMetrics.incLocalBytesRead(buf.size)
        buf.retain()
        results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf, false))
      } catch {
        case e: Exception =>
          // If we see an exception, stop immediately.
          logError(s"Error occurred while fetching local blocks", e)
          results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
          return
      }
    }
  }

It iterates over the blocks to fetch, reads the data directly from the blockManager, and wraps the result in a SuccessFetchResult or FailureFetchResult that is put into results. Let's look at the implementation of blockManager.getBlockData(blockId):

override def getBlockData(blockId: BlockId): ManagedBuffer = {
    if (blockId.isShuffle) {
      shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
    } else {
      getLocalBytes(blockId) match {
        case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer)
        case None =>
          // If this block manager receives a request for a block that it doesn't have then it's
          // likely that the master has outdated block statuses for this block. Therefore, we send
          // an RPC so that this block is marked as being unavailable from this block manager.
          reportBlockStatus(blockId, BlockStatus.empty)
          throw new BlockNotFoundException(blockId.toString)
      }
    }
  }

Now the shuffleBlockResolver's getBlockData method:

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    // Locate the index file from the shuffleId and mapId
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      // Skip to this block's entry in the index file
      ByteStreams.skipFully(in, blockId.reduceId * 8)
      // Start offset of this partition in the data file
      val offset = in.readLong()
      // End offset of this partition (start of the next one)
      val nextOffset = in.readLong()
      new FileSegmentManagedBuffer(
        transportConf,
        getDataFile(blockId.shuffleId, blockId.mapId),
        offset,
        nextOffset - offset)
    } finally {
      in.close()
    }
  }

The index file is located from the shuffleId and mapId and opened as an input stream. Using the block's reduceId (mentioned above when the partition metadata was built), the stream skips to that block's entry, reads the start and end offsets in turn, and the corresponding byte range is then read from the data file.
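The index file is simply a sequence of longs: the cumulative offsets of every reduce partition in the data file. Here is a minimal sketch of the lookup described above (the path argument is a placeholder; the layout is assumed from getBlockData):

import java.io.{DataInputStream, FileInputStream}
import com.google.common.io.ByteStreams

// Return the [start, end) byte range of one reduce partition in the data file,
// by reading two consecutive longs from the index file, as getBlockData does.
def segmentFor(indexFilePath: String, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFilePath))
  try {
    ByteStreams.skipFully(in, reduceId * 8L)   // each index entry is an 8-byte long
    val offset = in.readLong()                 // where this partition's data starts
    val nextOffset = in.readLong()             // where the next partition starts (= this one ends)
    (offset, nextOffset)
  } finally {
    in.close()
  }
}
// The data file is then read from offset for (nextOffset - offset) bytes.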

With all fetch results collected in results, let's go back to the read() method:

 override def read(): Iterator[Product2[K, C]] = {
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      // Communicate with the mapOutputTrackerMaster to get the metadata of where the data is stored
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      // Maximum size transferred per request
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))

    // Wrap the streams for compression and encryption
    val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
      serializerManager.wrapStream(blockId, inputStream)
    }

    val serializerInstance = dep.serializer.newInstance()

    // Create a key/value iterator for each stream
    val recordIter = wrappedStreams.flatMap { wrappedStream =>
       serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
    }

    // Update task metrics after each record is read
    val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
    // Build the complete iterator
    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
      recordIter.map { record =>
        readMetrics.incRecordsRead(1)
        record
      },
      context.taskMetrics().mergeShuffleReadMetrics())

    // An interruptible iterator must be used here in order to support task cancellation
    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // Already combined once on the map side
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else {
        // Only aggregate on the reduce side
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
    }

    // Sort globally if required
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        val sorter =
          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
        sorter.insertAll(aggregatedIter)
        context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
        context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
  }

ShuffleBlockFetcherIterator extends Iterator, so results can be iterated; its next() method returns each FetchResult as a (blockId, inputStream) pair:

case SuccessFetchResult(blockId, address, _, buf, _) =>
        try {
          (result.blockId, new BufferReleasingInputStream(buf.createInputStream(), this))
        } catch {
          case NonFatal(t) =>
            throwFetchFailedException(blockId, address, t)
        }

The second half of the read() method performs aggregation and sorting, much like the Shuffle Write path, so only a brief description is given here.

When aggregation is required, combineCombinersByKey is executed if there was map-side combining, otherwise combineValuesByKey is executed; both end up calling ExternalAppendOnlyMap's insertAll(iter) method:

def combineCombinersByKey(
      iter: Iterator[_ <: Product2[K, C]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

def combineValuesByKey(
      iter: Iterator[_ <: Product2[K, V]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
    if (currentMap == null) {
      throw new IllegalStateException(
        "Cannot insert new elements into a map after calling iterator")
    }
    // An update function for the map that we reuse across entries to avoid allocating
    // a new closure each time
    var curEntry: Product2[K, V] = null
    val update: (Boolean, C) => C = (hadVal, oldVal) => {
      if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
    }

    while (entries.hasNext) {
      curEntry = entries.next()
      val estimatedSize = currentMap.estimateSize()
      if (estimatedSize > _peakMemoryUsedBytes) {
        _peakMemoryUsedBytes = estimatedSize
      }
      if (maybeSpill(currentMap, estimatedSize)) {
        currentMap = new SizeTrackingAppendOnlyMap[K, C]
      }
      currentMap.changeValue(curEntry._1, update)
      addElementsRead()
    }
  }

The iteration inside ultimately calls the next method of the ShuffleBlockFetcherIterator mentioned above to pull the data.

Each update & insert also estimates the size of currentMap and decides whether it needs to be spilled to a disk file. If so, the map's data is sorted by key with the defined keyComparator to produce an iterator, that iterator is written to a temporary disk file, and a new map is created for the data that follows.
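A simplified sketch of this estimate-then-spill loop (this is not ExternalAppendOnlyMap itself; the size estimate, threshold and spill writer below are placeholders):

import scala.collection.mutable

// Keep an in-memory map; when its (crudely) estimated size crosses a threshold, sort the
// entries by key, hand them to a spill writer, and start a fresh map -- the same shape
// as the insertAll + maybeSpill logic described above.
def insertAllSimplified[K: Ordering, V](
    records: Iterator[(K, V)],
    mergeValue: (V, V) => V,
    spillThresholdBytes: Long,
    writeSpill: Seq[(K, V)] => Unit): mutable.Map[K, V] = {
  var currentMap = mutable.Map[K, V]()
  def estimatedSize: Long = currentMap.size.toLong * 64L   // crude stand-in for SizeTracker's estimate
  for ((k, v) <- records) {
    currentMap.update(k, currentMap.get(k).map(mergeValue(_, v)).getOrElse(v))
    if (estimatedSize >= spillThresholdBytes) {
      writeSpill(currentMap.toSeq.sortBy(_._1))   // sort by key before spilling
      currentMap = mutable.Map[K, V]()            // new map for the data that follows
    }
  }
  currentMap
}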

After the combiners (an ExternalAppendOnlyMap) finish insertAll, their iterator is called to return an iterator representing the complete partition data (both in memory and in spill files):

override def iterator: Iterator[(K, C)] = {
    if (currentMap == null) {
      throw new IllegalStateException(
        "ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
    }
    if (spilledMaps.isEmpty) {
      CompletionIterator[(K, C), Iterator[(K, C)]](
        destructiveIterator(currentMap.iterator), freeCurrentMap())
    } else {
      new ExternalIterator()
    }
  }

Stepping into the instantiation of the ExternalIterator class:

// A queue that maintains a buffer for each stream we are currently merging
    // This queue maintains the invariant that it only contains non-empty buffers
    private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]

    // Input streams are derived both from the in-memory map and spilled maps on disk
    // The in-memory map is sorted in place, while the spilled maps are already in sorted order
    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
      currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

    inputStreams.foreach { it =>
      val kcPairs = new ArrayBuffer[(K, C)]
      readNextHashCode(it, kcPairs)
      if (kcPairs.length > 0) {
        mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
      }
    }

The sorted data of currentMap is combined with the iterators over the spill-file data to form inputStreams. Iterating over inputStreams, the leading key/value pairs of each stream are buffered and enqueued into mergeHeap, which is then consumed in ExternalIterator's next() method.
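The merge performed here is essentially a k-way merge of sorted streams through a priority queue. Below is a minimal standalone sketch of the idea (simplified to merge by the keys themselves rather than by key hash, which is what ExternalIterator actually orders its StreamBuffers by):

import scala.collection.mutable

// k-way merge of already-sorted iterators through a priority queue of buffered heads --
// the same shape as ExternalIterator's mergeHeap over the in-memory and spilled streams.
def mergeSorted[K](streams: Seq[Iterator[K]])(implicit ord: Ordering[K]): Iterator[K] =
  new Iterator[K] {
    // PriorityQueue dequeues the maximum, so reverse the ordering to pop the smallest head first.
    private val mergeHeap = mutable.PriorityQueue.empty[BufferedIterator[K]](
      Ordering.by((it: BufferedIterator[K]) => it.head)(ord.reverse))
    streams.map(_.buffered).filter(_.hasNext).foreach(mergeHeap.enqueue(_))

    def hasNext: Boolean = mergeHeap.nonEmpty
    def next(): K = {
      val stream = mergeHeap.dequeue()
      val value = stream.next()
      if (stream.hasNext) mergeHeap.enqueue(stream)   // re-enqueue streams that still have data
      value
    }
  }

// mergeSorted(Seq(Iterator(1, 4, 7), Iterator(2, 3, 9))).toList == List(1, 2, 3, 4, 7, 9)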

Finally, if a global sort of the data is required, it is performed through the insertAll method of an ExternalSorter constructed with only an ordering (no aggregator), exactly as in Shuffle Write, so it is not covered in detail here.

In the end, an iterator over all the data of the specified partitions is returned.
