Spark Storage Mechanism: Source Code Analysis
I. Writing and Reading Shuffle Results
As explained in the earlier article Spark原始碼解讀之Shuffle原理剖析與原始碼分析, the DAGScheduler splits a shuffle operation into two stages: the first stage runs ShuffleMapTasks and the second runs ResultTasks. The ShuffleMapTasks produce intermediate results, which the ResultTasks then read as their input.
So how does a ResultTask obtain the results computed by the ShuffleMapTasks? The process is as follows:
- The ShuffleMapTask wraps its computation status (not the computed values themselves) in a MapStatus and returns it to the DAGScheduler.
- The DAGScheduler stores the MapStatus in the MapOutputTrackerMaster.
- When the ResultTask computes the ShuffledRDD, it fetches the data through BlockStoreShuffleFetcher.fetch: it first asks the MapOutputTracker for the locations of the data it needs and then, based on the response, pulls the actual data through the BlockManager (via ShuffleBlockFetcherIterator in this version).
Each ShuffleMapTask records its result in a MapStatus. A MapStatus consists of a BlockManagerId and a set of byte sizes: the BlockManagerId identifies the BlockManager that holds the intermediate data, and the byte sizes tell each reduceId how much data it will have to read.
private[spark] sealed trait MapStatus {
  /** Location where this task was run. */
  def location: BlockManagerId

  /**
   * Estimated size for the reduce block, in bytes.
   *
   * If a block is non-empty, then this method MUST return a non-zero size. This invariant is
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   */
  // Size of the data that each reduceId needs to read
  def getSizeForBlock(reduceId: Int): Long
}
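To make the trait concrete, here is a rough sketch (not the actual Spark source) of how an implementation along the lines of Spark's CompressedMapStatus can back it: one log-encoded byte per reduce partition, decoded on demand. It assumes the compressSize/decompressSize helpers shown near the end of this article live on the MapStatus companion object.

// Rough sketch only; the field layout and helper location are assumptions modeled on CompressedMapStatus.
private[spark] class CompressedMapStatusSketch(
    loc: BlockManagerId,
    uncompressedSizes: Array[Long])
  extends MapStatus {

  // One byte per reduce partition, using the log-base-1.1 encoding (see compressSize below).
  private[this] val compressedSizes: Array[Byte] = uncompressedSizes.map(MapStatus.compressSize)

  override def location: BlockManagerId = loc

  // Decode on demand; zero stays zero, so empty blocks can still be skipped by fetchers.
  override def getSizeForBlock(reduceId: Int): Long =
    MapStatus.decompressSize(compressedSizes(reduceId))
}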
1. Writing Shuffle Results
The shuffle write path is:
ShuffleMapTask.runTask ---> HashShuffleWriter.write ---> BlockObjectWriter.write
The runTask method of ShuffleMapTask is shown below:
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    // Get the ShuffleManager and obtain a ShuffleWriter from it
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // First call the RDD's iterator method, passing in the partition this task is to process,
    // then run the user-defined function. The returned records all go through the ShuffleWriter:
    // after being partitioned by the HashPartitioner they are written into their corresponding buckets.
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    // Finally return the result, a MapStatus.
    // MapStatus encapsulates where the data computed by this ShuffleMapTask is stored,
    // i.e. the BlockManager information.
    // BlockManager is Spark's low-level component for managing data in memory and on disk.
    return writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
HashShuffleWriter.write mainly does two things:
- It decides whether aggregation is needed. For example, if <hello,1> and <hello,1> both have to be written, they should first be combined into <hello,2> before anything else happens (see the sketch after this list).
- It uses the partitioner to decide which bucket file each <key,value> pair is written to.
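The following minimal sketch shows the two steps in isolation (illustrative only, not Spark source; numBuckets and the in-memory HashMap are assumptions standing in for the number of reduce partitions and the aggregator): combine values that share a key, then route each pair to a bucket the way a hash partitioner would.

import scala.collection.mutable

// Minimal illustration of map-side combine followed by hash partitioning.
def combineAndBucket(records: Iterator[(String, Int)], numBuckets: Int)
    : Map[Int, Seq[(String, Int)]] = {
  // Step 1: map-side combine, e.g. ("hello",1), ("hello",1) -> ("hello",2)
  val combined = mutable.HashMap.empty[String, Int]
  records.foreach { case (k, v) => combined(k) = combined.getOrElse(k, 0) + v }

  // Step 2: pick a bucket the same way HashPartitioner does:
  // a non-negative (hashCode mod numBuckets)
  combined.toSeq.groupBy { case (k, _) =>
    ((k.hashCode % numBuckets) + numBuckets) % numBuckets
  }
}

// combineAndBucket(Iterator("hello" -> 1, "hello" -> 1, "world" -> 1), 2)
// => a Map from bucketId to Seq(("hello",2)) and Seq(("world",1))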
The write method of HashShuffleWriter is shown below:
/** Write a bunch of records to this task's output */
/**
 * Write the partition data of the new RDD computed by this ShuffleMapTask to local disk.
 * @param records
 */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
//Decide whether a map-side (local) combine is needed; for operations like reduceByKey it is,
//i.e. dep.aggregator.isDefined is true
//and dep.mapSideCombine is also true
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
//Perform the local combine here; for example, local records (hello,1), (hello,1)
//can be combined into (hello,2)
dep.aggregator.get.combineValuesByKey(records, context)
} else {
records
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}
//After the optional local combine, iterate over the records and partition each one
//(HashPartitioner by default) to compute a bucketId,
//which indicates the bucket the record will be written into
for (elem <- iter) {
//Compute the bucketId
val bucketId = dep.partitioner.getPartition(elem._1)
//The writer for this bucketId was produced by shuffleBlockManager.forMapTask(); use it to write the record into its bucket
//DiskBlockObjectWriter is what actually writes the data to disk
shuffle.writers(bucketId).write(elem)
}
}
The shuffle writer group used in the write method above is created by the forMapTask function of ShuffleBlockManager. Its source is as follows:
/**
* Get a ShuffleWriterGroup for the given map task, which will register it as complete
* when the writers are closed successfully
*/
/**
 * Create one ShuffleWriterGroup per map task
 */
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
writeMetrics: ShuffleWriteMetrics) = {
new ShuffleWriterGroup {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
private val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null
val openStartTime = System.nanoTime
//Check whether the consolidate optimization is enabled. If it is, each bucket does not get its own output file;
//instead each bucket gets a writer into a shared ShuffleFileGroup
val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
//First generate a unique blockId; the bucketId is then used to index into the ShuffleFileGroup (its apply method) to pick the file for the writer
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
//blockManager.getDiskWriter() creates the writer
//With the optimization enabled, a bucketId no longer gets a writer for an independent ShuffleBlockFile as before,
//but a writer into the shared ShuffleFileGroup,
//which is how the output files of multiple ShuffleMapTasks get merged
blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
writeMetrics)
}
} else {
//Without the consolidation setting, each ShuffleMapTask still creates one ShuffleBlockFile per bucket
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
//If the ShuffleBlockFile already exists, delete it
if (blockFile.exists) {
if (blockFile.delete()) {
logInfo(s"Removed existing shuffle file $blockFile")
} else {
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
//Create the disk writer that writes to this file
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
}
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, so should be included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
override def releaseWriters(success: Boolean) {
if (consolidateShuffleFiles) {
if (success) {
val offsets = writers.map(_.fileSegment().offset)
val lengths = writers.map(_.fileSegment().length)
fileGroup.recordMapOutput(mapId, offsets, lengths)
}
recycleFileGroup(fileGroup)
} else {
shuffleState.completedMapTasks.add(mapId)
}
}
private def getUnusedFileGroup(): ShuffleFileGroup = {
val fileGroup = shuffleState.unusedFileGroups.poll()
if (fileGroup != null) fileGroup else newFileGroup()
}
private def newFileGroup(): ShuffleFileGroup = {
val fileId = shuffleState.nextFileId.getAndIncrement()
val files = Array.tabulate[File](numBuckets) { bucketId =>
val filename = physicalFileName(shuffleId, bucketId, fileId)
blockManager.diskBlockManager.getFile(filename)
}
val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
shuffleState.allFileGroups.add(fileGroup)
fileGroup
}
private def recycleFileGroup(group: ShuffleFileGroup) {
shuffleState.unusedFileGroups.add(group)
}
}
}
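The consolidateShuffleFiles branch above is controlled by a configuration flag. In the hash-shuffle versions this article covers, it can be switched on when building the SparkConf (shown for illustration only):

import org.apache.spark.SparkConf

// Enable shuffle file consolidation for the hash-based shuffle.
// With this on, forMapTask hands out writers into a shared ShuffleFileGroup
// instead of creating numBuckets separate files for every map task.
val conf = new SparkConf()
  .setAppName("consolidation-example")
  .set("spark.shuffle.consolidateFiles", "true")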
The source above touches on the shuffle consolidation optimization; for the details, see the previous article Spark原始碼解讀之Shuffle原理剖析與原始碼分析. The getFile method is responsible for mapping the data a shuffle needs to write to a file on disk:
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
//Responsible for mapping the triple (shuffle_id, map_id, reduce_id), i.e. the block's file name, to a concrete file
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
var subDir = subDirs(dirId)(subDirId)
if (subDir == null) {
subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
}
subDirs(dirId)(subDirId) = newDir
newDir
}
}
}
new File(subDir, filename)
}
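As a small standalone illustration of the hashing above (not Spark source; the directory counts are made-up example values), a shuffle block name such as "shuffle_0_5_3" is hashed once, the remainder picks one of the local directories, and the quotient picks a two-hex-digit sub-directory inside it:

// Illustrative only: mirrors how getFile picks localDirs(dirId)/"%02x".format(subDirId)/filename.
val filename = "shuffle_0_5_3"     // name of ShuffleBlockId(0, 5, 3)
val numLocalDirs = 2               // assumed: two disks configured via spark.local.dir
val subDirsPerLocalDir = 64        // assumed sub-directory count per local dir

val hash = filename.hashCode & Int.MaxValue   // a non-negative hash, standing in for Utils.nonNegativeHash
val dirId = hash % numLocalDirs
val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir

val subDirName = "%02x".format(subDirId)
println(s"$filename -> localDirs($dirId)/$subDirName/$filename")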
Finally, DiskBlockObjectWriter.write is what actually writes the data to disk:
override def write(value: Any) {
if (!initialized) {
open()
}
objOut.writeObject(value)
numRecordsWritten += 1
writeMetrics.incShuffleRecordsWritten(1)
if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
}
}
2. Reading Shuffle Results
The shuffle read path is as follows:
ShuffledRDD.compute ---> HashShuffleReader.read ---> BlockStoreShuffleFetcher.fetch ---> ShuffleBlockFetcherIterator
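For context, this whole path is what an ordinary wide transformation triggers. In the plain user-level example below, reduceByKey produces a ShuffledRDD, and collecting it runs the ResultTasks whose compute calls start the chain above:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("shuffle-read-example").setMaster("local[2]"))

// reduceByKey builds a ShuffledRDD; collect() runs the ResultTasks,
// whose compute() calls go through HashShuffleReader and BlockStoreShuffleFetcher.
val counts = sc.parallelize(Seq("a", "b", "a"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .collect()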
The compute function of ShuffledRDD is the starting point for reading the results computed by the ShuffleMapTasks. Its source is as follows:
/**
 * Entry point of the shuffle read
 */
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
//Call shuffleManager.getReader() to obtain a HashShuffleReader,
//then call its read() method to pull the data this ResultTask needs to aggregate
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
Here HashShuffleReader's read method is called to obtain the combined data. Its source is shown below:
/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val ser = Serializer.getSerializer(dep.serializer)
//BlockStoreShuffleFetcher.fetch asks the MapOutputTrackerMaster on the driver for the metadata
//about the data this task needs, and the actual data is then pulled through the corresponding BlockManager
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
} else {
new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
}
// Sort the output if there is a sort ordering defined.
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.insertAll(aggregatedIter)
context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
sorter.iterator
case None =>
aggregatedIter
}
}
The read function calls BlockStoreShuffleFetcher.fetch to obtain the MapStatus information, and the actual data is then fetched through the BlockManager. Source:
private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
shuffleId: Int,
reduceId: Int,
context: TaskContext,
serializer: Serializer)
: Iterator[T] =
{
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager
val startTime = System.currentTimeMillis
//Get the global MapOutputTracker and call its getServerStatuses method.
//Note that two arguments are passed in: shuffleId and reduceId.
//A shuffle involves two stages; shuffleId identifies the upstream stage and is used to look up
//the MapStatus metadata produced by that stage's ShuffleMapTasks during shuffle write.
//Once the MapStatuses are obtained, reduceId selects which part of the upstream ShuffleMapTask output this stage needs to pull.
//getServerStatuses may involve network communication, since it has to contact the MapOutputTrackerMaster on the driver for this metadata.
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
shuffleId, reduceId, System.currentTimeMillis - startTime))
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
for (((address, size), index) <- statuses.zipWithIndex) {
splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
}
val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
case (address, splits) =>
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}
def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
case Success(block) => {
block.asInstanceOf[Iterator[T]]
}
case Failure(e) => {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block", e)
}
}
}
}
val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
serializer,
SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
val itr = blockFetcherItr.flatMap(unpackBlock)
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
context.taskMetrics.updateShuffleReadMetrics()
})
new InterruptibleIterator[T](context, completionIter) {
val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
override def next(): T = {
readMetrics.incRecordsRead(1)
delegate.next()
}
}
}
}
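The last constructor argument above caps how much fetched block data may be in flight at once; as the code shows, it comes from spark.reducer.maxMbInFlight with a 48 MB default in this version. It can be raised for reduce-heavy jobs, for example:

import org.apache.spark.SparkConf

// Allow up to 96 MB of shuffle blocks in flight per reduce task (the default above is 48 MB).
val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "96")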
MapOutputTracker.getServerStatuses is called on the executors to obtain the locations (BlockManagerIds) and sizes of the ShuffleMapTask output data. Its source is as follows:
/**
* Called from executors to get the server URIs and output sizes of the map outputs of
* a given shuffle.
*/
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
// Someone else is fetching it; wait for them to be done
//Wait for that fetch to complete
while (fetching.contains(shuffleId)) {
try {
fetching.wait()
} catch {
case e: InterruptedException =>
}
}
// Either while we waited the fetch happened successfully, or
// someone fetched it in between the get and the fetching.synchronized.
fetchedStatuses = mapStatuses.get(shuffleId).orNull
if (fetchedStatuses == null) {
// We have to do the fetch, get others to wait for us.
fetching += shuffleId
}
}
if (fetchedStatuses == null) {
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
// This try-finally prevents hangs due to timeouts:
try {
val fetchedBytes =
askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
} finally {
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
}
}
if (fetchedStatuses != null) {
fetchedStatuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
} else {
logError("Missing all output locations for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
}
}
}
Each ShuffleMapTask produces one MapStatus, which records how much of the data produced by that task falls into each reduce partition; a size of 0 means no data was produced for that partition. Every per-partition size is stored in a single byte, but a byte can only hold values up to 255, so how are larger sizes represented? The trick is a change of base: the size is encoded as a logarithm with base 1.1, so the 2^8 = 256 values a byte can hold map to sizes of up to roughly 1.1^255 bytes. That is exactly what compressSize and decompressSize in MapStatus do: by expressing the size in this other "radix", the representable range stretches from 0-255 to 0-35,903,328,256, so a single byte can describe a block of close to 35 GB.
The source is as follows:
/**
* Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
* We do this by encoding the log base 1.1 of the size as an integer, which can support
* sizes up to 35 GB with at most 10% error.
*/
def compressSize(size: Long): Byte = {
if (size == 0) {
0
} else if (size <= 1L) {
1
} else {
math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
}
}
/**
* Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
*/
def decompressSize(compressedSize: Byte): Long = {
if (compressedSize == 0) {
0
} else {
math.pow(LOG_BASE, compressedSize & 0xFF).toLong
}
}
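A quick worked example of the round trip, using the two functions exactly as defined above (re-declared here so the snippet runs on its own): a 1,000-byte block compresses to ceil(ln(1000)/ln(1.1)) = 73, and decompressing 73 gives back 1.1^73 ≈ 1,051 bytes, about a 5% overestimate, consistent with the "at most 10% error" noted in the comment.

// Same logic as the MapStatus helpers above, repeated so this snippet is self-contained.
val LOG_BASE = 1.1

def compressSize(size: Long): Byte =
  if (size == 0) 0
  else if (size <= 1L) 1
  else math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte

def decompressSize(compressedSize: Byte): Long =
  if (compressedSize == 0) 0 else math.pow(LOG_BASE, compressedSize & 0xFF).toLong

compressSize(1000L)                   // 73 = ceil(ln(1000) / ln(1.1))
decompressSize(compressSize(1000L))   // ~1051 bytes, about 5% above the real size
decompressSize(255.toByte)            // ~35,903,328,256 bytes, the ~35 GB ceiling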
The shuffleId uniquely identifies a stage within a job, namely the direct upstream of the stage the ResultTasks belong to. The MapStatus produced by every task of that stage has to be examined to find out whether it holds any data the current ResultTask needs to read.
The BlockManager is first set up through its initialize function, which initializes the BlockTransferService and the ShuffleClient, registers with the BlockManagerMaster, starts the BlockManagerWorker, and registers with a local external shuffle service if one is configured. When a requested block is local, it is read with getLocal; otherwise getRemote pulls it from a remote node. The initialize function is shown below:
/**
* Initializes the BlockManager with the given appId. This is not performed in the constructor as
* the appId may not be known at BlockManager instantiation time (in particular for the driver,
* where it is only learned after registration with the TaskScheduler).
*
* This method initializes the BlockTransferService and ShuffleClient, registers with the
* BlockManagerMaster, starts the BlockManagerWorker actor, and registers with a local shuffle
* service if configured.
*/
def initialize(appId: String): Unit = {
blockTransferService.init(this)
shuffleClient.init(appId)
blockManagerId = BlockManagerId(
executorId, blockTransferService.hostName, blockTransferService.port)
shuffleServerId = if (externalShuffleServiceEnabled) {
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
}
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}
}
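The local-versus-remote decision mentioned above amounts to trying the local memory and disk stores first and only then fetching from a remote BlockManager. Below is a standalone stub of that dispatch (simplified, not the actual BlockManager source; the String ids and byte-array payloads are placeholders):

// Stand-ins for BlockManager.getLocal / getRemote, stubbed so the snippet runs on its own.
def getLocal(blockId: String): Option[Array[Byte]] = None                  // pretend nothing is cached locally
def getRemote(blockId: String): Option[Array[Byte]] = Some(Array[Byte](1, 2, 3))

def get(blockId: String): Option[Array[Byte]] =
  getLocal(blockId).orElse {
    // Block is not on this executor: ask a remote BlockManager (stubbed above).
    getRemote(blockId)
  }

// get("shuffle_0_5_3") falls through to the remote path in this stub.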
Shuffle operations consume a large amount of memory, which shows up in several ways:
- Each writer opens a buffer of roughly 100 KB (see the estimate after this list).
- The records themselves can take up a lot of memory.
- In the combine phase on the ResultTask side, a HashMap is used to buffer the data; if a lot of data is read, or there are many partitions, this can run out of memory.
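To make the buffer point concrete, a back-of-the-envelope estimate (the core and reducer counts below are made-up example values): without consolidation, every concurrently running map task keeps one open writer, and hence one buffer, per reduce partition, so the buffers alone add up quickly.

// Rough estimate of shuffle-write buffer memory on one executor (illustrative numbers only).
val coresPerExecutor  = 16       // concurrently running ShuffleMapTasks
val numReducers       = 1000     // buckets per map task
val bufferPerWriterKb = 100      // ~100 KB per open writer, as noted above

val totalMb = coresPerExecutor.toLong * numReducers * bufferPerWriterKb / 1024
println(s"~$totalMb MB of write buffers alone")   // ~1562 MB in this example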