1. 程式人生 > >Spark的Shuffle過程

Spark的Shuffle過程

shuffle原理:

Shuffle是MapReduce框架中的一個特定的phase,介於Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果需要按key雜湊,並且分發到每一個Reducer上去,這個過程就是shuffle。shuflle描述著資料從map task到reduce task輸入的這段過程,如果在分散式的情況下,reduce task需要reduce task需要跨節點去拉去其他節點上的map結果,由於shuffle涉及到了磁碟的讀寫和網路的傳輸,因此shuffle效能的高低直接影響到了整個程式的執行效率。

Shuffle 過程中,提供資料的一端,被稱作 Map 端,Map 端每個生成資料的任務稱為 Mapper,對應的,接收資料的一端,被稱作 Reduce 端,Reduce 端每個拉取資料的任務稱為 Reducer,Shuffle 過程本質上都是將 Map 端獲得的資料使用分割槽器進行劃分,並將資料傳送給對應的 Reducer 的過程。

Shuffle write:

由於不要求資料有序,shuffle write 的任務很簡單:將資料 partition 好,並持久化。之所以要持久化,一方面是要減少記憶體儲存空間壓力,另一方面也是為了 fault-tolerance。
shuffle write 的任務很簡單,那麼實現也很簡單:將 shuffle write 的處理邏輯加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最後,該 stage 的 final RDD 每輸出一個 record 就將其 partition 並持久化。圖示如下:
這裡寫圖片描述
上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上執行,CPU core 數為 2,可以同時執行兩個 task。每個 task 的執行結果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁碟上。每個 task 包含 R 個緩衝區,R = reducer 個數(也就是下一個 stage 中 task 的個數),緩衝區被稱為 bucket,其大小為spark.shuffle.file.buffer.kb ,預設是 32KB(Spark 1.1 版本以前是 100KB)。

其實 bucket 是一個廣義的概念,代表 ShuffleMapTask 輸出結果經過 partition 後要存放的地方,這裡為了細化資料存放位置和資料名稱,僅僅用 bucket 表示緩衝區。

ShuffleMapTask 的執行過程很簡單:先利用 pipeline 計算得到 finalRDD 中對應 partition 的 records。每得到一個 record 就將其送到對應的 bucket 裡,具體是哪個 bucket 由partitioner.partition(record.getKey()))決定。每個 bucket 裡面的資料會不斷被寫到本地磁碟上,形成一個 ShuffleBlockFile,或者簡稱 FileSegment。之後的 reducer 會去 fetch 屬於自己的 FileSegment,進入 shuffle read 階段。
這樣的實現很簡單,但有幾個問題:

  • 1.產生的 FileSegment 過多。每個 ShuffleMapTask 產生 R(reducer 個數)個 FileSegment,M 個 ShuffleMapTask 就會產生 M * R 個檔案。一般 Spark job 的 M 和 R 都很大,因此磁碟上會存在大量的資料檔案。
  • 2.緩衝區佔用記憶體空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產生 M R 個 bucket。雖然一個 ShuffleMapTask 結束後,對應的緩衝區可以被回收,但一個 worker node 上同時存在的 bucket 個數可以達到 cores R 個(一般 worker 同時可以執行 cores 個 ShuffleMapTask),佔用的記憶體空間也就達到了cores * R * 32 KB。對於 8 核 1000 個 reducer 來說,佔用記憶體就是 256MB。

目前來看,第二個問題還沒有好的方法解決,因為寫磁碟終究是要開緩衝區的,緩衝區太小會影響 IO 速度。但第一個問題有一些方法去解決,下面介紹已經在 Spark 裡面實現的 FileConsolidation 方法。先上圖:

這裡寫圖片描述

可以明顯看出,在一個core上連續執行的ShuffleMapTasks可以共用一個輸出檔案 ShuffleFile。先執行完的 ShuffleMapTask 形成ShuffleBlock i,後執行的 ShuffleMapTask可以將輸出資料直接追加到ShuffleBlock i後面,形成ShuffleBlock i’,每個ShuffleBlock被稱為FileSegment。下一個stage的reducer只需要fetch整個 ShuffleFile就行了。這樣每個worker持有的檔案數降為cores*R。
FileConsolidation 功能可以通過spark.shuffle.consolidateFiles=true來開啟。

Shuffle read:

如上圖所示,Reducer從mapper中拉取資料的過程被稱為Shuffleread,這個過程是一個動態過程,Reducer中有個softbuffer緩衝區(比較小),mapper拉取資料時,先放在緩衝區裡,當緩衝區滿了,再把資料以鍵值對的形式鏈到RDD鏈上。