Hadoop Mapreduce運行流程
Mapreduce的運算過程為兩個階段:
第一個階段的map task相互獨立,完全並行;
第二個階段的reduce task也是相互獨立,但依賴於上一階段所有map task並發實例的輸出;
這些task任務分布在多臺機器運行,它的運行管理是有一個master負責,這個master由yarn負責啟動,那麽yarn如何知道啟動多少個map task進程去計算呢?
下面概述一下Mapreduce的執行流程:
1、客戶端首先會訪問hdfs的namenode獲取待處理數據的信息(文件數及文件大小),形成一個任務分配計劃(會寫入配置文件);
2、這個任務分配計劃以及配置文件都會交給yarn,yarn根據自己所掌握的各機器資源情況,去啟動mr appmaster;
3、mr appmaster根據配置文件負責啟動map task任務進程;
4、map task去datanode分行讀取數據,交給自定義的mapper,輸出的context.write(key,value),
再交給outputcollecter輸出到本機的一個分區文件(後面有幾個reduce task就有幾個分區);
5、所有的map task執行完,mr appmaster再去啟動reduce task;
6、reduce task進程對每一組key相同的<key,value>調用一次自定義的reducer;
7、reduce task的計算結果會不斷追加寫入設置好的hdfs的路徑中;
8、整個程序需要一個driver來提交,提交的是一個描述了各種必要信息的job對象。
代碼示例:
WordCountMap.java
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;/** * Mapper的第一個參數:KEYIN: 默認情況下,是mr框架所讀到的一行文本的起始偏移量,Long, * 但是在hadoop中有自己的更精簡的序列化接口,所以不直接用Long,而用LongWritable * Mapper的第二個參數:VALUEIN:默認情況下,是mr框架所讀到的一行文本的內容,String,同上,用Text * Mapper第三個參數:KEYOUT:是用戶自定義邏輯處理完成之後輸出數據中的key,在此處是單詞,String,同上,用Text * Mapper第四個參數:VALUEOUT:是用戶自定義邏輯處理完成之後輸出數據中的value,在此處是單詞次數,Integer,同上,用IntWritable */ public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //將maptask傳給我們的文本內容先轉換成String String line = value.toString(); //根據空格將這一行切分成單詞 String[] words = line.split(" "); //將單詞輸出為<單詞,1> for(String word:words) { //將單詞作為key,將次數1作為value,以便於後續的數據分發,可以根據單詞分發,以便於相同單詞會到相同的reduce task context.write(new Text(word), new IntWritable(1)); } } }
WordCountReduce.java
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.WordCount.Reduce; import org.apache.hadoop.mapreduce.Reducer; /** * KEYIN, VALUEIN 對應 mapper輸出的KEYOUT,VALUEOUT類型對應 * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出數據類型 * KEYOUT是單詞 * VLAUEOUT是總次數 */ public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { /** * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1> * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1> * 入參key,是一組相同單詞kv對的key */ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count=0; for(IntWritable value:values) { count += value.get(); } context.write(key, new IntWritable(count)); } }
WordcountDriver.java
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 相當於一個yarn集群的客戶端 * 需要在此封裝我們的mr程序的相關運行參數,指定jar包 * 最後提交給yarn */ public class WordcountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //是否運行為本地模式,就是看這個參數值是否為local,默認就是local /*conf.set("mapreduce.framework.name", "local");*/ //本地模式運行mr程序時,輸入輸出的數據可以在本地,也可以在hdfs上 //到底在哪裏,就看以下兩行配置你用哪行,默認就是file:/// /*conf.set("fs.defaultFS", "hdfs://192.168.1.110:9000/");*/ /*conf.set("fs.defaultFS", "file:///");*/ //運行集群模式,就是把程序提交到yarn中去運行 //要想運行為集群模式,以下3個參數要指定為集群上的值 /*conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.hostname", "192.168.1.110"); conf.set("fs.defaultFS", "hdfs://192.168.1.110:9000/");*/ Job job = Job.getInstance(conf);//指定本程序的jar包所在的本地路徑 job.setJarByClass(WordcountDriver.class); //指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReduce.class); //指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最終輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定需要使用combiner,以及用哪個類作為combiner的邏輯 job.setCombinerClass(WordCountReduce.class); //如果不設置InputFormat,它默認用的是TextInputformat.class job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); CombineTextInputFormat.setMinInputSplitSize(job, 2097152); //指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的輸出結果所在目錄 FileOutputFormat.setOutputPath(job, new Path(args[1])); //將job中配置的相關參數,以及job所用的java類所在的jar包,提交給yarn去運行 //job.submit(); //如果submit的話,提交完任務客戶端就退出了,而不知道任務在集群上的運行情況 boolean res = job.waitForCompletion(true); //所以我們這裏使用wait提交,參數true表示將集群情況返回客戶端 System.exit(res?0:1); } }
Hadoop Mapreduce運行流程