
Spark Storage Mechanism: A Source Code Analysis

I. Writing and Reading Shuffle Results

From the earlier article Spark原始碼解讀之Shuffle原理剖析與原始碼分析 we know that a shuffle operation is divided by the DAGScheduler into two stages: the first stage runs ShuffleMapTasks and the second runs ResultTasks. The ShuffleMapTasks produce intermediate results, which the ResultTasks then read as their input.

So how does a ResultTask get hold of the results computed by the ShuffleMapTasks? The process is as follows:

  1. Each ShuffleMapTask wraps its computation status (not the actual computed values) in a MapStatus and returns it to the DAGScheduler.
  2. The DAGScheduler saves the MapStatus into the MapOutputTrackerMaster.
  3. When a ResultTask computes a ShuffledRDD, it fetches the data with BlockStoreShuffleFetcher's fetch method: it first asks the MapOutputTracker for the locations of the data it needs, then calls BlockManager.getMultiple with the returned locations to fetch the actual data.

Each ShuffleMapTask stores its computation result in a MapStatus. A MapStatus consists of a BlockManagerId plus per-reduce byte sizes: the BlockManagerId identifies which BlockManager actually holds the intermediate results, and the byte sizes tell each reduceId how much data it will read.

private[spark] sealed trait MapStatus {
  /** Location where this task was run. */
  def location: BlockManagerId

  /**
   * Estimated size for the reduce block, in bytes.
   *
   * If a block is non-empty, then this method MUST return a non-zero size.  This invariant is
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   */
  //Size of the data that each reduceId will read
  def getSizeForBlock(reduceId: Int): Long
}
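
To make the trait concrete, here is a minimal, hypothetical sketch in the spirit of Spark's CompressedMapStatus (simplified, not the actual class): one compressed byte is stored per reduce partition, wired to the compressSize/decompressSize helpers shown near the end of this article.

private[spark] class SketchMapStatus(
    override val location: BlockManagerId,
    uncompressedSizes: Array[Long])
  extends MapStatus {

  //One byte per reduce partition, log-base-1.1 encoded (hypothetical sketch)
  private[this] val compressedSizes: Array[Byte] =
    uncompressedSizes.map(compressSize)

  override def getSizeForBlock(reduceId: Int): Long =
    decompressSize(compressedSizes(reduceId))
}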

1. Writing Shuffle Results

The shuffle write path is as follows:

ShuffleMapTask.runTask ----> HashShuffleWriter.write ----> BlockObjectWriter.write

The source of ShuffleMapTask's runTask method is as follows:

 override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      //Get the ShuffleManager, and obtain the ShuffleWriter from it
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      //First call the RDD's iterator method, passing in the partition this task is to process, and run our own functions over it
      //The resulting records all go through the ShuffleWriter: after being partitioned by the HashPartitioner, each is written into its corresponding bucket
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      //Finally, return the result as a MapStatus
      //The MapStatus encapsulates where the data computed by this ShuffleMapTask is stored, which is essentially the BlockManager's information
      //The BlockManager is Spark's low-level component for managing memory, data, and disk storage
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

HashShuffleWriter.write mainly handles two things:

  1. Decide whether aggregation is needed: if, say, <hello,1> and <hello,1> both have to be written, they should first be combined into <hello,2> before any further processing (see the sketch after this list).
  2. Use the partitioner to decide which file each <key,value> pair is written into.
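
A toy illustration of the map-side combine in point 1 (purely illustrative; in Spark the actual merging is done by Aggregator.combineValuesByKey, as seen in the write method below):

//Word-count style records arriving at the shuffle writer
val records = Iterator(("hello", 1), ("hello", 1), ("world", 1))
//Map-side combine: merge values per key before anything is written out
val combined = records.foldLeft(Map.empty[String, Int]) {
  case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v)
}
//combined == Map("hello" -> 2, "world" -> 1)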

The source of HashShuffleWriter's write method is as follows:

 /** Write a bunch of records to this task's output */
  /**
    * Writes the partition data of the new RDD computed by this ShuffleMapTask to local disk
    * @param records
    */
  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    //Decide whether map-side (local) aggregation is needed; for operations like reduceByKey it is,
    //i.e. dep.aggregator.isDefined is true
    //and dep.mapSideCombine is true as well
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        //Local aggregation happens here: e.g. local records (hello,1),(hello,1)
        //can be combined into (hello,2)
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      records
    }

    //If local aggregation was required, it has been applied by now
    //Next, iterate over the records; each one is partitioned (by default with the HashPartitioner), yielding a bucketId
    //that says which bucket the record must be written to
    for (elem <- iter) {
      //Compute the bucketId
      val bucketId = dep.partitioner.getPartition(elem._1)
      //shuffle.writers was produced by shuffleBlockManager.forMapTask(); the bucketId selects the writer that writes the record into its bucket
      //DiskBlockObjectWriter is what actually writes the data to disk
      shuffle.writers(bucketId).write(elem)
    }
  }
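
The bucketId computation above is plain hash partitioning. A minimal sketch of what HashPartitioner.getPartition does (in Spark it delegates to Utils.nonNegativeMod; simplified here):

//Keep the modulus non-negative even when key.hashCode is negative
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

def bucketIdFor(key: Any, numBuckets: Int): Int =
  nonNegativeMod(key.hashCode, numBuckets)

//With the same partitioner, a given key lands in the same bucket on every node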

The shuffle object used in the write method above is produced by ShuffleBlockManager's forMapTask function, whose source is as follows:

/**
   * Get a ShuffleWriterGroup for the given map task, which will register it as complete
   * when the writers are closed successfully
   */
  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
      writeMetrics: ShuffleWriteMetrics) = {
    new ShuffleWriterGroup {
      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
      private val shuffleState = shuffleStates(shuffleId)
      private var fileGroup: ShuffleFileGroup = null

      val openStartTime = System.nanoTime
      //Check whether the consolidation optimization is enabled; if it is, we do not obtain a separate output file per bucket,
      //but rather one writer into the ShuffleFileGroup per bucket
      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          //First build a unique blockId, then call the ShuffleFileGroup's apply function with the bucketId to obtain the file to write
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          //Use blockManager.getDiskWriter() to obtain a writer
          //With the optimization enabled, a bucketId no longer gets a writer onto its own standalone ShuffleBlockFile as before,
          //but a writer into the shared ShuffleFileGroup
          //This is how the output files of multiple ShuffleMapTasks get merged
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
            writeMetrics)
        }
      } else {
        //Without the shuffle consolidation configuration, a separate ShuffleBlockFile is created per bucket for every ShuffleMapTask
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          val blockFile = blockManager.diskBlockManager.getFile(blockId)
          // Because of previous failures, the shuffle file may already exist on this machine.
          // If so, remove it.
          if (blockFile.exists) {
            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          //Obtain the disk writer for this block file
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
        }
      }
      // Creating the file to write to and creating a disk writer both involve interacting with
      // the disk, so should be included in the shuffle write time.
      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)

      override def releaseWriters(success: Boolean) {
        if (consolidateShuffleFiles) {
          if (success) {
            val offsets = writers.map(_.fileSegment().offset)
            val lengths = writers.map(_.fileSegment().length)
            fileGroup.recordMapOutput(mapId, offsets, lengths)
          }
          recycleFileGroup(fileGroup)
        } else {
          shuffleState.completedMapTasks.add(mapId)
        }
      }

      private def getUnusedFileGroup(): ShuffleFileGroup = {
        val fileGroup = shuffleState.unusedFileGroups.poll()
        if (fileGroup != null) fileGroup else newFileGroup()
      }

      private def newFileGroup(): ShuffleFileGroup = {
        val fileId = shuffleState.nextFileId.getAndIncrement()
        val files = Array.tabulate[File](numBuckets) { bucketId =>
          val filename = physicalFileName(shuffleId, bucketId, fileId)
          blockManager.diskBlockManager.getFile(filename)
        }
        val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
        shuffleState.allFileGroups.add(fileGroup)
        fileGroup
      }

      private def recycleFileGroup(group: ShuffleFileGroup) {
        shuffleState.unusedFileGroups.add(group)
      }
    }
  }
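
To get a feel for what consolidation buys, a rough worked comparison with illustrative numbers: without consolidation, a shuffle with M map tasks and R reduce partitions produces M × R files, e.g. 1,000 map tasks × 100 reducers = 100,000 shuffle files. With consolidation, map tasks that run one after another on the same core reuse a file group, so the count drops to roughly C × R, where C is the number of concurrently running map tasks, e.g. 10 cores × 100 reducers = 1,000 files.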

The source above involves the shuffle consolidation optimization; for the details, see the earlier article Spark原始碼解讀之Shuffle原理剖析與原始碼分析. The getFile method is responsible for mapping the data a shuffle needs to write to a concrete file on disk.

/** Looks up a file by hashing it into one of our local subdirectories. */
  // This method should be kept in sync with
  // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
  //Maps a (shuffle_id, map_id, reduce_id) triple, encoded in the filename, to a file
  def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    var subDir = subDirs(dirId)(subDirId)
    if (subDir == null) {
      subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
          old
        } else {
          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
          if (!newDir.exists() && !newDir.mkdir()) {
            throw new IOException(s"Failed to create local dir in $newDir.")
          }
          subDirs(dirId)(subDirId) = newDir
          newDir
        }
      }
    }

    new File(subDir, filename)
  }
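
A worked run of the hashing above, under assumed values (2 local dirs, 64 subdirectories per dir, and a made-up hash):

val localDirsLen = 2          //number of configured local dirs (assumed)
val subDirsPerLocalDir = 64   //subdirectory fan-out per local dir (assumed)
val hash = 12345              //stand-in for Utils.nonNegativeHash(filename)

val dirId = hash % localDirsLen                           //12345 % 2 = 1
val subDirId = (hash / localDirsLen) % subDirsPerLocalDir //6172 % 64 = 28
val subDirName = "%02x".format(subDirId)                  //"1c"
//The file ends up at <localDirs(1)>/1c/<filename>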

Finally, DiskBlockObjectWriter.write takes care of actually writing the data to disk:

 override def write(value: Any) {
    if (!initialized) {
      open()
    }

    objOut.writeObject(value)
    numRecordsWritten += 1
    writeMetrics.incShuffleRecordsWritten(1)

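    // Update the bytes-written metric only once every 32 records to keep the overhead low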
    if (numRecordsWritten % 32 == 0) {
      updateBytesWritten()
    }
  }

2. Reading Shuffle Results

The shuffle read path is as follows:

ShuffledRDD.compute ---> HashShuffleReader.read ---> BlockStoreShuffleFetcher.fetch ---> BlockManager.getMultiple

The compute function of ShuffledRDD is the starting point for reading the results of the ShuffleMapTasks. Its source is as follows:

 /**
    * Entry point of the shuffle read path
    */
  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    //This calls shuffleManager.getReader() to obtain a HashShuffleReader,
    //then invokes its read() method to pull the data this ResultTask needs to aggregate
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

Here, HashShuffleReader's read method is called to obtain the combined data. The source is as follows:

/** Read the combined key-values for this reduce task */
  override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer)
    //BlockStoreShuffleFetcher.fetch first obtains, from the MapOutputTrackerMaster on the driver, the metadata
    //describing the data this task needs, and then pulls the actual data through the corresponding BlockManagers
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

    // Sort the output if there is a sort ordering defined.
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
        sorter.iterator
      case None =>
        aggregatedIter
    }
  }

The read function calls BlockStoreShuffleFetcher's fetch method to obtain the MapStatus metadata, and finally fetches the actual data through the BlockManager. The source is as follows:

private[hash] object BlockStoreShuffleFetcher extends Logging {
  def fetch[T](
      shuffleId: Int,
      reduceId: Int,
      context: TaskContext,
      serializer: Serializer)
    : Iterator[T] =
  {
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
    val blockManager = SparkEnv.get.blockManager

    val startTime = System.currentTimeMillis

    //Get the global MapOutputTracker and call its getServerStatuses method
    //Note the two arguments passed in: shuffleId and reduceId
    //A shuffle involves two stages; the shuffleId identifies the previous (map-side) stage and is used to look up
    //the MapStatus metadata written out by that stage's ShuffleMapTasks during shuffle write
    //Within those MapStatuses, the reduceId then selects the map output files this stage needs to pull
    //getServerStatuses has to go over the network, because it contacts the driver, where the MapOutputTrackerMaster holds this information
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)

    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
      shuffleId, reduceId, System.currentTimeMillis - startTime))

    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
    for (((address, size), index) <- statuses.zipWithIndex) {
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
    }

    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
      case (address, splits) =>
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
    }

    def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
      val blockId = blockPair._1
      val blockOption = blockPair._2
      blockOption match {
        case Success(block) => {
          block.asInstanceOf[Iterator[T]]
        }
        case Failure(e) => {
          blockId match {
            case ShuffleBlockId(shufId, mapId, _) =>
              val address = statuses(mapId.toInt)._1
              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
            case _ =>
              throw new SparkException(
                "Failed to get block " + blockId + ", which is not a shuffle block", e)
          }
        }
      }
    }

    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
    val itr = blockFetcherItr.flatMap(unpackBlock)

    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      context.taskMetrics.updateShuffleReadMetrics()
    })

    new InterruptibleIterator[T](context, completionIter) {
      val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
      override def next(): T = {
        readMetrics.incRecordsRead(1)
        delegate.next()
      }
    }
  }
}
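
To see what the splitsByAddress/blocksByAddress step produces, a small standalone illustration with made-up executor addresses and sizes (shuffleId = 0, reduceId = 3; block names follow the shuffle_<shuffleId>_<mapId>_<reduceId> convention):

//statuses(i) = (address of the executor that ran map task i, size of its output for this reduce)
val statuses = Array(("exec-1", 100L), ("exec-2", 200L), ("exec-1", 50L))

val blocksByAddress = statuses.zipWithIndex
  .groupBy { case ((address, _), _) => address }
  .map { case (address, entries) =>
    address -> entries.map { case ((_, size), mapId) => (s"shuffle_0_${mapId}_3", size) }.toSeq
  }
//exec-1 -> Seq((shuffle_0_0_3, 100), (shuffle_0_2_3, 50))
//exec-2 -> Seq((shuffle_0_1_3, 200))
//One fetch request per executor rather than one per block.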

MapOutputTracker's getServerStatuses is called from executors to obtain the server URIs and output sizes of the ShuffleMapTask results. The source is as follows:

 /**
   * Called from executors to get the server URIs and output sizes of the map outputs of
   * a given shuffle.
   */
  def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
    val statuses = mapStatuses.get(shuffleId).orNull
    if (statuses == null) {
      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
      var fetchedStatuses: Array[MapStatus] = null
      fetching.synchronized {
        // Someone else is fetching it; wait for them to be done
        while (fetching.contains(shuffleId)) {
          try {
            fetching.wait()
          } catch {
            case e: InterruptedException =>
          }
        }

        // Either while we waited the fetch happened successfully, or
        // someone fetched it in between the get and the fetching.synchronized.
        fetchedStatuses = mapStatuses.get(shuffleId).orNull
        if (fetchedStatuses == null) {
          // We have to do the fetch, get others to wait for us.
          fetching += shuffleId
        }
      }

      if (fetchedStatuses == null) {
        // We won the race to fetch the output locs; do so
        logInfo("Doing the fetch; tracker actor = " + trackerActor)
        // This try-finally prevents hangs due to timeouts:
        try {
          val fetchedBytes =
            askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
          fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
          logInfo("Got the output locations")
          mapStatuses.put(shuffleId, fetchedStatuses)
        } finally {
          fetching.synchronized {
            fetching -= shuffleId
            fetching.notifyAll()
          }
        }
      }
      if (fetchedStatuses != null) {
        fetchedStatuses.synchronized {
          return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
        }
      } else {
        logError("Missing all output locations for shuffle " + shuffleId)
        throw new MetadataFetchFailedException(
          shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
      }
    } else {
      statuses.synchronized {
        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
      }
    }
  }

Each ShuffleMapTask generates one MapStatus, which records how much of the data produced by that task falls into each partition; a size of 0 means no data was produced for that partition. The size for each partition is represented with a single byte, but a byte can only express values up to 255, so how are larger sizes represented? A clever change of representation is used: encode the logarithm of the size to base 1.1. This is exactly what compressSize and decompressSize in MapStatus do, so the 2^8 = 256 possible byte values cover sizes from 0 up to 1.1^255 ≈ 35,903,328,256 bytes, meaning a single byte can describe a size of nearly 35 GB.

The source is as follows:

/**
   * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
   * We do this by encoding the log base 1.1 of the size as an integer, which can support
   * sizes up to 35 GB with at most 10% error.
   */
  def compressSize(size: Long): Byte = {
    if (size == 0) {
      0
    } else if (size <= 1L) {
      1
    } else {
      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
    }
  }

  /**
   * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
   */
  def decompressSize(compressedSize: Byte): Long = {
    if (compressedSize == 0) {
      0
    } else {
      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
    }
  }
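
A quick worked check of the encoding (LOG_BASE = 1.1): a true block size of 1024 bytes compresses to ceil(log(1024)/log(1.1)) = 73, which decompresses back to about 1051 bytes, roughly a 2.6% overestimate and well within the documented 10% error bound.

val LOG_BASE = 1.1
val compressed = math.min(255, math.ceil(math.log(1024) / math.log(LOG_BASE)).toInt) //73
val restored = math.pow(LOG_BASE, compressed).toLong                                 //1051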

A shuffleId uniquely identifies one shuffle within a job, and thereby the stage that sits directly upstream of the stage running the ResultTasks. The MapStatus produced by each task of that upstream stage must be consulted to learn whether it contains data the current ResultTask needs to read.

In the BlockManager, the initialize function is called first: it initializes the BlockTransferService and ShuffleClient, registers with the BlockManagerMaster, starts the BlockManagerWorker actor, and registers with a local external shuffle service if one is configured. If a requested block lives locally, it is read via getLocal; otherwise it is pulled remotely via getRemote. The source of initialize is as follows:

/**
   * Initializes the BlockManager with the given appId. This is not performed in the constructor as
   * the appId may not be known at BlockManager instantiation time (in particular for the driver,
   * where it is only learned after registration with the TaskScheduler).
   *
   * This method initializes the BlockTransferService and ShuffleClient, registers with the
   * BlockManagerMaster, starts the BlockManagerWorker actor, and registers with a local shuffle
   * service if configured.
   */
  def initialize(appId: String): Unit = {
    blockTransferService.init(this)
    shuffleClient.init(appId)

    blockManagerId = BlockManagerId(
      executorId, blockTransferService.hostName, blockTransferService.port)

    shuffleServerId = if (externalShuffleServiceEnabled) {
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)

    // Register Executors' configuration with the local shuffle service, if one should exist.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }
  }

Shuffle operations consume a large amount of memory, in the following ways (a configuration sketch follows the list):

  • Each writer opens a buffer of roughly 100 KB.
  • The records themselves can occupy a large amount of memory.
  • In the ResultTask's combine phase, a HashMap is used to buffer the data; if the volume of data read is large, or there are many partitions, this can run out of memory.
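
To tie these pressure points back to the code walked through above, a hedged configuration sketch (property names as used in the Spark 1.x line discussed here; check your version's documentation before relying on them):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  //Enable the consolidation path in ShuffleBlockManager.forMapTask
  .set("spark.shuffle.consolidateFiles", "true")
  //Fetch window handed to ShuffleBlockFetcherIterator (see the fetch method above)
  .set("spark.reducer.maxMbInFlight", "48")
  //Allow ExternalSorter to spill to disk when reduce-side aggregation runs low on memory
  .set("spark.shuffle.spill", "true")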