
MapReduce Sorting, the Combiner, and Hands-On Cases

MapReduce Sorting and Serialization

A Brief Introduction

①Serialization is the process of turning structured objects into a byte stream.
②Deserialization is the reverse process: turning a byte stream back into structured objects. When objects have to be passed between processes or persisted, they are serialized into a byte stream; when a byte stream is received or read from disk, it has to be deserialized back into objects.
③Java's built-in serialization (Serializable) is a heavyweight framework: a serialized object carries a lot of extra information (checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to transmit over the network. Hadoop therefore ships its own compact and efficient serialization mechanism, Writable. Unlike Java serialization it does not transmit the whole parent-child class hierarchy; only the fields that are actually needed are written, which greatly reduces the network overhead.
④Writable is Hadoop's serialization format. Hadoop defines a Writable interface, and a class only needs to implement it to become serializable.
⑤Writable also has a sub-interface, WritableComparable, which supports both serialization and key comparison. By implementing WritableComparable in a custom key class we can define our own sort order.
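For reference, the two interfaces have roughly this shape (simplified; the real definitions live in org.apache.hadoop.io):

public interface Writable {
    void write(DataOutput out) throws IOException;      // serialize the fields
    void readFields(DataInput in) throws IOException;   // deserialize the fields, in the same order
}

public interface WritableComparable<T> extends Writable, Comparable<T> {
    // adds Comparable.compareTo(T o) on top of Writable
}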

Hands-On Case

The data format is as follows:

Requirements:
The first column is sorted in dictionary order.
When the first column is the same, the second column is sorted in ascending order.
Approach:
Combine the key and the value of the map-side <key,value> output into a new key (newKey), while the value itself stays unchanged. The output then becomes <(key,value),value>; when sorting on newKey, records with the same key are further ordered by value.
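As a quick hypothetical illustration (extending the a 3 / b 7 sample rows used in the Mapper comments below with an extra row a 1):

input line      newKey emitted by the Mapper      order after the shuffle sort
a   3           SortBean(a, 3)                    a   1
b   7           SortBean(b, 7)                    a   3
a   1           SortBean(a, 1)                    b   7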


Step 1. Define the custom type and its comparator

package cn.itcast.mapreduce.sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SortBean implements WritableComparable<SortBean> {
    
    private String word;
    private int num;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return word + '\t' + num;
    }

    //Comparator: defines the sort rules
    /*
    Rules:
      Column 1: sorted in dictionary order
      Column 2: when column 1 is equal, num is sorted in ascending order
     */
    @Override
    public int compareTo(SortBean o) {
        //Compare the first column first
        int result = this.word.compareTo(o.word);
        //If the first column is equal, compare the second column
        if (result == 0) {
            return this.num - o.num;
        }
        return result;
    }

    //Serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(word);
        dataOutput.writeInt(num);
    }

    //Deserialization
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.word = dataInput.readUTF();
        this.num = dataInput.readInt();
    }
}
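This job runs with the single default reduce task, so partitioning never comes into play here. If a custom key like SortBean is later used with several reducers, keep in mind that the default HashPartitioner partitions on key.hashCode(), which Object provides as an identity hash. A sketch of the overrides you would add to SortBean in that case (my addition, not part of the original example):

    @Override
    public int hashCode() {
        //keep hashCode consistent with equals so logically equal keys go to the same partition
        return 31 * word.hashCode() + num;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof SortBean)) return false;
        SortBean other = (SortBean) obj;
        return num == other.num && word.equals(other.word);
    }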

Step 2. Mapper

package cn.itcast.mapreduce.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text,SortBean, NullWritable> {

    /*
    Convert our K1 and V1 into K2 and V2
    K1         V1
    0          a   3
    5          b   7
    -----------------
    K2                         V2
    SortBean(a 3)         NullWritable
    SortBean(b 7)         NullWritable
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1. Split the V1 line of text and wrap the fields in a SortBean object, which gives us K2
        String[] split = value.toString().split("\t");
        SortBean sortBean = new SortBean();
        sortBean.setWord(split[0]);
        sortBean.setNum(Integer.parseInt(split[1]));
        //2. Write K2 and V2 to the context
        context.write(sortBean,NullWritable.get());
    }
}
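A small optional optimization, sketched here rather than taken from the original: because context.write() serializes the key immediately into the map output buffer, a single SortBean instance can safely be reused across map() calls instead of allocating a new one per record.

    private final SortBean sortBean = new SortBean();   //reused across calls; serialized on each write

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        sortBean.setWord(split[0]);
        sortBean.setNum(Integer.parseInt(split[1]));
        context.write(sortBean, NullWritable.get());
    }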

Step 3. Reducer

package cn.itcast.mapreduce.sort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<SortBean, NullWritable,SortBean,NullWritable> {

    //The reduce method converts the new K2 and V2 into K3 and V3
    @Override
    protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());
    }
}

Step 4. Main entry point

package cn.itcast.mapreduce.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain  extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        //1. Create the job object
        Job job = Job.getInstance(super.getConf(), "mapreduce_sort");
        //job.setJarByClass(JobMain.class);
        //2. Configure the job (eight steps)

        //Step 1: set the input format class and input path
        job.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/input/sort_input"));
        TextInputFormat.addInputPath(job,new Path("file:///E:\\input\\sort_input"));

        //Step 2: set the Mapper class and its output types
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(SortBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        //Steps 3, 4, 5, 6 need no configuration for sorting; defining the sort rule on the key is enough

        //Step 7: set the Reducer class and its output types
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(SortBean.class);
        job.setOutputValueClass(NullWritable.class);

        //Step 8: set the output format class and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        //TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop101:8020/out/sort_out"));
        TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\sort_out"));

        //3. Wait for the job to finish
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }

}
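The paths above are hard-coded for local testing. A variant of run() that takes the input and output paths from the command line via ToolRunner's args instead (just a sketch; the argument handling is my assumption, not part of the original):

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "mapreduce_sort");
        job.setJarByClass(JobMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));      //e.g. hdfs://hadoop101:8020/input/sort_input

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(SortBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(SortBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));    //output directory must not already exist

        return job.waitForCompletion(true) ? 0 : 1;
    }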

Output:

The Combiner

Concept

Every map task may produce a large amount of local output. The Combiner merges the map-side output before it leaves the node, which reduces the amount of data transferred between the map and reduce nodes and improves network I/O performance. It is one of MapReduce's optimization techniques.

  • A combiner is a component of an MR program besides the Mapper and the Reducer
  • The parent class of a combiner is Reducer
  • A combiner and a reducer differ in where they run
  • The Combiner runs on the node of each map task
  • The Reducer receives the output of all Mappers globally
  • The point of a combiner is to pre-aggregate the output of each map task locally and so reduce the amount of data sent over the network

Implementation steps

1. Write a custom combiner that extends Reducer and overrides the reduce method
2. Register it on the job with job.setCombinerClass(CustomCombiner.class)
A combiner may only be used when it does not change the final business result, and its output key/value types must match the reducer's input key/value types. Word count qualifies because partial sums add up to the same total; computing an average does not, since an average of averages is not the overall average.

Counting word occurrences:

The MyCombiner class:

package cn.itcast.mapreduce.combiner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyCombiner extends Reducer<Text, LongWritable,Text,LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //1. Iterate over the values and sum them up to get V3
        long count=0;
        for (LongWritable value : values) {
            count+=value.get();
        }
        //2. Write K3 and V3 to the context
        context.write(key,new LongWritable(count));
    }
}
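Wiring the combiner into the word-count driver is a single extra call in step 5 of the usual eight steps. A sketch only; WordCountMapper and WordCountReduce are assumed to be the word-count Mapper and Reducer from the same cn.itcast.mapreduce.combiner package, and their map output types must match MyCombiner's input types (Text, LongWritable):

        //Step 2: the normal word-count Mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //Step 5 (combine): pre-aggregate each map task's output locally
        job.setCombinerClass(MyCombiner.class);

        //Step 7: the normal word-count Reducer
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);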

Without the Combiner:

With the Combiner:

Hands-On Cases

Case 1

Count, for each phone number, the total number of upstream packets, the total number of downstream packets, the total upstream traffic, and the total downstream traffic.
Analysis: use the phone number as the key and the four fields (upstream packets, downstream packets, upstream traffic, downstream traffic) as the value; this key/value pair is the output of the map phase and the input of the reduce phase.

Step 1: Define the custom map output value class FloowBean

package cn.itcast.mapreduce.floow_count_demo1;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FloowBean implements Writable {

    private Integer upFloow;//number of upstream packets
    private Integer downFlow;//number of downstream packets
    private Integer upCountFlow;//total upstream traffic
    private Integer downCountFlow;//total downstream traffic

    public FloowBean(Integer upFloow, Integer downFlow, Integer upCountFlow, Integer downCountFlow) {
        this.upFloow = upFloow;
        this.downFlow = downFlow;
        this.upCountFlow = upCountFlow;
        this.downCountFlow = downCountFlow;
    }

    public FloowBean() {

    }

    public Integer getUpFloow() {
        return upFloow;
    }

    public void setUpFloow(Integer upFloow) {
        this.upFloow = upFloow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    @Override
    public String toString() {
        return  upFloow +
                "\t" + downFlow +
                "\t" + upCountFlow +
                "\t" + downCountFlow;
    }
    //Serialization method
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFloow);
        dataOutput.writeInt(downFlow);
        dataOutput.writeInt(upCountFlow);
        dataOutput.writeInt(downCountFlow);
    }
    //Deserialization method
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFloow=dataInput.readInt();
        this.downFlow=dataInput.readInt();
        this.upCountFlow=dataInput.readInt();
        this.downCountFlow=dataInput.readInt();
    }
}
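One caveat: the fields are boxed Integers, but write() calls dataOutput.writeInt(), which auto-unboxes; a field left null would throw a NullPointerException during serialization. If you prefer, the fields can simply be primitives (my variation, not the original):

    private int upFloow;        //number of upstream packets
    private int downFlow;       //number of downstream packets
    private int upCountFlow;    //total upstream traffic
    private int downCountFlow;  //total downstream traffic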

Step 2: Define the FloowCountMapper class

package cn.itcast.mapreduce.floow_count_demo1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FloowCountMapper extends Mapper<LongWritable, Text,Text,FloowBean> {

    /*
    Convert K1 and V1 into K2 and V2
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1. Split the line of text and extract the phone number --> K2
        String[] split = value.toString().split("\t");
        String phoneNumber=split[1];
        //2. Create a FloowBean object and assign it the four traffic fields parsed from the line --> V2
        FloowBean floowBean=new FloowBean();
        floowBean.setUpFloow(Integer.parseInt(split[6]));
        floowBean.setDownFlow(Integer.parseInt(split[7]));
        floowBean.setUpCountFlow(Integer.parseInt(split[8]));
        floowBean.setDownCountFlow(Integer.parseInt(split[9]));
        //3. Write K2 and V2 to the context
        context.write(new Text(phoneNumber),floowBean);
    }
}

Step 3: Define the FloowCountReducer class

package cn.itcast.mapreduce.floow_count_demo1;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FloowCountReducer extends Reducer<Text,FloowBean,Text,FloowBean> {

    @Override
    protected void reduce(Text key, Iterable<FloowBean> values, Context context) throws IOException, InterruptedException {
        //1. Iterate over the values and accumulate the four fields
        Integer upFloow=0;//number of upstream packets
        Integer downFlow=0;//number of downstream packets
        Integer upCountFlow=0;//total upstream traffic
        Integer downCountFlow=0;//total downstream traffic
        for (FloowBean value : values) {
            upFloow+=value.getUpFloow();
            downFlow+=value.getDownFlow();
            upCountFlow+=value.getUpCountFlow();
            downCountFlow+=value.getDownCountFlow();
        }
        //2. Create a FloowBean object and populate it with the totals
        FloowBean floowBean = new FloowBean(upFloow,downFlow,upCountFlow,downCountFlow);
        //3. Write K3 and V3 to the context
        context.write(key,floowBean);
    }
}

Step 4: The program's main entry point, JobMain

package cn.itcast.mapreduce.floow_count_demo1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    //This method configures the job
    @Override
    public int run(String[] strings) throws Exception {
        //1. Create a job object
        Job job = Job.getInstance(super.getConf(),"mapreduce_floowcount");
        //2. Configure the job (eight steps)

        //Main class for the packaged jar
        job.setJarByClass(JobMain.class);

        //Step 1: specify how the input is read and where it is read from
        job.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/wordcount"));
        //Input data for local testing
        TextInputFormat.addInputPath(job,new Path("file:///E:\\input\\flowcount_input"));
        //Step 2: specify the map-phase class and its data types
        job.setMapperClass(FloowCountMapper.class);
        //Set the type of K2 for the map phase
        job.setMapOutputKeyClass(Text.class);
        //Set the type of V2 for the map phase
        job.setMapOutputValueClass(FloowBean.class);
        //Steps 3 (partition), 4 (sort), 5 (combine) and 6 (group) use the defaults
        //Step 7: specify the reduce-phase class and its data types
        job.setReducerClass(FloowCountReducer.class);
        //Set the type of K3
        job.setOutputKeyClass(Text.class);
        //Set the type of V3
        job.setOutputValueClass(FloowBean.class);

        //Step 8: set the output format
        job.setOutputFormatClass(TextOutputFormat.class);

        //Output path for local testing
        TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\flowcount_out"));


        //Wait for the job to finish
        boolean bl=job.waitForCompletion(true);

        return bl?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //1. Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);

    }
}
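To run this against HDFS instead of the local file system, swap the commented hdfs:// paths back in. Also note that TextOutputFormat refuses to start if the output directory already exists. A common convenience for repeated local runs, sketched as my own addition (it requires import org.apache.hadoop.fs.FileSystem), is to delete the directory inside run() before setting it:

        //Delete the output directory if it already exists, so repeated runs do not fail
        Path outputPath = new Path("file:///E:\\out\\flowcount_out");
        FileSystem fileSystem = FileSystem.get(outputPath.toUri(), super.getConf());
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        TextOutputFormat.setOutputPath(job, outputPath);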

Output:

Case 2

Requirement 2: sort by upstream traffic in descending order.
Analysis: use the output of requirement 1 as the input of this sort job. Define a custom FlowBean and use it as the map output key, with the phone number as the map output value, because MapReduce sorts the map-phase output by key.

Step 1: Define FlowBean and implement WritableComparable for comparison-based sorting
Notes on Java's compareTo method:

  • compareTo compares the current object with the method's argument.
  • It returns 0 if the two are equal.
  • It returns a negative number if the current object is less than the argument.
  • It returns a positive number if the current object is greater than the argument.
package cn.itcast.mapreduce.flow_sort_demo2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private Integer upFloow;//number of upstream packets
    private Integer downFlow;//number of downstream packets
    private Integer upCountFlow;//total upstream traffic
    private Integer downCountFlow;//total downstream traffic

    public FlowBean(Integer upFloow, Integer downFlow, Integer upCountFlow, Integer downCountFlow) {
        this.upFloow = upFloow;
        this.downFlow = downFlow;
        this.upCountFlow = upCountFlow;
        this.downCountFlow = downCountFlow;
    }

    public FlowBean() {

    }

    public Integer getUpFloow() {
        return upFloow;
    }

    public void setUpFloow(Integer upFloow) {
        this.upFloow = upFloow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    @Override
    public String toString() {
        return  upFloow +
                "\t" + downFlow +
                "\t" + upCountFlow +
                "\t" + downCountFlow;
    }
    //Serialization method
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFloow);
        dataOutput.writeInt(downFlow);
        dataOutput.writeInt(upCountFlow);
        dataOutput.writeInt(downCountFlow);
    }
    //Deserialization method
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFloow=dataInput.readInt();
        this.downFlow=dataInput.readInt();
        this.upCountFlow=dataInput.readInt();
        this.downCountFlow=dataInput.readInt();
    }
    //Sort rule: descending order by upFloow
    @Override
    public int compareTo(FlowBean o) {
        return o.upFloow-this.upFloow;
    }
}
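Subtracting the two boxed integers works here, but it auto-unboxes and can overflow for extreme values. An equivalent, safer form (a suggestion of mine, not from the original) is:

    @Override
    public int compareTo(FlowBean o) {
        //descending order: compare the other bean's value against this one
        return Integer.compare(o.getUpFloow(), this.getUpFloow());
    }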

Step 2: Define the FlowSortMapper class

package cn.itcast.mapreduce.flow_sort_demo2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowSortMapper extends Mapper<LongWritable, Text,FlowBean,Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Each input line is a line of case 1's output: phone \t upFloow \t downFlow \t upCountFlow \t downCountFlow
        String[] split = value.toString().split("\t");
        String phoneNumber=split[0];
        //The FlowBean becomes K2 so that the framework sorts on it; the phone number becomes V2
        FlowBean flowBean = new FlowBean(Integer.parseInt(split[1]),Integer.parseInt(split[2]),Integer.parseInt(split[3]),Integer.valueOf(split[4]));
        context.write(flowBean,new Text(phoneNumber));
    }
}

Step 3: Define the FlowSortReducer class

package cn.itcast.mapreduce.flow_sort_demo2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowSortReducer extends Reducer<FlowBean, Text,Text,FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //Phone numbers whose flow figures compare as equal are grouped under one key,
        //so loop over all of them and swap key and value back to (phone, flow) in the output
        for (Text value : values) {
            context.write(value,key);
        }
    }
}

Step 4: The program's main entry point

package cn.itcast.mapreduce.flow_sort_demo2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    //This method configures the job
    @Override
    public int run(String[] strings) throws Exception {
        //1. Create a job object
        Job job = Job.getInstance(super.getConf(),"mapreduce_flowsort");
        //2. Configure the job (eight steps)

        //Main class for the packaged jar
        job.setJarByClass(JobMain.class);

        //Step 1: specify how the input is read and where it is read from
        job.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/wordcount"));
        //Input data for local testing: the output of case 1
        TextInputFormat.addInputPath(job,new Path("file:///E:\\out\\flowcount_out"));
        //Step 2: specify the map-phase class and its data types
        job.setMapperClass(FlowSortMapper.class);
        //Set the type of K2 for the map phase
        job.setMapOutputKeyClass(FlowBean.class);
        //Set the type of V2 for the map phase
        job.setMapOutputValueClass(Text.class);
        //Steps 3 (partition), 4 (sort), 5 (combine) and 6 (group) use the defaults
        //Step 7: specify the reduce-phase class and its data types
        job.setReducerClass(FlowSortReducer.class);
        //Set the type of K3
        job.setOutputKeyClass(Text.class);
        //Set the type of V3
        job.setOutputValueClass(FlowBean.class);

        //Step 8: set the output format
        job.setOutputFormatClass(TextOutputFormat.class);

        //Output path for local testing
        TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\flowsort_out"));


        //Wait for the job to finish
        boolean bl=job.waitForCompletion(true);

        return bl?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //1. Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);

    }
}

Output:

Case 3

Building on requirement 1, we now want phone numbers with different prefixes to end up in different output files. This requires a custom partitioner; here we split the numbers by their leading digits:

  • numbers starting with 135 go to one partition file
  • numbers starting with 136 go to one partition file
  • numbers starting with 137 go to one partition file
  • all other numbers go to a fourth partition

The custom Partitioner

package cn.itcast.mapreduce.flow_count_sort_partition;

import cn.itcast.mapreduce.floow_count_demo1.FloowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowCountPartition extends Partitioner<Text,FloowBean> {

    //Route each record to a partition (0 .. numReduceTasks-1) based on the phone number prefix
    @Override
    public int getPartition(Text text, FloowBean floowBean, int i) {
        String s = text.toString();
        if(s.startsWith("135"))
            return 0;
        else if(s.startsWith("136"))
            return 1;
        else if(s.startsWith("137"))
            return 2;
        else
            return 3;
    }
}
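Note that the values returned by getPartition must fall in the range [0, numReduceTasks); that is why the driver below calls job.setNumReduceTasks(4), one reduce task per partition. With fewer reduce tasks, records routed to the missing partitions would make the job fail, and with the default single reduce task the partitioner is effectively ignored.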

The main entry point, which configures the custom Partitioner and the number of reduce tasks

package cn.itcast.mapreduce.flow_count_sort_partition;

import cn.itcast.mapreduce.floow_count_demo1.FloowBean;
import cn.itcast.mapreduce.floow_count_demo1.FloowCountMapper;
import cn.itcast.mapreduce.floow_count_demo1.FloowCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    //This method configures the job
    @Override
    public int run(String[] strings) throws Exception {
        //1. Create a job object
        Job job = Job.getInstance(super.getConf(),"mapreduce_flow_partition");
        //2. Configure the job (eight steps)

        //Main class for the packaged jar
        job.setJarByClass(JobMain.class);

        //Step 1: specify how the input is read and where it is read from
        job.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/wordcount"));
        //Input data for local testing
        TextInputFormat.addInputPath(job,new Path("file:///E:\\input\\flowpartition_input"));
        //Step 2: specify the map-phase class and its data types
        job.setMapperClass(FloowCountMapper.class);
        //Set the type of K2 for the map phase
        job.setMapOutputKeyClass(Text.class);
        //Set the type of V2 for the map phase
        job.setMapOutputValueClass(FloowBean.class);
        //Step 3 (partition): register the custom partitioner; step 4 (sort) uses the default
        job.setPartitionerClass(FlowCountPartition.class);

        //Steps 5 (combine) and 6 (group) use the defaults
        //Step 7: specify the reduce-phase class and its data types
        job.setReducerClass(FloowCountReducer.class);
        //Set the type of K3
        job.setOutputKeyClass(Text.class);
        //Set the type of V3
        job.setOutputValueClass(FloowBean.class);

        //Set the number of reduce tasks: one per partition
        job.setNumReduceTasks(4);

        //Step 8: set the output format
        job.setOutputFormatClass(TextOutputFormat.class);

        //Output path for local testing
        TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\flowpartition_out"));


        //Wait for the job to finish
        boolean bl=job.waitForCompletion(true);

        return bl?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //1. Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);

    }
}

Output:

The 135 partition:

The 136 partition:

The 137 partition:

The remaining partition: