GroupingComparator分組(輔助分組)
阿新 • 發佈:2020-07-26
一、輔助排序:(GroupingComparator分組)
在Reduce端對key進行分組。適用場景:當接收的key為bean物件時,若想讓一個或幾個欄位相同(但全部欄位比較並不相同)的key進入同一個reduce方法,可以採用分組排序(GroupingComparator)。
二、舉例說明
1、需求
(1)統計每個品牌中售價最高的手機型號及其價格
(2)希望輸出資訊(品牌名、手機型號名、價格)
phone.txt(欄位以Tab分隔;Mapper只使用前三欄:品牌、型號、單價)
xiaomi	小米10	1999	8	2020-07-10
huawei	華為P10	2999	7	2020-07-08
meizu	魅族E3660	1999	10	2020-07-09
xiaomi	小米9	1699	30	2020-07-09
xiaomi	小米8	1299	40	2020-07-11
xiaomi	小米10	1999	20	2020-07-12
xiaomi	小米9	1699	6	2020-07-13
meizu	魅族5300	2999	7	2020-07-14
meizu	魅族8	1899	8	2020-07-11
meizu	魅族e	1099	15	2020-07-06
huawei	華為P30	3999	18	2020-07-12
huawei	華為P20	2999	80	2020-07-01
huawei	華為P10	1999	60	2020-07-03
xiaomi	小米10	1999	8	2020-07-12
huawei	華為P10	2999	7	2020-07-18
meizu	魅族E3660	1999	40	2020-07-19
xiaomi	小米9	1699	30	2020-07-29
xiaomi	小米8	1299	41	2020-07-21
xiaomi	小米10	1999	70	2020-07-23
xiaomi	小米9	1699	6	2020-07-30
meizu	魅族5300	2999	7	2020-07-22
meizu	魅族8	1899	50	2020-07-16
meizu	魅族e	1099	55	2020-07-19
huawei	華為P30	3999	18	2020-07-25
huawei	華為P20	2999	80	2020-07-04
huawei	華為P10	1999	90	2020-07-03
2、PhoneBean.java
package com.jh.work02;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
 * Map-output key describing one phone record: brand, model and unit price.
 *
 * <p>Sort order (see {@link #compareTo(PhoneBean)}) is brand ascending, then price descending,
 * then model ascending. Within one brand the most expensive phone therefore reaches the reducer
 * first.
 */
public class PhoneBean implements WritableComparable<PhoneBean> {
    private String phoneName;    // phone brand
    private String phoneVersion; // phone model
    private Long phoneMoney;     // unit price

    /** No-arg constructor required by Hadoop to instantiate the key before readFields(). */
    public PhoneBean() {
        super();
    }

    /** Tab-separated "brand, model, price"; this is the line TextOutputFormat writes. */
    @Override
    public String toString() {
        return phoneName + "\t" + phoneVersion + "\t" + phoneMoney;
    }

    public String getPhoneVersion() {
        return phoneVersion;
    }

    public void setPhoneVersion(String phoneVersion) {
        this.phoneVersion = phoneVersion;
    }

    public Long getPhoneMoney() {
        return phoneMoney;
    }

    public void setPhoneMoney(Long phoneMoney) {
        this.phoneMoney = phoneMoney;
    }

    public String getPhoneName() {
        return phoneName;
    }

    public void setPhoneName(String phoneName) {
        this.phoneName = phoneName;
    }

    /**
     * Serializes the fields in the fixed order brand, model, price.
     * readFields(DataInput) must read them back in the same order.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneName);
        out.writeUTF(phoneVersion);
        out.writeLong(phoneMoney);
    }

    /** Deserializes the fields in the same order that write(DataOutput) emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneName = in.readUTF();
        phoneVersion = in.readUTF();
        phoneMoney = in.readLong();
    }

    /**
     * Natural ordering used by the shuffle sort.
     *
     * <p>A negative result means this key sorts before {@code o}, a positive result means it
     * sorts after, and zero means the keys are equal. Keys are ordered as follows:
     * <ol>
     *   <li>brand ascending, so records of the same brand are adjacent;</li>
     *   <li>price descending, so the most expensive phone of a brand comes first;</li>
     *   <li>model ascending, as a tie-breaker so that equally priced models always appear in
     *       the same order.</li>
     * </ol>
     */
    @Override
    public int compareTo(PhoneBean o) {
        int result = this.getPhoneName().compareTo(o.getPhoneName());
        if (result != 0) {
            return result;
        }
        // Arguments swapped on purpose: higher price sorts first.
        result = o.getPhoneMoney().compareTo(this.getPhoneMoney());
        if (result != 0) {
            return result;
        }
        return this.getPhoneVersion().compareTo(o.getPhoneVersion());
    }

    /** Two beans are equal when brand, model and price all match (consistent with compareTo). */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof PhoneBean)) {
            return false;
        }
        PhoneBean other = (PhoneBean) obj;
        return Objects.equals(phoneName, other.phoneName)
                && Objects.equals(phoneVersion, other.phoneVersion)
                && Objects.equals(phoneMoney, other.phoneMoney);
    }

    /**
     * Hashes on the brand only.
     *
     * <p>Without a custom partitioner, Hadoop's HashPartitioner picks the reduce task from
     * {@code key.hashCode()}. Grouping is done per brand, so every record of a brand must land in
     * the same partition. Hashing only the brand guarantees that for any number of reducers. It
     * also stays consistent with equals(), because equal beans always share a brand.
     */
    @Override
    public int hashCode() {
        return Objects.hashCode(phoneName);
    }
}
3、PhoneMapper.java
package com.jh.work02;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Turns each tab-separated line of the phone file into a {@link PhoneBean} key with a
 * NullWritable value.
 *
 * <p>Only the first three columns (brand, model, unit price) are used; further columns are
 * ignored. Blank lines, lines with fewer than three columns, and lines whose price is not an
 * integer are skipped instead of failing the task. Skipped lines are counted in the
 * "PhoneMapper" counter group.
 */
