MapReduce中shuffle過程
阿新 • • 發佈:2018-04-29
mapr 提前 bin run prope 內存 pat appdata 設置
Map負責過濾分發,reduce歸並整理,從map輸出到reduce輸入就是shuffle過程。
實現的功能
分區
決定當前key交給哪個reduce處理
默認:按照key的hash值對reduce的個數取余進行分區
分組
將相同key的value合並
排序
按照key對每一個keyvalue進行排序,字典排序
過程
map端shuffle
spill階段:溢寫
每一個map task處理的結果會進入環形緩沖區(內存100M)
分區
對每一條key進行分區(標上交給哪個reduce)
hadoop 1 reduce0 hive 1 reduce0 spark 1 reduce1 hadoop 1 reduce0 hbase 1 reduce1
排序
按照key排序,將相同分區的數據進行分區內排序
hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hbase 1 reduce1 spark 1 reduce1
溢寫
當整個緩沖區達到閾值80%,開始進行溢寫
將當前分區排序後的數據寫入磁盤變成一個文件file1
最終生成多個spill小文件
可以在mapred-site.xml中設置內存的大小和溢寫的閾值
在mapred-site.xml中設置內存的大小 ? <property> ? <name>mapreduce.task.io.sort.mb</name> ? <value>100</value> ? </property> ? 在mapred-site.xml中設置內存溢寫的閾值 ? <property> ? <name>mapreduce.task.io.sort.spill.percent</name> ? <value>0.8</value> ? </property>
merge:合並
將spill生成的多個小文件進行合並
排序:將相同分區的數據進行分區內排序,實現comparator比較器進行比較。最終形成一個文件。
file1 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hbase 1 reduce1 spark 1 reduce1 ? file2 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hbase 1 reduce1 spark 1 reduce1 ? end_file: hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hive 1 reduce0 hbase 1 reduce1 hbase 1 reduce1 spark 1 reduce1 spark 1 reduce1
map task 結束,通知app master,app master通知reduce拉取數據
reduce端shuffle
map task1 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hive 1 reduce0 hbase 1 reduce1 hbase 1 reduce1 spark 1 reduce1 spark 1 reduce1 map task2 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hive 1 reduce0 hbase 1 reduce1 hbase 1 reduce1 spark 1 reduce1 spark 1 reduce1
reduce啟動多個線程通過http到每臺機器上拉取屬於自己分區的數據
reduce0: hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hive 1 reduce0 hive 1 reduce0 hive 1 reduce0
merge:合並,將每個map task的結果中屬於自己的分區數據進行合並
排序:對整體屬於我分區的數據進行排序
分組:對相同key的value進行合並,使用comparable完成比較。
hadoop,list<1,1,1,1,1,1,1,1> hive,list<1,1,1,1>
優化
combine
在map階段提前進行一次合並。一般等同於提前執行reduce
job.setCombinerClass(WCReduce.class);
compress
壓縮中間結果集,減少磁盤IO以及網絡IO
壓縮配置方式
1.default:所有hadoop中默認的配置項 2.site:用於自定義配置文件,如果修改以後必須重啟生效 3.conf對象配置每個程序的自定義配置 4.運行時通過參數實現用戶自定義配置 bin/yarn jar xx.jar -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec main_class input_path ouput_path
查看本地庫支持哪些壓縮
bin/hadoop checknative
通過conf配置對象配置壓縮
public static void main(String[] args) { Configuration configuration = new Configuration(); //配置map中間結果集壓縮 configuration.set("mapreduce.map.output.compress","true"); configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec"); //配置reduce結果集壓縮 configuration.set("mapreduce.output.fileoutputformat.compress","true"); configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec"); try { int status = ToolRunner.run(configuration, new MRDriver(), args); System.exit(status); } catch (Exception e) { e.printStackTrace(); } }
MapReduce中shuffle過程