MapReduce 工作流程
MapReduce 工作流程
以示例程式
wordcount
為例
Map
InputFormat
InputFormat
會將我們指定的輸入路徑中的檔案按照block
(預設 128M)邏輯切分成若干切片(split
,如果檔案不足 128M 則單獨為一個切片,如果滿了128M但是不滿128M*1.1也單獨為一個切片),然後交給RecordReader
進行處理,產出若干key/value record
RecordReader
產出的key/value record
會暫存在記憶體中的一塊環形緩衝區中(邏輯上成環形),寫入record
時會從環形上的兩個位置寫入,一個位置寫入record
,一個位置寫入record
inde
,這樣做的好處是:要想在環上找到一個record
不用遍歷資料量較大的record
序列,而只用遍歷資料量較小的index
列表。
Shuffle
Ring Buffer & Excessive Writing & Combine
MapTask
會根據輸入的大資料來源源不斷的產出record
,而環形緩衝的大小是有限的(假設是100M,此引數可配置),當環形緩衝的佔用量達到80%
(此引數可配置)時,就會對這80%
的record
進行一個全排序(準確的說是二次排序,先按照partition
有序(見Partitioner
),再按照record
的key
有序),如果你設定了CombinerClass
record
進行一個合併,最後寫入磁碟(此過程稱為溢寫),形成一個首先分割槽號有序其次key
有序的record
序列;而剩下的20%
則繼續迎接後續寫入的record
。
Merge Sort
這樣輸入的資料集就會分批次寫入到硬碟中,形成多個批次內有序的record
序列,然後再從硬碟中逐批讀出這些序列進行一個歸併排序(歸併的過程中又可以應用Combiner
做一個合併處理),最終產出該MapTask
對應的分割槽號有序、同一分割槽內record.key
有序的record
序列,即將流入Reducer
Reducer
setNumReduceTask
MapTask
的數量是由切片規則來決定的,輸入的資料集會被切成多少片就會有多少個MapTask
MapTask
都會產出一個分割槽號有序的record
序列,而ReducerTask
是在Driver
中通過setNumReducerTask
手動指定的,一般會和Partitioner
返回的分割槽號(返回0則會由ReduceTask1
處理併產出到part-r-0000
)類別數保持一致。
ReduceTask
ReduceTask
會從所有的MapTask
的產出中抓取出分割槽號和自己對應的record
過來,例如上圖中ReduceTask1
會分別抓取MapTask1
和MapTask2
產出的record
序列中分割槽號為0
的部分,進行一個歸併排序(過程中使用GroupingComparator
進行分組,結果對應Reducer#reduce
方法入參的Iterable values
)並將結果序列中的元素(Object key,Iterable values
)逐個交給Reducer#reduce
進行處理,可以通過context.write
寫入到output
對應分割槽號的part-r-000x
中。