spark原始碼解析--Shuffle輸出追蹤者--MapOutputTracker
Shuffle輸出追蹤者--MapOutputTracker
這個元件作為shuffle的一個輔助元件,在整個shuffle模組中具有很重要的作用。我們在前面一系列的分析中,或多或少都會提到這個元件,比如在DAGScheduler提交一個stage時會將這個stage封裝成一個任務集(TaskSet),但是可能有的分割槽已經計算過了,有了結果(stage由於失敗可能會多次提交,其中有部分task可能已經計算完成),這些分割槽就不需要再次計算,而只需要計算那些失敗的分割槽,那麼很顯然需要有一個元件來維護shuffle過程中的任務失敗成功的狀態,以及計算結果的位置資訊。
此外,在shuffle讀取階段,我們知道一個reduce端的分割槽會依賴於多個map端的分割槽的輸出資料,那麼我們在讀取一個reduce分割槽對應的資料時,就需要知道這個reduce分割槽依賴哪些map分割槽,每個block的物理位置是什麼,blockId是什麼,這個block中屬於這個reduce分割槽的資料量大小是多少,這些資訊的記錄維護都是靠MapOutputTracker來實現的,所以我們現在知道MapOutputTracker的重要性了。
MapOutputTracker.scala
MapOutputTracker元件的主要功能類和輔助類全部在這個檔案中,我先大概說一下各個類的主要作用,然後重點分析關鍵的類。
- ShuffleStatus,這個類是對一個stage的shuffle輸出狀態的封裝,它內部的一個主要的成員mapStatuses是一個數組,這個陣列的下標就是map的分割槽序號,存放了每個map分割槽的輸出情況,關於MapStatus具體可以看MapStatus.scala,這裡不打算展開。
- MapOutputTrackerMessage,用於rpc請求的訊息類,有兩個實現類:GetMapOutputStatuses用於獲取某次shuffle的所有輸出狀態;StopMapOutputTracker用於向driver端的傳送停止MapOutputTrackerMasterEndpoint端點的請求。
- MapOutputTrackerMasterEndpoint,如果熟悉spark的rpc模組的話,對這個類應該就很熟悉,它就是一個rpc服務端,通過向RpcEnv註冊自己,通過一個名稱標識自己,從而接收到特定一些訊息,也就是上面說的兩種訊息。
- MapOutputTracker,這個類是一個抽象類,只是定義了一些操作介面,它的一個最重要的作用可能就是內部維護了一個序列值epoch,這個值表示某一個一致的全域性map輸出狀態,一旦有map輸出發生變更,這個值就要加一,executor端會同步最新的epoch以判斷自己的map輸出狀態的快取是否過期。
- MapOutputTrackerMaster,執行在driver端,實現類MapOutputTracker的大部分功能,是最核心的類
- MapOutputTrackerWorker,執行在executor端,主要作用是封裝了rpc呼叫的邏輯。
總的來看,最核心的類是MapOutputTrackerMaster,其他的類都是圍繞這個類的一些輔助類,所以我們重點分析MapOutputTrackerMaster,其他的類我不打算深入展開,相信讀者自己也能夠較為輕鬆地理解。
MapOutputTrackerMaster
findMissingPartitions
這個方法在上面已經提到了,會在DAGScheduler封裝任務集的時候查詢一個stage需要計算的分割槽時會呼叫到。
def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] = {
shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
}
ShuffleStatus.findMissingPartitions
def findMissingPartitions(): Seq[Int] = synchronized { val missing = (0 until numPartitions).filter(id => mapStatuses(id) == null) assert(missing.size == numPartitions - _numAvailableOutputs, s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}") missing }
這兩段程式碼很簡單,不用多說,就是從map結構中查詢。
此外,像registerShuffle,registerMapOutput,unregisterMapOutput,unregisterShuffle,removeOutputsOnHost等等,我們可以看到這幾個方法本身都是很簡答的,無非就是對內部map結構的插入,更新和查詢,關鍵的是你要清楚這些方法的呼叫時機是什麼?弄清這一點,會讓我們對MapOutputTracker在整個spark框架中的作用和充當的角色有更深的理解。方法的呼叫地點,通過Idea這類IDE工具其實都可以很簡單地定位到,這裡我不做過多展開,僅僅簡單地概括一下:
- registerShuffle, DAGScheduler在建立一個ShuffleMapStage時會順便把這個stage對應的shuffle註冊進來。
- registerMapOutput, 在一個shuffleMapTask任務完成後,會把map輸出的資訊註冊進來。
- removeOutputsOnHost,將某個host上的相關map輸出資訊全部移除,一般在主機丟失時呼叫此操作
- removeOutputsOnExecutor,同樣地,將某個executor上的相關map輸出資訊全部移除,一般在executor丟失時呼叫此操作
getMapSizesByExecutorId
我們來看另一個比較重要的方法,在reduce階段讀取資料時,一個task首先需要知道它依賴於哪些map輸出,這時它回想driver端的MapOutputTrackerMasterEndpoint元件傳送一個獲取map輸出的訊息,經過一系列方法呼叫最終會呼叫這個方法:
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
shuffleStatuses.get(shuffleId) match {
case Some (shuffleStatus) =>
// 將所有的mapStatus陣列轉換成(BlockManagerId, Seq[(BlockId, Long)])物件
shuffleStatus.withMapStatuses { statuses =>
MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
}
case None =>
Seq.empty
}
}
我們看一下:MapOutputTracker.convertMapStatuses,這個方法也很簡單,其實就是將每個map分割槽輸出切分成reduce分割槽數量,最後產生的(BlockId, Long)元組數量等於map分割槽數量*reduce分割槽數量。
def convertMapStatuses(
shuffleId: Int,
startPartition: Int,
endPartition: Int,
statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
assert (statuses != null)
// 用於存放結果
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
// 最後產生的(BlockId, Long)元組數量等於map分割槽數量*reduce分割槽數量
for ((status, mapId) <- statuses.zipWithIndex) {
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId"
logError(errorMessage)
throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
} else {
for (part <- startPartition until endPartition) {
splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
}
}
}
splitsByAddress.toSeq
}
getPreferredLocationsForShuffle
我們來看另外一個比較重要的方法。我們知道reduce端的分割槽一般會依賴於多個map端分割槽輸出,但是對於每個map分割槽依賴的資料量是不同的,舉個極端的例子,假設reduce端某個分割槽依賴於10個map端的輸出分割槽,但是其中一個分割槽依賴的資料有10000條,而其他分割槽依賴的資料只有1條,這種情況下,顯然我們應該吧這個reduce任務優先排程到那個依賴了10000條的executor上。當然這個例子舉得很簡單,可能也不是什麼準確,但是也足夠說明這個方法的作用。
def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
: Seq[String] = {
// 首先判斷幾個引數配置,如果都符合條件,那麼再進行偏向位置的計算
if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
// 關鍵呼叫
val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
if (blockManagerIds.nonEmpty) {
blockManagerIds.get.map(_.host)
} else {
Nil
}
} else {
Nil
}
}
可以看出來,關鍵的方法是getLocationsWithLargestOutputs,接下來,我們就來看一下這個方法:
註釋已經說得很清楚,這個方法的邏輯很簡單,比如一個reduce端分割槽要讀取的總資料量是100m, 某個executor上的所有map輸出中與這個reduce分割槽相關的資料加起來有20m,即超過了總量的0.2,這時這個executor就能夠成為偏向位置,是不是很簡單。但是這裡應該注意到一個問題,這個方法是以executor為最小單位計算偏向位置,而在前一個方法getPreferredLocationsForShuffle中,獲取到成為偏向位置的那些BlockManagerId後,僅僅是取出了host作為偏向位置返回給上層呼叫者,問題在於一個host(即物理節點)上可能有多個executor,這就會造成返回的結果中會有重複的host,;另外,既然返回host作為偏向位置,那為什麼不直接以host作為最小單位來計算偏向位置呢,比如將一個host上所有與這個reduce分割槽相關的資料加起來,如果超過0.2的佔比就認為這個host能夠作為偏向位置,這樣好像更合理,也更容易產生偏向位置。舉個極端的例子,一個host上運行了5個executor,每個executor與分割槽相關的資料佔比0.1,另外有5個host上每個都只運行了一個executor,他們的資料佔比均為0.1,這種情況下是不會產生偏向位置的,但是實際上顯然應該將那個擁有5個executor的host作為偏向位置。
def getLocationsWithLargestOutputs(
shuffleId: Int,
reducerId: Int,
numReducers: Int,
fractionThreshold: Double)
: Option[Array[BlockManagerId]] = {
val shuffleStatus = shuffleStatuses.get(shuffleId).orNull
// 對shuffleStatus非空檢查
if (shuffleStatus != null) {
shuffleStatus.withMapStatuses { statuses =>
// 對mapStatus陣列的非空檢查
if (statuses.nonEmpty) {
// HashMap to add up sizes of all blocks at the same location
// 記錄每個executor上的所有map輸出的block中屬於這個reduce端分割槽的資料量
val locs = new HashMap[BlockManagerId, Long]
var totalOutputSize = 0L
var mapIdx = 0
while (mapIdx < statuses.length) {
val status = statuses(mapIdx)
// status may be null here if we are called between registerShuffle, which creates an
// array with null entries for each output, and registerMapOutputs, which populates it
// with valid status entries. This is possible if one thread schedules a job which
// depends on an RDD which is currently being computed by another thread.
if (status != null) {
val blockSize = status.getSizeForBlock(reducerId)
if (blockSize > 0) {
locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
totalOutputSize += blockSize
}
}
mapIdx = mapIdx + 1
}
// 最後,判斷一個executor能否成為偏向位置的條件是:
// 這個executor上所有與這個reduce分割槽相關的資料大小與這個分割槽資料總量的比值是否大於一個閾值
// 這個閾值預設是0.2
val topLocs = locs.filter { case (loc, size) =>
size.toDouble / totalOutputSize >= fractionThreshold
}
// Return if we have any locations which satisfy the required threshold
if (topLocs.nonEmpty) {
return Some(topLocs.keys.toArray)
}
}
}
}
None
}
總結
國際慣例,再晚也要總結一下。我們簡單總結一下map輸出追蹤器的作用:
- 維護所有shuffle的map輸出狀態資訊,位置資訊等
- 查詢某個stage還有哪些未計算的分割槽
- 獲取reduce分割槽的偏向位置
- 獲取reduce分割槽依賴哪些map輸出,他們的位置,每個map輸出中相關資料的大小