Shuffle機制流程原理
基礎知識:
Mapreduce確保每個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出作為輸入傳給reducer)稱為shuffle。
shuffle階段是從map方法輸出資料以後開始到reduce方法輸入資料之前結束。
分割槽的數量 = ReduceTask數量 = 結果檔案的數量
- 首先是由map方法處理後的key/value對輸入到環形緩衝區。
- 當環形緩衝區寫滿之後將會對緩衝區裡面的資料進行分割槽、排序操作,然後溢寫到磁碟中(也可以先使用Combiner進行合併處理),可能環形緩衝區會進行多次溢寫。
- 將多次溢寫的資料按分割槽進行歸併排序,合併為一個大的檔案,然後將這個大檔案通過壓縮手段進行壓縮(減小磁碟耗費量,減少網路IO傳輸),最後寫入到磁碟中。
- 在Reduce端,每一個Reduce按照分割槽號將每一個map輸出的資料中的對應分割槽的資料拷貝到自己的緩衝區中(比如:reduceTask1是處理1號分割槽的資料,則它就將所有map輸出的1號緩衝區的資料拷貝到自己的緩衝區中),若緩衝區不夠則將資料溢寫到磁碟。
- 然後對每一個map來的資料進行歸併排序。
- 最後按照相同的key分組輸入到reduce方法中。
補充知識:
1、分割槽:
預設分割槽是根據key的hashCode對reduceTasks個數取模得到的。使用者沒法控制哪個key儲存到哪個分割槽。
預設分割槽:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
但是使用者可以自定義分割槽,比如可以自定義按手機號進行分割槽(136開頭的0號分割槽,137開頭的1號分割槽。。。。。),
自定義Partitioner步驟:
(1)自定義類繼承Partitioner,重寫getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 1 獲取電話號碼的前三位 String preNum = key.toString().substring(0, 3); int partition = 4; // 2 判斷是哪個省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }
job.setPartitionerClass(CustomPartitioner.class); |
(2)在job驅動中,設定自定義partitioner:
(3)自定義partition後,要根據自定義partitioner的邏輯設定相應數量的reduce task
job.setNumReduceTasks(5); |
3)注意:
如果reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出檔案part-r-000xx;
如果1<reduceTask的數量<getPartition的結果數,則有一部分分割槽資料無處安放,會Exception;
如果reduceTask的數量=1,則不管mapTask端輸出多少個分割槽檔案,最終結果都交給這一個reduceTask,最終也就只會產生一個結果檔案 part-r-00000;
例如:假設自定義分割槽數為5,則
(1)job.setNumReduceTasks(1);會正常執行,只不過會產生一個輸出檔案
(2)job.setNumReduceTasks(2);會報錯
(3)job.setNumReduceTasks(6);大於5,程式會正常執行,會產生空檔案
2、資料壓縮:
在MapTask讀取壓縮檔案前可以進行先解壓:
資料解壓:createInputStream(InputStream in)建立一個CompressionInputStream,讀取壓縮資料。
可以在環形緩衝區寫入到磁碟的過程中吧資料進行壓縮,這樣可以減少網路IO。壓縮資料:createOutputStream(OutputStream out) 建立一個CommpressionOutputStream,將其以壓縮格式寫入底層。在MapTask的資料傳入ReduceTask之前,可以使用LZO或者Snappy壓縮方式(需要安裝)。在配置檔案(core-site.xml;mapred-site.xml)中設定mapper輸出壓縮(預設是不進行壓縮的)
也可以在reduce處理完成之後,再將資料以壓縮的格式進行寫出到檔案。
最常用的兩種壓縮方式:LZO和Snappy
1.Lzo壓縮
優點:壓縮/解壓速度也比較快,合理的壓縮率;支援split,是hadoop中最流行的壓縮格式;可以在linux系統下安裝lzop命令,使用方便。
缺點:壓縮率比gzip要低一些;hadoop本身不支援,需要安裝;在應用中對lzo格式的檔案需要做一些特殊處理(為了支援split需要建索引,還需要指定inputformat為lzo格式)。
應用場景:一個很大的文字檔案,壓縮之後還大於200M以上的可以考慮,而且單個檔案越大,lzo優點越越明顯。
2.Snappy壓縮
優點:高速壓縮速度和合理的壓縮率。
缺點:不支援split;壓縮率比gzip要低;hadoop本身不支援,需要安裝;
應用場景:當Mapreduce作業的Map輸出的資料比較大的時候,作為Map到Reduce的中間資料的壓縮格式;或者作為一個Mapreduce作業的輸出和另外一個Mapreduce作業的輸入。
3.壓縮位置選擇
壓縮可以在MapReduce作用的任意階段啟用。
示例:可以在Driver類中開啟並指定使用哪種壓縮方式
1、map端輸出開啟並設定壓縮方式:
// 開啟map端輸出壓縮
configuration.setBoolean("mapreduce.map.output.compress", true);
// 設定map端輸出壓縮方式
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);/2、
2、reduce端輸出開啟並設定壓縮方式:
// 設定reduce端輸出壓縮開啟
FileOutputFormat.setCompressOutput(job, true);
// 設定壓縮的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);