MapReduce簡單介紹及入門程式
1、MapReduce 入門
1.1、什麼是 MapReduce
hadoop 的四大元件:
HDFS:分散式儲存系統
MapReduce:分散式計算系統
YARN:hadoop 的資源排程系統
Common:以上三大元件的底層支撐元件,主要提供基礎工具包和 RPC 框架等
MapReduce 是一個分散式運算程式的程式設計框架,是使用者開發“基於 Hadoop 的資料分析應用”的核心框架
1.2、為什麼需要 MapReduce
- 海量資料在單機上處理因為硬體資源限制,無法勝任
- 而一旦將單機版程式擴充套件到叢集來分散式執行,將極大增加程式的複雜度和開發難度
- 引入 MapReduce 框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分散式計算中的複雜性交由框架來處理
Hadoop 當中的 MapReduce 就是這樣的一個分散式程式運算框架,它把大量分散式程式都會
涉及的到的內容都封裝進了,讓使用者只用專注自己的業務邏輯程式碼的開發。它對應以上問題
的整體結構如下:
- MRAppMaster:MapReduce Application Master,分配任務,協調任務的執行
MapTask:階段併發任,負責 mapper 階段的任務處理 YARNChild
ReduceTask:階段彙總任務,負責 reducer 階段的任務處理 YARNChild
1.3、MapReduce 程式執行j簡單wordcount演示
package Test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // 資料樣式為 hello hi nice to meet you public class MyMapReduceWordCount { public static class MyMap extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(" "); for(String s: fields){ context.write(new Text(s), new IntWritable(1)); } } } public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; for(IntWritable i:values){ count+=i.get(); } context.write(key, new IntWritable(count)); } } public static void main(String[] args) { Configuration conf=new Configuration(); System.setProperty("HADOOP_USER_NAME", "qyl"); conf.set("fs.defaultFS", "hdfs://qyl01:9000"); try { Job job=Job.getInstance(conf); job.setJarByClass(MyMapReduceWordCount.class); job.setMapperClass(MyMap.class); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path inPath =new Path("/wordcount"); FileInputFormat.addInputPath(job, inPath); Path outpath=new Path("/wordcount/wordcount_result"); if(outpath.getFileSystem(conf).exists(outpath)){ outpath.getFileSystem(conf).delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath); job.waitForCompletion(true); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
2、MapReduce 程式的核心執行
2.1、概述
一個完整的 MapReduce 程式在分散式執行時有兩類例項程序:
1、MRAppMaster:負責整個程式的過程排程及狀態協調
2、Yarnchild:負責 map 階段的整個資料處理流程
3、Yarnchild:負責 reduce 階段的整個資料處理流程以上兩個階段 MapTask 和 ReduceTask 的程序都是 YarnChild,並不是說這 MapTask 和ReduceTask 就跑在同一個 YarnChild 進行裡
2.2、MapReduce 程式的執行流程
1、一個 mr 程式啟動的時候,最先啟動的是 MRAppMaster,MRAppMaster 啟動後根據本次
job 的描述資訊,計算出需要的 maptask 例項數量,然後向叢集申請機器啟動相應數量的
maptask 程序
2、 maptask 程序啟動之後,根據給定的資料切片(哪個檔案的哪個偏移量範圍)範圍進行數
據處理,主體流程為:
- A、利用客戶指定的 InputForm來獲取 RecordReader 讀取資料,形成輸入 KV 對
- B、將輸入 KV 對傳遞給客戶定義的 map()方法,做邏輯運算,並將 map()方法輸出的 KV 對收 集到快取
C、將快取中的 KV 對按照 K分割槽排序後不斷溢寫到磁碟檔案
3、 MRAppMaster 監控到所有 maptask 程序任務完成之後(真實情況是,某些 maptask 程序處理完成後,就會開始啟動 reducetask 去已完成的 maptask 處 fetch 資料),會根據客指定的引數啟動相應數量的 reducetask 程序,並告知 reducetask 程序要處理的資料範圍(資料分割槽)
4、Reducetask 程序啟動之後,根據 MRAppMaster 告知的待處理資料所在位置,從若干臺maptask 執行所在機器上獲取到若干個 maptask 輸出結果檔案,並在本地進行重新歸併序,然後按照相同 key 的 KV 為一個組,呼叫客戶定義的 reduce()方法進行邏輯運算,並收集運算輸出的結果 KV,然後呼叫客戶指定的 OutputFormat 將結果資料輸出到外部儲存
2.3、MapTask 並行度決定機制
maptask 的並行度決定 map 階段的任務處理併發度,進而影響到整個 job 的處理速度那麼,mapTask 並行例項是否越多越好呢?其並行度又是如何決定呢?一個 job 的 map 階段並行度由客戶端在提交 job 時決定,客戶端對 map 階段並行度的規劃的基本邏輯為:
將待處理資料執行邏輯切片(即按照一個特定切片大小,將待處理資料劃分成邏輯上的多
個 split),然後每一個 split 分配一個 mapTask 並行例項處理
2.4、切片機制
FileInputFormat 中預設的切片機制
1、簡單地按照檔案的內容長度進行切片
2、切片大小,預設等於 block 大小
3、切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片
比如待處理資料有兩個檔案:
File1.txt 200M
File2.txt 100M
經過 getSplits()方法處理之後,形成的切片資訊是:
File1.txt-split1 0-128M
File1.txt-split2
2.5、ReduceTask 並行度決定機制
reducetask 的並行度同樣影響整個 job 的執行併發度和執行效率,但與 maptask 的併發數由切片數決定不同,Reducetask 數量的決定是可以直接手動設定:job.setNumReduceTasks(4);預設值是 1,手動設定為 4,表示執行 4 個 reduceTask,設定為 0,表示不執行 reduceTask 任務,也就是沒有 reducer 階段,只有 mapper 階段
如果資料分佈不均勻,就有可能在 reduce 階段產生資料傾斜
注意:reducetask 數量並不是任意設定,還要考慮業務邏輯需求,有些情況下,需要計算全域性彙總結果,就只能有 1 個 reducetask。
儘量不要執行太多的 reducetask。對大多數 job 來說,最好 rduce 的個數最多和叢集中的reduce 持平,或者比叢集的 reduce slots 小。這個對於小叢集而言,尤其重要。