1. 程式人生 > 實用技巧 >MapReduce之Shuffle詳解

MapReduce之Shuffle詳解

Hadoop原生的計算框架MapReduce,簡單概括一下:程序量級很重,啟動很慢,但能承載的資料量很大,效率相較於Spark微批處理和Flink實時來講很慢,Shuffle任何一個寫MR同學都必須掌握的東西,說難不難,說簡單也不簡單

MapReduce程式的五個階段:

  • input
  • map
  • shuffle
  • reduce
  • output

我將Shuffle階段加粗了,原因很簡單,因為這裡很重要

1. 關於Shuffle過程實現的功能:

1. 分割槽:

  • 決定當前的Key交給哪個Reducer進行處理,相同的Key則由相同的Reducer處理
  • 預設是根據Key的Hash值,對Reduce個數取餘(原始碼如下)
    public
    int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReducTasks }

2. 分組

  • 將相同的Key的value進行合併
  • Key相等時將分到同一個組裡面
  • MapReduce階段,一行呼叫一次Map方法,一種Key呼叫一次Reduce

3. 排序:將Key按照字典排序

2. 關於Shuffle過程實現功能的詳細描述:

1. Map端Shuffle:
  1. Spill:溢寫
    1. 每一個Map處理之後的結果都會進入環形緩衝區(記憶體,預設100M)(關於環形緩衝區有必要單獨瞭解一下,不詳細展開了)
    2. 分割槽:對每一條key-value進行分割槽,打標籤
    3. 排序:將相同分割槽的資料進行分割槽內排序
    4. 當環形緩衝區達到閾值的80%,將分割槽排序後的資料寫到磁碟變成檔案,最終會生成多個小檔案,
  2. Merge合併:
    1. 將spill生成的小檔案進行合併
    2. 將相同分割槽的資料進行排序
  3. Map task結束)通知ApplicationMaster,Reduce主動過來拉取資料Reduce端Shuffle

2. Reduce端Shuffle:

  1. 啟動多個執行緒,去每臺機器上拉去屬於自己分割槽的M資料
  2. Merge:
    1. 將每個Maptask的結果屬於自己分割槽的資料進行合併
    2. 將整體屬於自己分割槽的資料進行排序
  3. 分組:對相同的key的value進行合併

3. 關於MapReduce的Shuffle優化:

MapReduce Shuffle過程的優化:
  1. Combiner:合併
    1. 在map階段提前進行了一次合併,一般來說等同於提前進行了reduce,降低reduce的壓力
    2. 不是所有的程式都適合combiner
  2. Compress:壓縮
    1. 能大大減少磁碟和網路的IO
  3. hadoop中設定壓縮:
    1. hadoop checknative檢視本地支援哪些壓縮
    2. 常見的壓縮格式:snappy,lzo,lz4
    3. 修改本地支援的壓縮方式:替換lib/native
  4. MapReduce程式可以設定壓縮的位置:
    1. 輸入
    2. map的中間結果(需要同時指定)
      1. mapreduce.map.output.compress
      2. mapreduce.map.output.compress.codec=預設是DefaultCodec
    3. reduce的輸出
      1. mapreduce.output.fileoutputformat.compress
      2. Mapreduce,output.fileoutputformat.compress.codec
  5. 怎麼設定壓縮:
    1. 叢集配置檔案內
    2. 設定conf物件當前程式有效
    3. 執行時指定引數: -Dmapreduce.output.fileoutputformat.compress=true ….