1. 程式人生 > >Hadoop Mapreduce運行流程

Hadoop Mapreduce運行流程

rgs sub frame 退出 16px extend cte ont 提交

Mapreduce的運算過程為兩個階段:

  第一個階段的map task相互獨立,完全並行;

  第二個階段的reduce task也是相互獨立,但依賴於上一階段所有map task並發實例的輸出;

這些task任務分布在多臺機器運行,它的運行管理是有一個master負責,這個master由yarn負責啟動,那麽yarn如何知道啟動多少個map task進程去計算呢?

下面概述一下Mapreduce的執行流程:

1、客戶端首先會訪問hdfs的namenode獲取待處理數據的信息(文件數及文件大小),形成一個任務分配計劃(會寫入配置文件);

2、這個任務分配計劃以及配置文件都會交給yarn,yarn根據自己所掌握的各機器資源情況,去啟動mr appmaster;

3、mr appmaster根據配置文件負責啟動map task任務進程;

4、map task去datanode分行讀取數據,交給自定義的mapper,輸出的context.write(key,value),

  再交給outputcollecter輸出到本機的一個分區文件(後面有幾個reduce task就有幾個分區);

5、所有的map task執行完,mr appmaster再去啟動reduce task;

6、reduce task進程對每一組key相同的<key,value>調用一次自定義的reducer;

7、reduce task的計算結果會不斷追加寫入設置好的hdfs的路徑中;

8、整個程序需要一個driver來提交,提交的是一個描述了各種必要信息的job對象。

代碼示例:

WordCountMap.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** * Mapper的第一個參數:KEYIN: 默認情況下,是mr框架所讀到的一行文本的起始偏移量,Long, * 但是在hadoop中有自己的更精簡的序列化接口,所以不直接用Long,而用LongWritable * Mapper的第二個參數:VALUEIN:默認情況下,是mr框架所讀到的一行文本的內容,String,同上,用Text * Mapper第三個參數:KEYOUT:是用戶自定義邏輯處理完成之後輸出數據中的key,在此處是單詞,String,同上,用Text * Mapper第四個參數:VALUEOUT:是用戶自定義邏輯處理完成之後輸出數據中的value,在此處是單詞次數,Integer,同上,用IntWritable */ public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //將maptask傳給我們的文本內容先轉換成String String line = value.toString(); //根據空格將這一行切分成單詞 String[] words = line.split(" "); //將單詞輸出為<單詞,1> for(String word:words) { //將單詞作為key,將次數1作為value,以便於後續的數據分發,可以根據單詞分發,以便於相同單詞會到相同的reduce task context.write(new Text(word), new IntWritable(1)); } } }

 

WordCountReduce.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.WordCount.Reduce;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * KEYIN, VALUEIN 對應  mapper輸出的KEYOUT,VALUEOUT類型對應
 * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出數據類型
 * KEYOUT是單詞
 * VLAUEOUT是總次數
 */

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
     * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>
     * 入參key,是一組相同單詞kv對的key
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
        int count=0;
        
        for(IntWritable value:values) {
            count += value.get();
        }
        
        context.write(key, new IntWritable(count));
    }
}

WordcountDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 相當於一個yarn集群的客戶端
 * 需要在此封裝我們的mr程序的相關運行參數,指定jar包
 * 最後提交給yarn
 */
public class WordcountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //是否運行為本地模式,就是看這個參數值是否為local,默認就是local
        /*conf.set("mapreduce.framework.name", "local");*/
        
        //本地模式運行mr程序時,輸入輸出的數據可以在本地,也可以在hdfs上
        //到底在哪裏,就看以下兩行配置你用哪行,默認就是file:///
        /*conf.set("fs.defaultFS", "hdfs://192.168.1.110:9000/");*/
        /*conf.set("fs.defaultFS", "file:///");*/
                
        //運行集群模式,就是把程序提交到yarn中去運行
        //要想運行為集群模式,以下3個參數要指定為集群上的值
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "192.168.1.110");
        conf.set("fs.defaultFS", "hdfs://192.168.1.110:9000/");*/
        
        Job job = Job.getInstance(conf);//指定本程序的jar包所在的本地路徑
        job.setJarByClass(WordcountDriver.class);
        
        //指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        
        //指定mapper輸出數據的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        //指定最終輸出的數據的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //指定需要使用combiner,以及用哪個類作為combiner的邏輯
        job.setCombinerClass(WordCountReduce.class);
        
        //如果不設置InputFormat,它默認用的是TextInputformat.class
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
        
        //指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job的輸出結果所在目錄
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //將job中配置的相關參數,以及job所用的java類所在的jar包,提交給yarn去運行
        //job.submit();  //如果submit的話,提交完任務客戶端就退出了,而不知道任務在集群上的運行情況
        boolean res = job.waitForCompletion(true);  //所以我們這裏使用wait提交,參數true表示將集群情況返回客戶端
        System.exit(res?0:1);
    }
}

Hadoop Mapreduce運行流程