1. 程式人生 > >一個簡單的MapReduce示例(多個MapReduce任務處理)

一個簡單的MapReduce示例(多個MapReduce任務處理)

.lib exceptio apr private util sum length reat lin

一、需求

  有一個列表,只有兩列:id、pro,記錄了id與pro的對應關系,但是在同一個id下,pro有可能是重復的。

  現在需要寫一個程序,統計一下每個id下有多少個不重復的pro。

  為了寫一個完整的示例,我使用了多job!

二、文件目錄

|- OutCount    //單Job的,本次試驗沒有使用到,這裏寫出來供參考
|- OutCount2
|- OutCountMapper
|- OutCountMapper2
|- OutCountReduce
|- OutCountReduce2

三、樣本數據(部分)

2,10000088379
9,10000088379
6,10000088379
1,10000088379
8,10000088379
0,10000088379
1,10000088379
4,10000091621
3,10000091621
2,10000091621
0,10000091621
6,10000091621
2,10000091621
0,10000091621
0,10000091621
9,10000091621
2,10000091621

四、Java代碼

1、OutCountMapper.java

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * created by wangjunfu on 2017-05-25.
 * 4個泛型中,前兩個是指定mapper輸入數據的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的類型
 * map 和 reduce 的數據輸入輸出都是以 key-value對的形式封裝的
 * 默認情況下,Map框架傳遞給我們的mapper的輸入數據中,key是要處理的文本中一行的起始偏移量(選用LongWritable),value是這一行的內容(VALUEIN選用Text)
 * 在wordcount中,經過mapper處理數據後,得到的是<單詞,1>這樣的結果,所以KEYOUT選用Text,VAULEOUT選用IntWritable
 
*/ public class OutCountMapper extends Mapper<LongWritable, Text, Text, Text> { // MapReduce框架每讀一行數據就調用一次map方法 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 數據格式:uid skuid String oneline = value.toString().replace(‘,‘, ‘_‘).trim();
// 去重思路:Map的key具有數據去重的功能,以整個數據作為key發送出去, value為null context.write(new Text(oneline), new Text("")); /* // 這裏需要說明一下,我們現在的樣本是標準的,一行一個樣本。 // 有的情況下一行多個,那就需要進行分割。 // 對這一行的文本按特定分隔符切分 String[] words = oneline.split("\t"); for (String word : words) { // 遍歷這個單詞數組,輸出為key-value形式 key:單詞 value : 1 context.write(new Text(word), new IntWritable(1)); } */ } }

2、OutCountReduce.java

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * created by wangjunfu on 2017-05-25.
 * 經過mapper處理後的數據會被reducer拉取過來,所以reducer的KEYIN、VALUEIN和mapper的KEYOUT、VALUEOUT一致
 * 經過reducer處理後的數據格式為<單詞,頻數>,所以KEYOUT為Text,VALUEOUT為IntWritable
 */
public class OutCountReduce extends Reducer<Text, Text, Text, Text> {
    // 當mapper框架將相同的key的數據處理完成後,reducer框架會將mapper框架輸出的數據<key,value>變成<key,values{}>。
    // 例如,在wordcount中會將mapper框架輸出的所有<hello,1>變為<hello,{1,1,1...}>,即這裏的<k2,v2s>,然後將<k2,v2s>作為reduce函數的輸入
    // 這個將在下面reduce2 中得到體現
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        context.write(key, new Text(""));
    }
}

3、OutCountMapper2.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * created by wangjunfu on 2017-05-27.
 * 將原始數據作為map輸出的key設置為int類型。map會自動的根據key進行排序
 */
public class OutCountMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 數據格式:uid_skuid
        String oneline = value.toString();

        // 將這條數據中的uid 發出去, value為計算one
        context.write(new Text(oneline.split("_")[0]), one);
    }
}

4、OutCountReduce2.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * created by wangjunfu on 2017-05-27.
 * 按統計數排序:將values作為次序key,將map排序好的key作為value輸出
 */
public class OutCountReduce2 extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        // 叠代器,訪問容器中的元素,為容器而生
        Iterator<IntWritable> itr = values.iterator();
        while (itr.hasNext()) {
            sum += itr.next().get();
        }

        /*
        // 這種遍歷也可以
        // 遍歷v2的list,進行累加求和
        for (IntWritable v2 : itr) {
            sum = v2.get();
        }
        */

        // 按統計數排序:將values作為次序key,將map排序好的key作為value輸出
        //context.write(new IntWritable(sum), key);     //需要再起一個 map-reduce
        context.write(key, new IntWritable(sum));
    }
}

