1. 程式人生 > 程式設計 >MapReduce 工作流程

MapReduce 工作流程

MapReduce 工作流程

以示例程式 wordcount為例

image.png

Map

InputFormat

InputFormat會將我們指定的輸入路徑中的檔案按照block(預設 128M)邏輯切分成若干切片(split,如果檔案不足 128M 則單獨為一個切片,如果滿了128M但是不滿128M*1.1也單獨為一個切片),然後交給RecordReader進行處理,產出若干key/value record

RecordReader

產出的key/value record會暫存在記憶體中的一塊環形緩衝區中(邏輯上成環形),寫入record時會從環形上的兩個位置寫入,一個位置寫入record,一個位置寫入record

的索引inde,這樣做的好處是:要想在環上找到一個record不用遍歷資料量較大的record序列,而只用遍歷資料量較小的index列表。

Shuffle

Ring Buffer & Excessive Writing & Combine

MapTask會根據輸入的大資料來源源不斷的產出record,而環形緩衝的大小是有限的(假設是100M,此引數可配置),當環形緩衝的佔用量達到80%(此引數可配置)時,就會對這80%record進行一個全排序(準確的說是二次排序,先按照partition有序(見Partitioner),再按照recordkey有序),如果你設定了CombinerClass

那麼同時會對record進行一個合併,最後寫入磁碟(此過程稱為溢寫),形成一個首先分割槽號有序其次key有序的record序列;而剩下的20%則繼續迎接後續寫入的record

Merge Sort

這樣輸入的資料集就會分批次寫入到硬碟中,形成多個批次內有序的record序列,然後再從硬碟中逐批讀出這些序列進行一個歸併排序(歸併的過程中又可以應用Combiner做一個合併處理),最終產出該MapTask對應的分割槽號有序、同一分割槽內record.key有序的record序列,即將流入Reducer

Reducer

setNumReduceTask

MapTask的數量是由切片規則來決定的,輸入的資料集會被切成多少片就會有多少個MapTask

,每個MapTask都會產出一個分割槽號有序的record序列,而ReducerTask是在Driver中通過setNumReducerTask手動指定的,一般會和Partitioner返回的分割槽號(返回0則會由ReduceTask1處理併產出到part-r-0000)類別數保持一致。

ReduceTask

ReduceTask會從所有的MapTask的產出中抓取出分割槽號和自己對應的record過來,例如上圖中ReduceTask1會分別抓取MapTask1MapTask2產出的record序列中分割槽號為0的部分,進行一個歸併排序(過程中使用GroupingComparator進行分組,結果對應Reducer#reduce方法入參的Iterable values)並將結果序列中的元素(Object key,Iterable values)逐個交給Reducer#reduce進行處理,可以通過context.write寫入到output對應分割槽號的part-r-000x中。