9.Shuffle讀寫原始碼分析
阿新 • • 發佈:2019-02-02
上面程式碼的註釋已經很詳細啦 , 就是根據是否設定consolication機制來判斷是否給每一個bucket資料建立一個獨立的檔案 , 若設定了consolication機制的話那麼就會給這個bucket資料生成一個shuffeBlockId 然後根據bucket原有的id獲取到一個ShuffleFileGroup . 而最後就會針對每一個bucket都會獲取這個關於ShuffleFileGroup的Writer進行資料的寫 , 而不是為每一個bucket都建立一個獨立的shufflerBlockFile的writer 上面是關於一個stage中最後shuffle的寫操作 , 接下來就是下一個stage讀取上一個stage shuffle資料的讀操作: 先來看下ShuffleRDD中的compute方法 , 原始碼如下:
/**
* 給每個map task獲取一個ShuffleWriterGroup
*/
def forMapTask(shuffleId:Int, mapId:Int, numBuckets:Int, serializer:Serializer,
writeMetrics:ShuffleWriteMetrics)={
newShuffleWriterGroup{
shuffleStates.putIfAbsent(shuffleId,newShuffleState(numBuckets))
private val shuffleState = shuffleStates(shuffleId
)private var fileGroup:ShuffleFileGroup=null
// 重點: 對應上我們之前所說的shuffle有兩種模式 , 一種是普通的,一種是優化後的
// 如果開啟了consolication機制,也即使consolicationShuffleFiles為true的話那麼實際上不會給每個bucket都獲取一個獨立的檔案
// 而是為了這個bucket獲取一個ShuffleGroup的writer
val writers:Array[BlockObjectWriter]=if(consolidateShuffleFiles){
fileGroup = getUnusedFileGroup
()Array.tabulate[BlockObjectWriter](numBuckets){ bucketId =>
// 首先用shuffleId, mapId,bucketId生成一個一個唯一的ShuffleBlockId
// 然後用bucketId來呼叫shuffleFileGroup的apply()函式為bucket獲取一個ShuffleFileGroup
val blockId =ShuffleBlockId(shuffleId, mapId, bucketId)
// 然後用BlockManager的getDisWriter()方法針對ShuffleFileGroup獲取一個Writer
// 這樣的話如果開啟了consolidation機制那麼對於每一個bucket都會獲取一個針對ShuffleFileGroup的writer , 而不是一個獨立的ShuffleBlockFile的writer
// 這樣就實現了所謂的多個ShuffleMapTask的輸出資料合併
blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
writeMetrics)
}
}else{
// 如果沒有開啟consolation機制
Array.tabulate[BlockObjectWriter](numBuckets){ bucketId =>
// 同樣生成一個ShuffleBlockId
val blockId =ShuffleBlockId(shuffleId, mapId, bucketId)
// 然後呼叫BlockManager的DiskBlockManager , 獲取一個代表了要寫入本地磁碟檔案的BlockFile
val blockFile = blockManager.diskBlockManager.getFile(blockId)
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
// 而且會判斷這個blockFile要是存在的話還得刪除它
if(blockFile.exists){
if(blockFile.delete()){
logInfo(s"Removed existing shuffle file $blockFile")
}else{
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
// 然後呼叫BlockManager的getDiskWriterff針對那個blockFile生成writer
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
}
// 所以使用過這種普通的我shuffle操作的話對於每一個ShuffleMapTask輸出的bucket都會在本地獲取一個但粗的shuffleBlockFile
}
/**
* Shuffle讀資料的入口
*/
override def compute(split:Partition, context:TaskContext):Iterator[(K, C)]={
// ResultTask或者ShuffleMapTask在執行ShuffleRDD時肯定會呼叫ShuffleRDD的compute方法,來計算當前這個RDD的partition的資料
// 這個就是之前的Task原始碼分析時結合TaskRunner所分析的
// 在這裡會呼叫ShuffleManager的getReader()方法,獲取一個HashShuffleReader , 然後呼叫它的read()方法拉取該ResultTask,ShuffleMapTask需要聚合的資料
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index +1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}