1. 程式人生 > >9.Shuffle讀寫原始碼分析

9.Shuffle讀寫原始碼分析

  1. /**
  2. * 給每個map task獲取一個ShuffleWriterGroup
  3. */
  4. def forMapTask(shuffleId:Int, mapId:Int, numBuckets:Int, serializer:Serializer,
  5. writeMetrics:ShuffleWriteMetrics)={
  6. newShuffleWriterGroup{
  7. shuffleStates.putIfAbsent(shuffleId,newShuffleState(numBuckets))
  8. private val shuffleState = shuffleStates(shuffleId
    )
  9. private var fileGroup:ShuffleFileGroup=null
  10. // 重點: 對應上我們之前所說的shuffle有兩種模式 , 一種是普通的,一種是優化後的
  11. // 如果開啟了consolication機制,也即使consolicationShuffleFiles為true的話那麼實際上不會給每個bucket都獲取一個獨立的檔案
  12. // 而是為了這個bucket獲取一個ShuffleGroup的writer
  13. val writers:Array[BlockObjectWriter]=if(consolidateShuffleFiles){
  14. fileGroup = getUnusedFileGroup
    ()
  15. Array.tabulate[BlockObjectWriter](numBuckets){ bucketId =>
  16. // 首先用shuffleId, mapId,bucketId生成一個一個唯一的ShuffleBlockId
  17. // 然後用bucketId來呼叫shuffleFileGroup的apply()函式為bucket獲取一個ShuffleFileGroup
  18. val blockId =ShuffleBlockId(shuffleId, mapId, bucketId)
  19. // 然後用BlockManager的getDisWriter()方法針對ShuffleFileGroup獲取一個Writer
  20. // 這樣的話如果開啟了consolidation機制那麼對於每一個bucket都會獲取一個針對ShuffleFileGroup的writer , 而不是一個獨立的ShuffleBlockFile的writer
  21. // 這樣就實現了所謂的多個ShuffleMapTask的輸出資料合併
  22. blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
  23. writeMetrics)
  24. }
  25. }else{
  26. // 如果沒有開啟consolation機制
  27. Array.tabulate[BlockObjectWriter](numBuckets){ bucketId =>
  28. // 同樣生成一個ShuffleBlockId
  29. val blockId =ShuffleBlockId(shuffleId, mapId, bucketId)
  30. // 然後呼叫BlockManager的DiskBlockManager , 獲取一個代表了要寫入本地磁碟檔案的BlockFile
  31. val blockFile = blockManager.diskBlockManager.getFile(blockId)
  32. // Because of previous failures, the shuffle file may already exist on this machine.
  33. // If so, remove it.
  34. // 而且會判斷這個blockFile要是存在的話還得刪除它
  35. if(blockFile.exists){
  36. if(blockFile.delete()){
  37. logInfo(s"Removed existing shuffle file $blockFile")
  38. }else{
  39. logWarning(s"Failed to remove existing shuffle file $blockFile")
  40. }
  41. }
  42. // 然後呼叫BlockManager的getDiskWriterff針對那個blockFile生成writer
  43. blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
  44. }
  45. // 所以使用過這種普通的我shuffle操作的話對於每一個ShuffleMapTask輸出的bucket都會在本地獲取一個但粗的shuffleBlockFile
  46. }
上面程式碼的註釋已經很詳細啦 , 就是根據是否設定consolication機制來判斷是否給每一個bucket資料建立一個獨立的檔案 , 若設定了consolication機制的話那麼就會給這個bucket資料生成一個shuffeBlockId 然後根據bucket原有的id獲取到一個ShuffleFileGroup . 而最後就會針對每一個bucket都會獲取這個關於ShuffleFileGroup的Writer進行資料的寫 , 而不是為每一個bucket都建立一個獨立的shufflerBlockFile的writer 上面是關於一個stage中最後shuffle的寫操作 , 接下來就是下一個stage讀取上一個stage shuffle資料的讀操作: 先來看下ShuffleRDD中的compute方法 , 原始碼如下:
  1. /**
  2. * Shuffle讀資料的入口
  3. */
  4. override def compute(split:Partition, context:TaskContext):Iterator[(K, C)]={
  5. // ResultTask或者ShuffleMapTask在執行ShuffleRDD時肯定會呼叫ShuffleRDD的compute方法,來計算當前這個RDD的partition的資料
  6. // 這個就是之前的Task原始碼分析時結合TaskRunner所分析的
  7. // 在這裡會呼叫ShuffleManager的getReader()方法,獲取一個HashShuffleReader , 然後呼叫它的read()方法拉取該ResultTask,ShuffleMapTask需要聚合的資料
  8. val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  9. SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index +1, context)
  10. .read()
  11. .asInstanceOf[Iterator[(K, C)]]
  12. }