5、OutCount2.java

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 需求:給定一個列表uid skuid,求出uid下不重復的skuid數據;然後再按統計大小排序。
 * 涉及到多job 處理。
 * created by wangjunfu on 2017-05-27.
 */
public class OutCount2 {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(OutCount.class);

        //第一個job的配置
        Job job1 = new Job(conf, "Join1");
        job1.setJarByClass(OutCount.class);

        job1.setMapperClass(OutCountMapper.class);
        job1.setReducerClass(OutCountReduce.class);

        job1.setMapOutputKeyClass(Text.class);          //map階段的輸出的key
        job1.setMapOutputValueClass(Text.class); //map階段的輸出的value

        job1.setOutputKeyClass(Text.class);             //reduce階段的輸出的key
        job1.setOutputValueClass(Text.class);    //reduce階段的輸出的value

        //job-1 加入控制容器
        ControlledJob ctrljob1 = new ControlledJob(conf);
        ctrljob1.setJob(job1);

        //job-1 的輸入輸出文件路徑
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        //第二個job的配置
        Job job2 = new Job(conf, "Join2");
        job2.setJarByClass(OutCount.class);             // 設置job所在的類在哪個jar包

        job2.setMapperClass(OutCountMapper2.class);     // 指定job所用的mappe類
        job2.setReducerClass(OutCountReduce2.class);    // 指定job所用的reducer類

        // 指定mapper輸出類型和reducer輸出類型
        // 由於在wordcount中mapper和reducer的輸出類型一致,
        // 所以使用setOutputKeyClass和setOutputValueClass方法可以同時設定mapper和reducer的輸出類型
        // 如果mapper和reducer的輸出類型不一致時,可以使用setMapOutputKeyClass和setMapOutputValueClass單獨設置mapper的輸出類型
        job2.setMapOutputKeyClass(Text.class);          //map階段的輸出的key
        job2.setMapOutputValueClass(IntWritable.class); //map階段的輸出的value

        job2.setOutputKeyClass(Text.class);             //reduce階段的輸出的key
        job2.setOutputValueClass(IntWritable.class);    //reduce階段的輸出的value

        //job-2 加入控制容器
        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);

        //設置多個作業直接的依賴關系
        //job-2 的啟動,依賴於job-1作業的完成
        ctrljob2.addDependingJob(ctrljob1);

        //輸入路徑是上一個作業的輸出路徑,因此這裏填args[1],要和上面對應好
        FileInputFormat.addInputPath(job2, new Path(args[1]));

        //輸出路徑從新傳入一個參數,這裏需要註意,因為我們最後的輸出文件一定要是沒有出現過得
        //因此我們在這裏new Path(args[2])因為args[2]在上面沒有用過,只要和上面不同就可以了
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        //主的控制容器,控制上面的總的兩個子作業
        JobControl jobCtrl = new JobControl("myOutCount");

        //添加到總的JobControl裏,進行控制
        jobCtrl.addJob(ctrljob1);
        jobCtrl.addJob(ctrljob2);

        //在線程啟動,記住一定要有這個
        Thread t = new Thread(jobCtrl);
        t.start();

        while (true) {
            if (jobCtrl.allFinished()) {
                //如果作業成功完成,就打印成功作業的信息
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
        }
    }
}

6、OutCount.java

單Job的,本次試驗沒有使用到,這裏寫出來供參考

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * 需求:給定一個列表uid skuid,求出uid下不重復的skuid數據;然後再按統計大小排序。
 * 涉及到多job 處理。
 * created by wangjunfu on 2017-05-25.
 */
public class OutCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();       //指定作業執行規範
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage:wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "word count");  //指定job名稱,及運行對象
        job.setJarByClass(OutCount.class);
        job.setMapperClass(OutCountMapper.class);       //指定map函數
        job.setCombinerClass(OutCountReduce.class);     //是否需要conbiner整合
        job.setReducerClass(OutCountReduce.class);      //指定reduce函數
        job.setOutputKeyClass(Text.class);              //輸出key格式
        job.setOutputValueClass(IntWritable.class);     //輸出value格式
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));       //處理文件路徑
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));    //結果輸出路徑
        // 將job提交給集群運行
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

五、結果

11    0
11    1
7    2
10    3
10    4
9    5
10    6
7    7
13    8
9    9

一個簡單的MapReduce示例(多個MapReduce任務處理)