1. 程式人生 > >Shuffle機制流程原理

Shuffle機制流程原理

基礎知識:

Mapreduce確保每個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出作為輸入傳給reducer)稱為shuffle。

shuffle階段是從map方法輸出資料以後開始到reduce方法輸入資料之前結束。

分割槽的數量 =  ReduceTask數量  = 結果檔案的數量

  1. 首先是由map方法處理後的key/value對輸入到環形緩衝區。
  2. 當環形緩衝區寫滿之後將會對緩衝區裡面的資料進行分割槽、排序操作,然後溢寫到磁碟中(也可以先使用Combiner進行合併處理),可能環形緩衝區會進行多次溢寫。
  3. 將多次溢寫的資料按分割槽進行歸併排序,合併為一個大的檔案,然後將這個大檔案通過壓縮手段進行壓縮(減小磁碟耗費量,減少網路IO傳輸),最後寫入到磁碟中。
  4. 在Reduce端,每一個Reduce按照分割槽號將每一個map輸出的資料中的對應分割槽的資料拷貝到自己的緩衝區中(比如:reduceTask1是處理1號分割槽的資料,則它就將所有map輸出的1號緩衝區的資料拷貝到自己的緩衝區中),若緩衝區不夠則將資料溢寫到磁碟。
  5. 然後對每一個map來的資料進行歸併排序。
  6. 最後按照相同的key分組輸入到reduce方法中。

補充知識:

1、分割槽:

預設分割槽是根據key的hashCode對reduceTasks個數取模得到的。使用者沒法控制哪個key儲存到哪個分割槽。

預設分割槽:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value, int numReduceTasks) {

    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

  }

}

但是使用者可以自定義分割槽,比如可以自定義按手機號進行分割槽(136開頭的0號分割槽,137開頭的1號分割槽。。。。。),

自定義Partitioner步驟:

(1)自定義類繼承Partitioner,重寫getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {

// 1 獲取電話號碼的前三位
String preNum = key.toString().substring(0, 3);

int partition = 4;

// 2 判斷是哪個省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}

return partition;
}
}

job.setPartitionerClass(CustomPartitioner.class);

(2)在job驅動中,設定自定義partitioner:

(3)自定義partition後,要根據自定義partitioner的邏輯設定相應數量的reduce task

job.setNumReduceTasks(5);

3)注意:

如果reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出檔案part-r-000xx;

如果1<reduceTask的數量<getPartition的結果數,則有一部分分割槽資料無處安放,會Exception;

如果reduceTask的數量=1,則不管mapTask端輸出多少個分割槽檔案,最終結果都交給這一個reduceTask,最終也就只會產生一個結果檔案 part-r-00000;

例如假設自定義分割槽數為5,

(1)job.setNumReduceTasks(1);會正常執行,只不過會產生一個輸出檔案

(2)job.setNumReduceTasks(2);會報錯

(3)job.setNumReduceTasks(6);大於5,程式會正常執行,會產生空檔案

2、資料壓縮:

在MapTask讀取壓縮檔案前可以進行先解壓:

資料解壓:createInputStream(InputStream in)建立一個CompressionInputStream,讀取壓縮資料。

可以在環形緩衝區寫入到磁碟的過程中吧資料進行壓縮,這樣可以減少網路IO。壓縮資料:createOutputStream(OutputStream out) 建立一個CommpressionOutputStream,將其以壓縮格式寫入底層。在MapTask的資料傳入ReduceTask之前,可以使用LZO或者Snappy壓縮方式(需要安裝)。在配置檔案(core-site.xml;mapred-site.xml)中設定mapper輸出壓縮(預設是不進行壓縮的)

也可以在reduce處理完成之後,再將資料以壓縮的格式進行寫出到檔案。

最常用的兩種壓縮方式:LZO和Snappy

1.Lzo壓縮

優點:壓縮/解壓速度也比較快,合理的壓縮率;支援split,是hadoop中最流行的壓縮格式;可以在linux系統下安裝lzop命令,使用方便。

缺點:壓縮率比gzip要低一些;hadoop本身不支援,需要安裝;在應用中對lzo格式的檔案需要做一些特殊處理(為了支援split需要建索引,還需要指定inputformat為lzo格式)。

應用場景:一個很大的文字檔案,壓縮之後還大於200M以上的可以考慮,而且單個檔案越大,lzo優點越越明顯。

2.Snappy壓縮

優點:高速壓縮速度和合理的壓縮率。

缺點:不支援split;壓縮率比gzip要低;hadoop本身不支援,需要安裝;

應用場景:Mapreduce作業的Map輸出的資料比較大的時候,作為Map到Reduce的中間資料的壓縮格式;或者作為一個Mapreduce作業的輸出和另外一個Mapreduce作業的輸入。

3.壓縮位置選擇

壓縮可以在MapReduce作用的任意階段啟用。

示例:可以在Driver類中開啟並指定使用哪種壓縮方式

1、map端輸出開啟並設定壓縮方式:

// 開啟map端輸出壓縮

configuration.setBoolean("mapreduce.map.output.compress", true);

// 設定map端輸出壓縮方式

configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);/2、

2、reduce端輸出開啟並設定壓縮方式:

// 設定reduce端輸出壓縮開啟

FileOutputFormat.setCompressOutput(job, true);

// 設定壓縮的方式

 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);