Mapreduce的分區—Partitioner
阿新 • • 發佈:2018-09-12
nds sub etc long on() hash rri 業務 exception 1. 需求
將流量匯總統計結果按照手機歸屬地不同省份輸出到不同文件中。
2. 分析
Mapreduce中會將map輸出的kv對,按照相同key分組,然後分發給不同的reducetask。
默認的分發規則為:根據key的hashcode%reducetask數來分發
所以:如果要按照我們自己的需求進行分組,則需要改寫數據分發(分組)組件Partitioner,自定義一個CustomPartitioner繼承抽象類:Partitioner,然後在job對象中,設置自定義partitioner: job.setPartitionerClass(CustomPartitioner.class
3. 實現
將流量匯總統計結果按照手機歸屬地不同省份輸出到不同文件中。
2. 分析
Mapreduce中會將map輸出的kv對,按照相同key分組,然後分發給不同的reducetask。
默認的分發規則為:根據key的hashcode%reducetask數來分發
所以:如果要按照我們自己的需求進行分組,則需要改寫數據分發(分組)組件Partitioner,自定義一個CustomPartitioner繼承抽象類:Partitioner,然後在job對象中,設置自定義partitioner: job.setPartitionerClass(CustomPartitioner.class
3. 實現
public class ProvincePartitioner extends Partitioner<Text, FlowBean> public static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>(); static{ provinceMap.put("134", 0); provinceMap.put("135", 1); provinceMap.put("136", 2); provinceMap.put("137", 3); provinceMap.put("138", 4); } @Override public int getPartition(Text key, FlowBean value, int numPartitions) { Integer code = provinceMap.get(key.toString().substring(0, 3)); if (code != null) { return code; } return 5; } }
public class FlowSumProvince { public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //拿取一行文本轉為String String line = value.toString(); //按照分隔符\t進行分割 String[] fileds = line.split("\t"); //獲取用戶手機號 String phoneNum = fileds[1]; long upFlow = Long.parseLong(fileds[fileds.length-3]); long downFlow = Long.parseLong(fileds[fileds.length-2]); k.set(phoneNum); v.set(upFlow, downFlow); context.write(k,v); } } public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ FlowBean v = new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> flowBeans,Context context) throws IOException, InterruptedException { long upFlowCount = 0; long downFlowCount = 0; for (FlowBean flowBean : flowBeans) { upFlowCount += flowBean.getUpFlow(); downFlowCount += flowBean.getDownFlow(); } v.set(upFlowCount, downFlowCount); context.write(key, v); } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); conf.set("mapreduce.framework.name", "local"); Job job = Job.getInstance(conf); //指定我這個 job 所在的 jar包位置 job.setJarByClass(FlowSumProvince.class); //指定我們使用的Mapper是那個類 reducer是哪個類 job.setMapperClass(FlowSumProvinceMapper.class); job.setReducerClass(FlowSumProvinceReducer.class); // job.setCombinerClass(FlowSumProvinceReducer.class); // 設置我們的業務邏輯 Mapper 類的輸出 key 和 value 的數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 設置我們的業務邏輯 Reducer 類的輸出 key 和 value 的數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //這裏設置運行reduceTask的個數 //getPartition 返回的分區個數 = NumReduceTasks 正常執行 //getPartition 返回的分區個數 > NumReduceTasks 報錯:Illegal partition //getPartition 返回的分區個數 < NumReduceTasks 可以執行 ,多出空白文件 job.setNumReduceTasks(10); //這裏指定使用我們自定義的分區組件 job.setPartitionerClass(ProvincePartitioner.class); FileInputFormat.setInputPaths(job, new Path("D:\\flowsum\\input")); // 指定處理完成之後的結果所保存的位置 FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outputProvince")); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } } }
Mapreduce的分區—Partitioner