1. 程式人生 > >MapReduce之自定義分割槽器Partitioner

MapReduce之自定義分割槽器Partitioner

@[toc] ## 問題引出 >要求將統計結果按照條件輸出到不同檔案中(分割槽)。 比如:將統計結果按照**手機歸屬地不同省份**輸出到不同檔案中(分割槽) ## 預設Partitioner分割槽 ```java public class HashPartitioner extends Partitioner{ public int getPartition(K key,V value, int numReduceTasks){ return (key.hashCode() & Integer.MAX VALUE) & numReduceTasks; } } ``` - 預設分割槽是根據**key**的**hashCode**對**ReduceTasks**個數取模得到的。 - 使用者沒法控制哪個key儲存到哪個分割槽。 ## 自定義Partitioner步驟 1. 自定義類繼承`Partitioner`,重寫`getPartition()`方法 ```java public class CustomPartitioner extends Partitioner{ @Override public int getPartition(Text key,FlowBean value,int numPartitions){ //控制分割槽程式碼邏輯 …… return partition; } } ``` 2. 在Job驅動類中,設定自定義`Partitioner` ```java job.setPartitionerClass(CustomPartitioner.class) ``` 3. 自定義Partition後,要根據自定義Partitioner的邏輯設定相應數量的`ReduceTask` ```java job.setNumReduceTask(5);//假設需要分5個區 ``` ## Partition分割槽案例實操 將統計結果按照手機==歸屬地==不同省份輸出到不同檔案中(分割槽) **輸入資料:** ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721172302429.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0RURlRf,size_16,color_FFFFFF,t_70) **期望輸出資料:** 手機號==136、137、138、139==開頭都分別放到一個獨立的4個檔案中,==其他開頭==的放到一個檔案中。所以總共分為5個檔案,也就是五個區。 >相比於之前的自定義flowbean,這次自定義分割槽,只需要多編寫一個分割槽器,以及在job驅動類中設定分割槽器,mapper和reducer類不改變 **MyPartitioner.java** ```java /* * KEY, VALUE: Mapper輸出的Key-value型別 */ public class MyPartitioner extends Partitioner{ // 計算分割槽 numPartitions為總的分割槽數,reduceTask的數量 // 分割槽號必須為int型的值,且必須符合 0<= partitionNum < numPartitions @Override public int getPartition(Text key, FlowBean value, int numPartitions) { String suffix = key.toString().substring(0, 3);//前開後閉,取手機號前三位數 int partitionNum=0;//分割槽編號 switch (suffix) { case "136": partitionNum=numPartitions-1;//由於分割槽編號不能大於分割槽總數,所以用這種方法比較好 break; case "137": partitionNum=numPartitions-2; break; case "138": partitionNum=numPartitions-3; break; case "139": partitionNum=numPartitions-4; break; default: break; } return partitionNum; } } ``` **FlowBeanDriver.java** ```java public class FlowBeanDriver { public static void main(String[] args) throws Exception { Path inputPath=new Path("e:/mrinput/flowbean"); Path outputPath=new Path("e:/mroutput/partitionflowbean"); //作為整個Job的配置 Configuration conf = new Configuration(); //保證輸出目錄不存在 FileSystem fs=FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } // ①建立Job Job job = Job.getInstance(conf); // ②設定Job // 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別 job.setMapperClass(FlowBeanMapper.class); job.setReducerClass(FlowBeanReducer.class); // Job需要根據Mapper和Reducer輸出的Key-value型別準備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化 // 如果Mapper和Reducer輸出的Key-value型別一致,直接設定Job最終的輸出型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 設定輸入目錄和輸出目錄 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 設定ReduceTask的數量為5 job.setNumReduceTasks(5); // 設定使用自定義的分割槽器 job.setPartitionerClass(MyPartitioner.class); // ③執行Job job.waitForCompletion(true); } } ``` **FlowBeanMapper.java** ```java /* * 1. 統計手機號(String)的上行(long,int),下行(long,int),總流量(long,int) * * 手機號為key,Bean{上行(long,int),下行(long,int),總流量(long,int)}為value * * * * */ public class FlowBeanMapper extends Mapper{ private Text out_key=new Text(); private FlowBean out_value=new FlowBean(); // (0,1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200) @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t"); //封裝手機號 out_key.set(words[1]); // 封裝上行 out_value.setUpFlow(Long.parseLong(words[words.length-3])); // 封裝下行 out_value.setDownFlow(Long.parseLong(words[words.length-2])); context.write(out_key, out_value); } } ``` **FlowBeanReducer.java** ```java public class FlowBeanReducer extends Reducer{ private FlowBean out_value=new FlowBean(); @Override protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException { long sumUpFlow=0; long sumDownFlow=0; for (FlowBean flowBean : values) { sumUpFlow+=flowBean.getUpFlow(); sumDownFlow+=flowBean.getDownFlow(); } out_value.setUpFlow(sumUpFlow); out_value.setDownFlow(sumDownFlow); out_value.setSumFlow(sumDownFlow+sumUpFlow); context.write(key, out_value); } } ``` **FlowBean.java** ```java public class FlowBean implements Writable{ private long upFlow; private long downFlow; private long sumFlow; public FlowBean() { } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } // 序列化 在寫出屬性時,如果為引用資料型別,屬性不能為null @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } //反序列化 序列化和反序列化的順序要一致 @Override public void readFields(DataInput in) throws IOException { upFlow=in.readLong(); downFlow=in.readLong(); sumFlow=in.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } } ``` **輸出結果:** 總共五個檔案 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173120509.png) 一號區: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173159784.png) 二號區: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173216243.png) 三號區: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173300267.png) 四號區: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200721173233989.png) 其他號碼為第五號區: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/2020072117332855.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0RURlRf,size_16,color_FFFFFF,t_70) ## 分割槽總結 - 如果`ReduceTask的數量 > getPartition的結果數`,則會多產生幾個空的輸出檔案part-r-000xx - 如果`Reduceask的數量 < getPartition的結果數`,則有一部分分割槽資料無處安放,會Exception - 如果`ReduceTask的數量 = 1`,則不管MapTask端輸出多少個分割槽檔案,最終結果都交給這一個ReduceTask,最終也就只會產生一個結果檔案partr-00000 以剛才的案例分析: 例如:**假設自定義分割槽數為5**,則 - job.setlNlurmReduce Task(1);會正常執行,只不過會產生一個輸出檔案 - job.setlNlunReduce Task(2),會報錯 - job.setNumReduceTasks(6);大於5,程式會正常執行,會產生