MapReduce之Shuffle詳解
阿新 • • 發佈:2020-07-15
Hadoop原生的計算框架MapReduce,簡單概括一下:程序量級很重,啟動很慢,但能承載的資料量很大,效率相較於Spark微批處理和Flink實時來講很慢,Shuffle任何一個寫MR同學都必須掌握的東西,說難不難,說簡單也不簡單
MapReduce程式的五個階段:
- input
- map
- shuffle
- reduce
- output
我將Shuffle階段加粗了,原因很簡單,因為這裡很重要
1. 關於Shuffle過程實現的功能:
1. 分割槽:
- 決定當前的Key交給哪個Reducer進行處理,相同的Key則由相同的Reducer處理
- 預設是根據Key的Hash值,對Reduce個數取餘(原始碼如下)
public
2. 分組
- 將相同的Key的value進行合併
- Key相等時將分到同一個組裡面
- MapReduce階段,一行呼叫一次Map方法,一種Key呼叫一次Reduce
3. 排序:將Key按照字典排序
2. 關於Shuffle過程實現功能的詳細描述:
1. Map端Shuffle:- Spill:溢寫
- 每一個Map處理之後的結果都會進入環形緩衝區(記憶體,預設100M)(關於環形緩衝區有必要單獨瞭解一下,不詳細展開了)
- 分割槽:對每一條key-value進行分割槽,打標籤
- 排序:將相同分割槽的資料進行分割槽內排序
- 當環形緩衝區達到閾值的80%,將分割槽排序後的資料寫到磁碟變成檔案,最終會生成多個小檔案,
-
Merge合併:
- 將spill生成的小檔案進行合併
- 將相同分割槽的資料進行排序
- (Map task結束)通知ApplicationMaster,Reduce主動過來拉取資料Reduce端Shuffle
2. Reduce端Shuffle:
- 啟動多個執行緒,去每臺機器上拉去屬於自己分割槽的M資料
-
Merge:
- 將每個Maptask的結果屬於自己分割槽的資料進行合併
- 將整體屬於自己分割槽的資料進行排序
- 分組:對相同的key的value進行合併
3. 關於MapReduce的Shuffle優化:
MapReduce Shuffle過程的優化:- Combiner:合併
- 在map階段提前進行了一次合併,一般來說等同於提前進行了reduce,降低reduce的壓力
- 不是所有的程式都適合combiner
- Compress:壓縮
- 能大大減少磁碟和網路的IO
- hadoop中設定壓縮:
- hadoop checknative檢視本地支援哪些壓縮
- 常見的壓縮格式:snappy,lzo,lz4
- 修改本地支援的壓縮方式:替換lib/native
- MapReduce程式可以設定壓縮的位置:
- 輸入
- map的中間結果(需要同時指定)
- mapreduce.map.output.compress
- mapreduce.map.output.compress.codec=預設是DefaultCodec
- reduce的輸出
- mapreduce.output.fileoutputformat.compress
- Mapreduce,output.fileoutputformat.compress.codec
- 怎麼設定壓縮:
- 叢集配置檔案內
- 設定conf物件當前程式有效
- 執行時指定引數: -Dmapreduce.output.fileoutputformat.compress=true ….