public class PhoneMapper extends Mapper<LongWritable, Text, PhoneBean, NullWritable> {

    /** Counter group used to report skipped input lines. */
    private static final String COUNTER_GROUP = "PhoneMapper";

    // Reused across map() calls to avoid allocating a key per record; the key is written out
    // by context.write before the next call overwrites its fields.
    private PhoneBean bean = new PhoneBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.trim().isEmpty()) {
            context.getCounter(COUNTER_GROUP, "BLANK_LINES").increment(1);
            return;
        }

        String[] fields = line.split("\t");
        if (fields.length < 3) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
            return;
        }

        long price;
        try {
            price = Long.parseLong(fields[2].trim());
        } catch (NumberFormatException e) {
            // Non-numeric price: skip the record rather than abort the whole job.
            context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
            return;
        }

        bean.setPhoneName(fields[0].trim());
        bean.setPhoneVersion(fields[1].trim());
        bean.setPhoneMoney(price);
        context.write(bean, NullWritable.get());
    }
}
4、PhoneGroupCompartor.java
package com.jh.work02; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class PhoneGroupCompartor extends WritableComparator { /* 建立一個構造將比較物件的類傳給父類,便於反序列化, 如果不提前宣告空物件,在GroupingComparator呼叫時會丟擲空指標異常 */ protected PhoneGroupCompartor() { super(PhoneBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { PhoneBean abean = (PhoneBean)a; PhoneBean bbean = (PhoneBean)b; // 根據手機品牌分組 return abean.getPhoneName().compareTo(bbean.getPhoneName()); } }PhoneGroupCompartor.java
5、PhoneReducer.java
package com.jh.work02;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Emits the most expensive phone(s) of every brand.
 *
 * <p>Keys arrive sorted by brand, then by price descending (PhoneBean.compareTo). They are
 * grouped by brand (PhoneGroupCompartor), so each reduce() call covers one brand, starting with
 * its most expensive phone.
 *
 * <p>The number of phones emitted per brand comes from the configuration key
 * {@value #TOP_N_KEY}. It defaults to 1, which emits only the most expensive phone; a value of 0
 * or less emits nothing.
 */
public class PhoneReducer extends Reducer<PhoneBean, NullWritable, PhoneBean, NullWritable> {

    /** Configuration key: how many phones to emit per brand. */
    public static final String TOP_N_KEY = "phone.top.n";

    private int topN = 1;

    @Override
    protected void setup(Context context) {
        topN = context.getConfiguration().getInt(TOP_N_KEY, 1);
    }

    @Override
    protected void reduce(PhoneBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        Iterator<NullWritable> it = values.iterator();
        for (int emitted = 0; emitted < topN && it.hasNext(); emitted++) {
            // Each next() makes Hadoop refill `key` with the record belonging to that value,
            // so the key written here is the (emitted+1)-th most expensive phone of the brand.
            NullWritable value = it.next();
            context.write(key, value);
        }
    }
}
6、PhoneDriverWork02.java
package com.jh.work02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Job driver: finds the most expensive phone of every brand.
 *
 * <p>Usage: {@code PhoneDriverWork02 <input path> <output path>}.
 *
 * <p>Exit codes:
 * <ul>
 *   <li>0 if the job succeeds;</li>
 *   <li>1 if the job fails;</li>
 *   <li>2 if the arguments are missing.</li>
 * </ul>
 */
public class PhoneDriverWork02 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Validate arguments up front instead of failing with ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: PhoneDriverWork02 <input path> <output path>");
            System.exit(2);
        }

        // 1. Create the configuration and the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Locate the jar via this class
        job.setJarByClass(PhoneDriverWork02.class);

        // 3. Mapper and reducer
        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReducer.class);

        // 4. Map output types
        job.setMapOutputKeyClass(PhoneBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5. Final (reducer) output types
        job.setOutputKeyClass(PhoneBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Group reduce input by brand only, so one reduce() call sees all phones of a brand
        job.setGroupingComparatorClass(PhoneGroupCompartor.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit and wait; exit 0 on success, 1 on failure
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
7、輸出檔案為(依上述phone.txt推算):
huawei	華為P30	3999
meizu	魅族5300	2999
xiaomi	小米10	1999
8、如果想統計同一品牌下,前兩名價格