Partitioning in MapReduce: the Partitioner
阿新 • Published: 2018-11-20
When running a MapReduce job, you sometimes need the final output split across separate files. For example, partitioning by province means all records for a given province should land in the same file; partitioning by gender means all records for a given gender should land in the same file. Since the final output is produced by the Reducer tasks, getting multiple output files means running that same number of Reducer tasks.

Each Reducer task receives its data from the Mapper tasks, so the Mapper side has to divide its output, routing different records to different Reducer tasks.

This division of the Mapper output is called partitioning, and the class that implements it is called a Partitioner.
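By default, Hadoop routes records with HashPartitioner, which hashes the map output key modulo the number of reduce tasks. The sketch below mirrors that default logic (as in org.apache.hadoop.mapreduce.lib.partition.HashPartitioner), shown purely to illustrate the method a custom Partitioner overrides; the class name here is made up for illustration:

package com.thp.bigdata.provinceflow;

import org.apache.hadoop.mapreduce.Partitioner;

// Minimal sketch of the default partitioning strategy: the key's hash,
// forced non-negative, modulo the number of reduce tasks.
public class HashStylePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

The example below partitions phone-traffic records by the province implied by the phone number's prefix. It consists of three classes.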
First, FlowBean.java, the custom Writable used as the map output value:

package com.thp.bigdata.provinceflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // Deserialization instantiates the bean reflectively, so an explicit
    // no-arg constructor is required.
    public FlowBean() {}

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    /**
     * Serialization.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization.
     * Note: fields must be read in exactly the same order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // toString() is what gets written to the output files.
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
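As an aside, the Writable contract can be verified outside Hadoop with a plain stream round trip. This is a hypothetical check, not part of the original post; FlowBeanRoundTrip is a name introduced here:

package com.thp.bigdata.provinceflow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical round-trip check: serialize a FlowBean, read it back,
// and confirm the fields survive in write/read order.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(1024, 2048);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        FlowBean copy = new FlowBean(); // the no-arg constructor used by deserialization
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // prints: 1024	2048	3072
    }
}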
Next, ProvincePartitioner.java, which maps each phone number's three-digit prefix to a partition id; unknown prefixes fall through to a catch-all partition:

package com.thp.bigdata.provinceflow;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    // Maps a phone-number prefix to a partition (i.e. reduce task) id.
    public static HashMap<String, Integer> provinceDict = new HashMap<String, Integer>();

    static {
        provinceDict.put("136", 0);
        provinceDict.put("137", 1);
        provinceDict.put("138", 2);
        provinceDict.put("139", 3);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String prefix = key.toString().substring(0, 3);
        Integer provinceId = provinceDict.get(prefix);
        // Unknown prefixes all go to the catch-all partition 4.
        return provinceId == null ? 4 : provinceId;
    }
}
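A small, hypothetical sanity check (again not from the original post) that calls getPartition directly, assuming the two classes above are on the classpath:

package com.thp.bigdata.provinceflow;

import org.apache.hadoop.io.Text;

// Exercises ProvincePartitioner without running a full job.
public class PartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        FlowBean dummy = new FlowBean(0, 0);
        System.out.println(partitioner.getPartition(new Text("13688886666"), dummy, 5)); // 0
        System.out.println(partitioner.getPartition(new Text("13999998888"), dummy, 5)); // 3
        System.out.println(partitioner.getPartition(new Text("15013685858"), dummy, 5)); // 4 (unknown prefix)
    }
}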
Finally, FlowCount.java, the driver that wires in the custom partitioner and sets the matching number of reduce tasks:

package com.thp.bigdata.provinceflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert the line to a String and split it into fields.
            String line = value.toString();
            String[] fields = line.split("\t");
            // Extract the phone number.
            String phoneNumber = fields[1];
            // Extract the upstream and downstream traffic counters.
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);
            context.write(new Text(phoneNumber), new FlowBean(upFlow, downFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_downFlow = 0;
            // Accumulate the upstream and downstream traffic of every bean.
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_downFlow += bean.getDownFlow();
            }
            context.write(key, new FlowBean(sum_upFlow, sum_downFlow));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // The local jar containing this driver class.
        job.setJarByClass(FlowCount.class);

        // The Mapper and Reducer classes for this job.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Plug in the custom partitioner...
        job.setPartitionerClass(ProvincePartitioner.class);
        // ...and run one reduce task per partition (4 known prefixes + 1 catch-all).
        job.setNumReduceTasks(5);

        // Map output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Final output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output directories for the job.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job configuration and jar to YARN and wait for completion.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
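To run the job, package the classes into a jar and submit it with the hadoop command; the jar name and HDFS paths below are placeholders:

hadoop jar flowcount.jar com.thp.bigdata.provinceflow.FlowCount /flow/input /flow/output

With five reduce tasks, the output directory ends up with five files, part-r-00000 through part-r-00004: one per known province prefix, plus the catch-all partition. Note that running with more than one reduce task but fewer than the largest partition id returned by getPartition makes map tasks fail with an illegal-partition error, which is why setNumReduceTasks(5) must match the partitioner.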
Source code:
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/provinceflow