MapReduce核心map reduce shuffle (spill sort partition merge)詳解
阿新 • • 發佈:2019-02-20
上圖可能是某個map task的執行情況。拿它與官方圖的左半邊比較,會發現很多不一致。官方圖沒有清楚地說明partition, sort與combiner到底作用在哪個階段。我畫了這張圖,希望讓大家清晰地瞭解從map資料輸入到map端所有資料準備好的全過程。
整個流程我分了四步。簡單些可以這樣說,每個map task都有一個記憶體緩衝區,儲存著map的輸出結果,當緩衝區快滿的時候需要將緩衝區的資料以一個臨時檔案的方式存放到磁碟,當整個map task結束後再對磁碟中這個map task產生的所有臨時檔案做合併,生成最終的正式輸出檔案,然後等待reduce task來拉資料。
當然這裡的每一步都可能包含著多個步驟與細節,下面我對細節來一一說明:
1
2. 在經過mapper的執行後,我們得知mapper的輸出是這樣一個key/value對: key是“aaa”, value是數值1。因為當前map端只做加1的操作,在reduce task裡才去合併結果集。前面我們知道這個job有3個reduce task,到底當前的“aaa”應該交由哪個reduce去做呢,是需要現在決定的。
MapReduce提供Partitioner介面,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出資料最終應該交由哪個 reduce task處理。預設對key hash後再以reduce task數量取模。預設的取模方式只是為了平均reduce的處理能力,如果使用者自己對Partitioner有需求,可以訂製並設定到job上。
在我們的例子中,“aaa”經過Partitioner後返回0,也就是這對值應當交由第一個reducer來處理。接下來,需要將資料寫入記憶體緩衝區 中,緩衝區的作用是批量收集map結果,減少磁碟IO的影響。我們的key/value對以及Partition的結果都會被寫入緩衝區。當然寫入之 前,key與value值都會被序列化成位元組陣列。
整個記憶體緩衝區就是一個位元組陣列,它的位元組索引及key/value儲存結構我沒有研究過。如果有朋友對它有研究,那麼請大致描述下它的細節吧。
3
當溢寫執行緒啟動後,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型預設的行為,這裡的排序也是對序列化的位元組做的排序。
在這裡我們可以想想,因為map task的輸出是需要傳送到不同的reduce端去,而記憶體緩衝區沒有對將傳送到相同reduce端的資料做合併,那麼這種合併應該是體現是磁碟檔案中 的。從官方圖上也可以看到寫到磁碟中的溢寫檔案是對不同的reduce端的數值做過合併。所以溢寫過程一個很重要的細節在於,如果有很多個 key/value對需要傳送到某個reduce端去,那麼需要將這些key/value值拼接到一塊,減少與partition相關的索引記錄。
在針對每個reduce端而合併資料時,有些資料可能像這樣:“aaa”/1, “aaa”/1。對於WordCount例子,就是簡單地統計單詞出現的次數,如果在同一個map task的結果中有很多個像“aaa”一樣出現多次的key,我們就應該把它們的值合併到一塊,這個過程叫reduce也叫combine。但 MapReduce的術語中,reduce只指reduce端執行從多個map task取資料做計算的過程。除reduce外,非正式地合併資料只能算做combine了。其實大家知道的,MapReduce中將Combiner等 同於Reducer。
如果client設定過Combiner,那麼現在就是使用Combiner的時候了
4. 每次溢寫會在磁碟上生成一個溢寫檔案,如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁碟上相應的就會有多個溢寫檔案存在。當map task真正完成時,記憶體緩衝區中的資料也全部溢寫到磁碟中形成一個溢寫檔案。