spark的儲存系統--BlockManager原始碼分析
spark的儲存系統--BlockManager原始碼分析
根據之前的一系列分析,我們對spark作業從建立到排程分發,到執行,最後結果回傳driver的過程有了一個大概的瞭解。但是在分析原始碼的過程中也留下了大量的問題,最主要的就是涉及到的spark中重要的幾個基礎模組,我們對這些基礎設施的內部細節並不是很瞭解,之前走讀原始碼時基本只是大概瞭解每個模組的作用以及對外的主要介面,這些重要的模組包括BlockMananger, MemoryMananger, ShuffleManager, MapOutputTracker, rpc模組NettyRPCEnv,以及BroadcastManager。 而對於排程系統涉及到的幾個類包括DAGSchedulerManager, TaskSchedulerManager, CoarseGrainedSchedulerBackend, CoarseGrainedExecutorBackend, Executor, TaskRunner,我們之前已經做了較為詳細的分析,因此這幾個模組暫告一段落。
本篇,我們來看一下spark中最基礎的一個的模組--儲存系統BlockManager的內部實現。
BlockManager呼叫時機
首先,我們來整理一下在一個作業的執行過程中都有哪些地方使用到了BlockManager。
DAGScheduler.getCacheLocs。這個方法的呼叫是在提交一個stage時,需要獲取分割槽的偏向位置時會呼叫該方法。我們知道rdd是可以快取的,而rdd的快取就是通過blockManager來管理的,有一個專門的RDDBlockId用來表示一個RDD快取塊的唯一標識。
最終呼叫的方法是:blockManagerMaster.getLocations(blockIds)
廣播變數。在DAGscheduler中提交stage時需要把rdd和ShuffleDependency(對於ResultStage則是一個函式)物件序列化用於網路傳輸,實際上序列化後的位元組陣列是通過broadcastManager元件進行網路傳輸的,而broadcastManager實際又是通過BlockMananger來將要廣播的資料儲存成block,並在executor端傳送rpc請求向BlockManangerMaster請求資料。每個廣播變數會對應一個TorrentBroadcast物件,TorrentBroadcast物件內的writeBlocks和readBlocks是讀寫廣播變數的方法,
最終呼叫的方法是:blockManager.putSingle和blockManager.putBytes
Shuffle的map階段輸出。如果我們沒有啟動外部shuffle服務及ExternalShuffle,那麼就會用spark自己的shuffle機制,在map階段輸出時通過blockManager對輸出的檔案進行管理。shuffle這部分主要使用的是DiskBlockManager元件。
最終呼叫的是:DiskBlockManager相關方法包括createTempShuffleBlock,getDiskWriter, DiskBlockObjectWriter相關方法,包括write方法和commitAndGet方法
任務執行結果序列化後傳回driver。這裡分為兩種情況,如果結果序列化後體積較小,小於maxDirectResultSize,則直接通過rpc介面傳回,如果體積較大,就需要先通過blockManager寫入executor幾點的記憶體和磁碟中,然後在driver端進行拉取。
最終呼叫的是:blockManager.putBytes
此外,我們還注意到,以上幾種情形中使用的BlockId都是不同的,具體可以看一下BlockId.scala檔案中關於各種BlockId的定義。
所以,接下來,我們的思路就很清晰了,以上面提到的對BlockManager的方法呼叫為切入點進行分析。
BlockManagerMaster.getLocations
這個方法用於獲取指定的blockId對應的塊所在儲存位置。
def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
GetLocationsMultipleBlockIds(blockIds))
}
這裡向driverEndpoint傳送了一個GetLocations訊息,注意這裡的driverEndpoint並不是DriverEndpoint的端點引用,在SparkEnv的構造過程我們可以看到,這是一個BlockManagerMasterEndpoint端點的引用。所以我們需要在BlockManagerMasterEndpoint中尋找對於該訊息的處理。注意,由於這裡呼叫了ask方法,所以在服務端是由receiveAndReply方法來處理並響應的。
BlockManagerMasterEndpoint.receiveAndReply
我們截取了對GetLocations處理的部分程式碼
case GetLocationsMultipleBlockIds(blockIds) =>
context.reply(getLocationsMultipleBlockIds(blockIds))
呼叫的是getLocations方法:
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
這個方法很簡單,就是直接從快取中查詢blockId對應的位置,位置資訊用BlockManagerId封裝。那麼快取中的資訊什麼時候加進去呢?當然是寫入新的block並更新block位置資訊的時候,後面的會分析到。
BlockManager.putSingle
這個方法寫入一個有單個物件組成的塊,
def putSingle[T: ClassTag](
blockId: BlockId,
value: T,
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
putIterator(blockId, Iterator(value), level, tellMaster)
}
可以看到,把物件包裝成了一個只有一個元素的迭代器,然後呼叫putIterator方法,最後呼叫doPutIterator方法
BlockManager.doPutIterator
上面的方法,最終呼叫了doPutIterator方法。
private def doPutIterator[T](
blockId: BlockId,
iterator: () => Iterator[T],
level: StorageLevel,
classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
//
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeMs = System.currentTimeMillis
var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
// Size of the block in bytes
var size = 0L
// 如果儲存等級中包含記憶體級別,那麼我們優先寫入記憶體中
if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
// 對於不進行序列化的情況,只能儲存記憶體中
if (level.deserialized) {
memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
case Right(s) =>
size = s
case Left(iter) =>
// Not enough space to unroll this block; drop to disk if applicable
// 記憶體空間不夠時,如果儲存等級允許磁碟,則儲存到磁碟中
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { channel =>
val out = Channels.newOutputStream(channel)
// 注意對於儲存到磁碟的情況一定是要序列化的
serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
}
size = diskStore.getSize(blockId)
} else {
iteratorFromFailedMemoryStorePut = Some(iter)
}
}
} else { // !level.deserialized
// 以序列化的形式進行儲存
memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
case Right(s) =>
size = s
case Left(partiallySerializedValues) =>
// Not enough space to unroll this block; drop to disk if applicable
if (level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.put(blockId) { channel =>
val out = Channels.newOutputStream(channel)
partiallySerializedValues.finishWritingToStream(out)
}
size = diskStore.getSize(blockId)
} else {
iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
}
}
}
} else if (level.useDisk) {// 對於儲存級別不允許存入記憶體的情況,我們只能選擇存入磁碟
diskStore.put(blockId) { channel =>
val out = Channels.newOutputStream(channel)
// 儲存到磁碟是一定要序列化的
serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
}
size = diskStore.getSize(blockId)
}
// 獲取剛剛剛剛寫入的塊的狀態資訊
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
// 如果塊儲存成功,那麼進行接下來的動作
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory or disk store, tell the master about it.
info.size = size
// 向driver彙報塊資訊
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, putBlockStatus)
}
// 更新任務度量系統中關於塊資訊的相關統計值
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
// 如果副本數大於1,那麼需要進行額外的複製
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
val bytesToReplicate = doGetLocalBytes(blockId, info)
// [SPARK-16550] Erase the typed classTag when using default serialization, since
// NettyBlockRpcServer crashes when deserializing repl-defined classes.
// TODO(ekl) remove this once the classloader issue on the remote end is fixed.
val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
scala.reflect.classTag[Any]
} else {
classTag
}
try {
replicate(blockId, bytesToReplicate, level, remoteClassTag)
} finally {
bytesToReplicate.dispose()
}
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
iteratorFromFailedMemoryStorePut
}
}
總結一下這段程式碼的主要邏輯:
- 如果儲存等級允許存入記憶體,那麼優先存入記憶體中。根據儲存的資料是否需要序列化分別選擇呼叫memoryStore的不同方法。
- 如果儲存等級不允許記憶體,那麼只能存入磁碟中,存入磁碟中的資料一定是經過序列化的,這點要注意。
- 向BlockManagerMaster彙報剛寫入的塊的位置資訊
- 更新任務度量系統中關於塊資訊的相關統計值
- 如果副本數大於1,那麼需要進行額外的複製
從上面的步驟可以看到,在完成資料寫入後,會通過rpc呼叫向BlockManagerMaster彙報塊的資訊,這也解答了blockManagerMaster.getLocations方法從記憶體的map結構中查詢塊的位置資訊的來源。
單純就儲存資料來說,最重要的無疑是記憶體管理器MemoryStore和磁碟管理器DiskStore。
對於MemoryStore和DiskStore呼叫的儲存方法有:
memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
diskStore.put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit
diskStore.getSize(blockId)
blockManager.putBytes
我們再來接著看另一個寫入方法,putBytes,即寫入位元組陣列資料。它的實際寫入的邏輯在doPutBytes方法中,我們看一下這個方法:
blockManager.doPutBytes
這個方法的主要步驟與doPutIterator方法差不多。只不過doPutIterator方法插入的是java物件,如果儲存級別要求序列化或者儲存到磁碟時,需要將物件序列化。
private def doPutBytes[T](
blockId: BlockId,
bytes: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Boolean = {
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeMs = System.currentTimeMillis
// Since we're storing bytes, initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
// 啟動副本複製
val replicationFuture = if (level.replication > 1) {
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool. The ByteBufferBlockData wrapper is not disposed of to avoid releasing
// buffers that are owned by the caller.
replicate(blockId, new ByteBufferBlockData(bytes, false), level, classTag)
}(futureExecutionContext)
} else {
null
}
val size = bytes.size
// 如果快取級別中包含記憶體,優先寫入記憶體中
if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
// 是否以序列化形式儲存
val putSucceeded = if (level.deserialized) {
val values =
serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
memoryStore.putIteratorAsValues(blockId, values, classTag) match {
case Right(_) => true
case Left(iter) =>
// If putting deserialized values in memory failed, we will put the bytes directly to
// disk, so we don't need this iterator and can close it to free resources earlier.
iter.close()
false
}
} else {
// 如果以序列化格式儲存,則不需要反序列化
val memoryMode = level.memoryMode
memoryStore.putBytes(blockId, size, memoryMode, () => {
// 如果存在非直接記憶體,那麼需要將資料拷貝一份到直接記憶體中
if (memoryMode == MemoryMode.OFF_HEAP &&
bytes.chunks.exists(buffer => !buffer.isDirect)) {
bytes.copy(Platform.allocateDirectBuffer)
} else {
bytes
}
})
}
// 如果插入記憶體失敗,並且允許寫入磁碟的話,就將資料寫入磁碟
// 插入記憶體失敗一般是因為記憶體不夠引起
if (!putSucceeded && level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.putBytes(blockId, bytes)
}
} else if (level.useDisk) {// 如果只允許儲存到磁碟,那就只能存到磁碟了
// 儲存到磁碟的資料一定是序列化的
diskStore.putBytes(blockId, bytes)
}
// 剛剛插入的塊的資訊
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
// 向driver端的BlockManagerMaster元件彙報塊資訊
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, putBlockStatus)
}
// 更新任務度量值
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
// Wait for asynchronous replication to finish
// 等待之前啟動的副本複製執行緒完成
// 注意這裡的超時被設成了無窮大
try {
ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for replication to finish", t)
}
}
if (blockWasSuccessfullyStored) {
None
} else {
Some(bytes)
}
}.isEmpty
}
對於MemoryStore和DiskStore呼叫的方法有:
memoryStore.putBytes
diskStore.putBytes(blockId, bytes)
總結
綜上,我們把一個spark作業執行過程中需要呼叫到BlockManager的時機以及呼叫的BlockManager的一些寫入資料的方法大致整理了一下。BlockManager主要是通過內部的兩個元件MemoryStore和DiskStore來管理資料向記憶體或磁碟寫入的。此外DiskBlockManager元件主要是用來管理Block和磁碟檔案之間的對應關係,分配檔案路徑,管理本地檔案系統路徑等作用。對於MemoryStore和DiskStore的呼叫主要有如下幾個方法:
memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
diskStore.put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit
diskStore.getSize(blockId)
memoryStore.putBytes
diskStore.putBytes(blockId, bytes)