
MapReduce Partitioning: Partitioner


1. Requirement
Write the aggregated traffic statistics to different output files according to the province that each phone number belongs to.
2. Analysis
In MapReduce, the key-value pairs emitted by map are grouped by key and then dispatched to the different reduce tasks.
The default dispatch rule is: partition by key.hashCode() % numReduceTasks.
So, to group the data according to our own needs, we have to replace the data-dispatching (partitioning) component, the Partitioner: define a CustomPartitioner that extends the abstract class Partitioner, then register it on the job object with job.setPartitionerClass(CustomPartitioner.class).
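For reference, the built-in default, org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, implements essentially this rule (the & Integer.MAX_VALUE mask keeps the result non-negative even for negative hash codes):

public class HashPartitioner<K, V> extends Partitioner<K, V> {

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}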
3. Implementation

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    // Maps a phone-number prefix to a partition number (one per province)
    public static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        provinceMap.put("134", 0);
        provinceMap.put("135", 1);
        provinceMap.put("136", 2);
        provinceMap.put("137", 3);
        provinceMap.put("138", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Look up the partition by the first three digits of the phone number
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        // All unknown prefixes go to partition 5
        return 5;
    }
}
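The FlowBean value type is not shown in this post (it presumably comes from the earlier flow-summary example). The code above assumes a Hadoop Writable with set(), getUpFlow() and getDownFlow(); a minimal sketch of such a class, not the author's original, might look like:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical FlowBean: a Writable carrying upstream/downstream traffic counters
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;

    // Hadoop requires a no-arg constructor for deserialization
    public FlowBean() {}

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    public long getUpFlow()   { return upFlow; }
    public long getDownFlow() { return downFlow; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in the same order they were written
        upFlow = in.readLong();
        downFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + (upFlow + downFlow);
    }
}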
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumProvince {

    public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Read one line of text as a String
            String line = value.toString();
            // Split on the tab delimiter
            String[] fields = line.split("\t");
            // Extract the user's phone number
            String phoneNum = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNum);
            v.set(upFlow, downFlow);
            context.write(k, v);
        }
    }
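Note the field layout the mapper assumes: tab-separated input where the phone number is the second column (index 1) and the upstream/downstream traffic are the third- and second-to-last columns. A hypothetical input line matching this layout (not from the author's dataset):

1363157985066	13726230503	120.196.100.82	www.example.com	2481	24681	200

Here 13726230503 is the phone number, 2481 the upFlow, and 24681 the downFlow.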

    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        FlowBean v = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> flowBeans, Context context)
                throws IOException, InterruptedException {
            long upFlowCount = 0;
            long downFlowCount = 0;
            // Sum the upstream and downstream traffic for this phone number
            for (FlowBean flowBean : flowBeans) {
                upFlowCount += flowBean.getUpFlow();
                downFlowCount += flowBean.getDownFlow();
            }
            v.set(upFlowCount, downFlowCount);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local");
        Job job = Job.getInstance(conf);

        // Locate the jar that contains this job
        job.setJarByClass(FlowSumProvince.class);

        // Specify which Mapper and Reducer classes to use
        job.setMapperClass(FlowSumProvinceMapper.class);
        job.setReducerClass(FlowSumProvinceReducer.class);
//        job.setCombinerClass(FlowSumProvinceReducer.class);

        // Set the output key and value types of our Mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Set the output key and value types of our Reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the number of reduce tasks.
        // partitions returned by getPartition == NumReduceTasks: runs normally
        // partitions returned by getPartition >  NumReduceTasks: fails with "Illegal partition"
        // partitions returned by getPartition <  NumReduceTasks: runs, but produces extra empty files
        job.setNumReduceTasks(10);

        // Plug in our custom partitioning component
        job.setPartitionerClass(ProvincePartitioner.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\flowsum\\input"));
        // Where to store the results after processing completes
        FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outputProvince"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
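With setNumReduceTasks(10) and a partitioner that only ever returns 0 through 5, the job produces ten output files, part-r-00000 through part-r-00009: the first five hold the per-prefix province totals, part-r-00005 collects every phone number whose prefix is not in provinceMap, and the last four are empty. This is exactly the "partitions < NumReduceTasks" case described in the comments in main().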
