1. 程式人生 > >MapReduce 的 shuffle 機制

MapReduce 的 shuffle 機制

由於 MapReduce 確保每個 reducer 的輸入都是按鍵排序的,因此在 map 處理完資料之後傳給 reducer 的這個過程中需要進行一系列操作,這個操作過程就是 shuffle。在《hadoop權威指南》中指出,shuffle 是 MapReduce 的 “心臟”,瞭解 shuffle 工作機制有助於我們優化 MapReduce 程式,接下來我們就來看看它的執行機制。

shuffle 流程

先用一張圖表示 shuffle 的整個過程。從圖中我們可以看到 shuffle 流程主要是對 map 的資料進行排序、分組傳送給 reduce 後再進行合併的一個過程,我們將分 map 和 reduce 兩個部分來講解 shuffle 的流程。

shfulle機制

map 端 shuffle

map 任務開始產生資料時,會先將這些資料儲存在一個 記憶體緩衝區 中,這個緩衝區大小預設為 100MB,可以通過設定 mapreduce.task.io.sort.mb 來改變其大小。由於 hadoop 處理的是海量資料,100MB 的記憶體顯然是不夠用的,因此達到一定 閾值 時(預設為 0.8,可以通過設定 mapreduce.map.sort.spill.percent 來改變其大小),會將記憶體中的內容溢位(spill)到磁碟當中,溢位的路徑是由 mapreduce.cluster.local.dir 屬性指定的。在溢位到磁碟的過程中,如果緩衝區中還有空間,map 程式會繼續輸出資料到緩衝區中,如果沒有空間的話,map 輸出程式則會阻塞直到資料寫入到磁碟後。

在上圖中 buffer in memory(輸出到快取中) 和 merge on disk(合併到磁碟) 這兩個步驟中間還有一個 分割槽、排序 的步驟。分割槽能達到跟分組類似的效果,例如讀取一個含有大量電話號碼的資料時,把 138 的分為一組,把 135 分為一組。這個效果可以通過自定一個類繼承 Partitioner,然後在 Job 中呼叫 setPartitionerClass 方法設定分割槽類來完成。在每個分割槽中,後臺執行緒按照鍵的值對資料在記憶體中進行排序,如果有一個 combiner 方法,則在排序完成之後執行它。combiner 方法會使 map 輸出更緊湊,減少寫到磁碟中的資料和傳給 reducer 的資料。

一般情況下,map 的輸出結果並不會進行壓縮,由於資料量大,對網路資源的耗費很大,為了對 mapreduce 程式進行優化,我們可以將 mapreduce.map.output.compress 屬性設定為 true,這樣當 map 將資料寫到磁碟時就會對資料進行壓縮。具體的壓縮格式可以通過 mapreduce.map.output.compress.codec 屬性來設定。當所有記錄都寫完之後,map 會合並全部的溢位檔案為一個分割槽且排序的檔案傳給 reduce。

reduce 端 shuffle

reducer 通過 HTTP 的方式獲取 map 的的輸出資料,這是複製階段。reducer 在複製階段把 Map 輸出複製到 Reducer 的記憶體或磁碟,一個 Map 任務完成後,Reduce 就開始複製輸出。複製完所有的 map 輸出之後,reducer 對這些資料進行合併,使它們仍然保持有序。合併完成之後,直接將這些資料輸入到 reduce 函式中,從而省略一次寫入磁碟的時間。至此,整個 shuffle 流程就完成了。

以上便是我對 MR shuffle 機制的理解,如果其中有錯,歡迎指出。