MapReduce shuffle過程及壓縮機制
shuffle過程
map階段處理的資料如何傳遞給reduce階段,是MapReduce框架中最關鍵的一個流程,這個流程就叫shuffle。
shuffle: ——核心機制:資料分割槽,排序,規約,分組,合併等過程。
shuffle是Mapreduce的核心,它分佈在Mapreduce的map階段和reduce階段。
一般把從Map產生輸出開始到Reduce取得資料作為輸入之前的過程稱作shuffle。
Collect階段:將MapTask的結果輸出到預設大小為100M的環形緩衝區,儲存的是key/value,Partition分割槽資訊等。
Spill階段:當記憶體中的資料量達到一定的閥值的時候,就會將資料寫入本地磁碟,在將資料寫入磁碟之前需要對資料進行一次排序的操作,如果配置了combiner,還會將有相同分割槽號和key的資料進行排序。
Merge階段
Copy階段:ReduceTask啟動Fetcher執行緒到已經完成MapTask的節點上覆制一份屬於自己的資料,這些資料預設會儲存在記憶體的緩衝區中,當記憶體的緩衝區達到一定的閥值的時候,就會將資料寫到磁碟之上。
Merge階段:在ReduceTask遠端複製資料的同時,會在後臺開啟兩個執行緒對記憶體到本地的資料檔案進行合併操作。
Sort階段:在對資料進行合併的同時,會進行排序操作,由於MapTask階段已經對資料進行了區域性的排序,ReduceTask只需保證Copy的資料的最終整體有效性即可。
Shuffle中的緩衝區大小會影響到mapreduce程式的執行效率,原則上說,緩衝區越大,磁碟io的次數越少,執行速度就越快.
shuffle階段資料的壓縮機制
在shuffle階段,資料有大量的拷貝過程。從map階段輸出的資料,都要通過網路拷貝,傳送到reduce階段,這一過程中,會涉及到大量的網路IO,因此一個好的壓縮機制,會大大減少資料的傳送量。
hadoop當中支援的壓縮演算法
檔案壓縮的好處:節約磁碟空間,加速資料在網路和磁碟上的傳輸。
使用bin/hadoop checknative
來檢視我們編譯之後的hadoop支援的各種壓縮。
編譯後的CDH版本hadoop支援的壓縮演算法:
常見的壓縮速率比較
如何開啟我們的壓縮:
方式一:在程式碼中進行設定壓縮
設定我們的map階段的壓縮
Configuration configuration = new Configuration(); configuration.set("mapreduce.map.output.compress","true"); configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
設定我們的reduce階段的壓縮
configuration.set("mapreduce.output.fileoutputformat.compress","true");
configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
方式二:配置全域性的MapReduce壓縮
修改mapred-site.xml配置檔案,對所有的mapreduce任務進行壓縮
對map輸出資料壓縮:
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
對reduce輸出資料進行壓縮:
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.type</name>
<value>RECORD</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
注意:所有節點都要修改mapred-site.xml 的配置檔案,並在完成之後重啟Hadoop叢集.
snappy壓縮
程式碼中新增配置
這裡我們通過修改程式碼的方式來實現資料的壓縮
map階段輸出壓縮配置
Configuration configuration = new Configuration();
configuration.set("mapreduce.map.output.compress","true");
configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
reduce階段輸出壓縮配置
configuration.set("mapreduce.output.fileoutputformat.compress","true");
configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");