1. 程式人生 > 實用技巧 >GroupingComparator分組(輔助分組)

GroupingComparator分組(輔助分組)

一、輔助排序:(GroupingComparator分組)

  在Reduce端對key進行分組。應用場景:當接收的key為bean物件時,如果希望其中一個或幾個欄位相同(即使全部欄位比較起來並不相同)的key進入同一個reduce方法,就可以使用分組比較器(GroupingComparator)進行輔助分組。

二、舉例說明

 1、需求 

  (1)統計同一品牌下,賣最貴的手機型號和手機價格

  (2)希望輸出資訊(品牌名、手機型號名、價格)

    
 1 xiaomi    小米10    1999    8    2020-07-10
 2 huawei    華為P10    2999    7    2020-07-08
 3 meizu    魅族E3660    1999    10    2020-07-09
 4 xiaomi    小米9    1699    30    2020-07-09
 5 xiaomi    小米8    1299    40    2020-07-11
 6 xiaomi    小米10    1999    20    2020-07-12
 7 xiaomi    小米9    1699    6    2020-07-13
 8 meizu    魅族5300    2999    7    2020-07-14
 9 meizu    魅族8    1899    8    2020-07-11
10 meizu    魅族e    1099    15    2020-07-06
11 huawei    華為P30    3999    18    2020-07-12
12 huawei    華為P20    2999    80    2020-07-01
13 huawei    華為P10    1999    60    2020-07-03
14 xiaomi    小米10    1999    8    2020-07-12
15 huawei    華為P10    2999    7    2020-07-18
16 meizu    魅族E3660    1999    40    2020-07-19
17 xiaomi    小米9    1699    30    2020-07-29
18 xiaomi    小米8    1299    41    2020-07-21
19 xiaomi    小米10    1999    70    2020-07-23
20 xiaomi    小米9    1699    6    2020-07-30
21 meizu    魅族5300    2999    7    2020-07-22
22 meizu    魅族8    1899    50    2020-07-16
23 meizu    魅族e    1099    55    2020-07-19
24 huawei    華為P30    3999    18    2020-07-25
25 huawei    華為P20    2999    80    2020-07-04
26 huawei    華為P10    1999    90    2020-07-03
phone.txt

 2、PhoneBean.java

    
package com.jh.work02;


import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * MapReduce key describing one phone record: brand, model and unit price.
 *
 * <p>Natural ordering ({@link #compareTo}) is brand ascending, then price descending,
 * so within one brand the most expensive phone sorts first.
 *
 * <p>{@link #hashCode()} depends on the brand only. Hadoop's default hash partitioner
 * picks the reduce task from the key's hash code, so every record of a brand must hash
 * identically for the per-brand grouping to see all of that brand's records.
 */
public class PhoneBean implements WritableComparable<PhoneBean> {
    private String phoneName;   // phone brand
    private String phoneVersion;    // phone model
    private Long phoneMoney;    // unit price

    /** No-arg constructor, needed so instances can be created before {@link #readFields}. */
    public PhoneBean() {
        super();
    }

    /** Returns brand, model and price separated by tabs. */
    @Override
    public String toString() {
        return phoneName + "\t" + phoneVersion + "\t" + phoneMoney;
    }

    public String getPhoneVersion() {
        return phoneVersion;
    }

    public void setPhoneVersion(String phoneVersion) {
        this.phoneVersion = phoneVersion;
    }

    public Long getPhoneMoney() {
        return phoneMoney;
    }

    public void setPhoneMoney(Long phoneMoney) {
        this.phoneMoney = phoneMoney;
    }

    public String getPhoneName() {
        return phoneName;
    }

    public void setPhoneName(String phoneName) {
        this.phoneName = phoneName;
    }

    /**
     * Serializes the bean as brand, model, price.
     * The field order must match {@link #readFields}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneName);
        out.writeUTF(phoneVersion);
        out.writeLong(phoneMoney);
    }

    /** Deserializes the bean, reading fields in the order {@link #write} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneName = in.readUTF();
        phoneVersion = in.readUTF();
        phoneMoney = in.readLong();
    }

    /**
     * Natural ordering used for sorting keys.
     *
     * <ul>
     *   <li>positive result: this object sorts after {@code o};</li>
     *   <li>negative result: this object sorts before {@code o};</li>
     *   <li>zero: the two objects are equal for ordering purposes.</li>
     * </ul>
     *
     * <p>Note that the model is not part of the ordering: two beans with the same brand
     * and price compare as 0 even when their models differ.
     */
    @Override
    public int compareTo(PhoneBean o) {
        // Sort by brand first so records of the same brand are adjacent.
        int result = this.getPhoneName().compareTo(o.getPhoneName());
        if (result == 0) {
            // Same brand: order by unit price, descending (arguments are swapped on purpose).
            return o.getPhoneMoney().compareTo(this.getPhoneMoney());
        } else {
            return result;
        }
    }

    /** Two beans are equal when brand, model and price are all equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof PhoneBean)) {
            return false;
        }
        PhoneBean other = (PhoneBean) obj;
        return java.util.Objects.equals(phoneName, other.phoneName)
                && java.util.Objects.equals(phoneVersion, other.phoneVersion)
                && java.util.Objects.equals(phoneMoney, other.phoneMoney);
    }

    /**
     * Hash of the brand only, stable across JVMs (String hashing is value based).
     * Using just the brand keeps all records of one brand in the same partition when
     * the job runs with more than one reduce task and hash-based partitioning.
     */
    @Override
    public int hashCode() {
        return java.util.Objects.hashCode(phoneName);
    }
}
PhoneBean.java

 3、PhoneMapper.java

    
package com.jh.work02;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses each tab-separated input line into a {@link PhoneBean} key (brand, model, price)
 * and emits it with a {@link NullWritable} value.
 *
 * <p>Lines that have fewer than three fields, or whose price field is not a valid long,
 * are skipped and counted in the {@code PhoneMapper / MALFORMED_RECORDS} job counter
 * instead of failing the task.
 */
public class PhoneMapper extends Mapper<LongWritable,Text,PhoneBean,NullWritable> {
    private static final String COUNTER_GROUP = "PhoneMapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";
    // brand, model and price are the first three columns; later columns are ignored
    private static final int MIN_FIELDS = 3;

    // Single bean reused for every record; it is refilled before each write.
    private final PhoneBean bean = new PhoneBean();

    /**
     * @param key     input key supplied by the input format (not used here)
     * @param value   one line of the input file
     * @param context job context used to emit the output pair and update counters
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read the line and split it on the tab separator.
        String line = value.toString();
        String[] fields = line.split("\t");

        // Blank or truncated line: nothing usable, record it and move on.
        if (fields.length < MIN_FIELDS) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        long price;
        try {
            price = Long.parseLong(fields[2].trim());
        } catch (NumberFormatException e) {
            // Non-numeric price: skip the record but keep it visible through the counter.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        bean.setPhoneName(fields[0]);
        bean.setPhoneVersion(fields[1]);
        bean.setPhoneMoney(price);

        context.write(bean,NullWritable.get());
    }
}
PhoneMapper.java

 4、PhoneGroupCompartor.java

    
package com.jh.work02;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator that treats two {@link PhoneBean} keys as the same group
 * whenever their brand names compare equal.
 */
public class PhoneGroupCompartor extends WritableComparator {

    /**
     * Passes the key class to the parent and asks it (second argument {@code true})
     * to create key instances. Without those instances the comparator would fail
     * with a NullPointerException when used for grouping.
     */
    protected PhoneGroupCompartor() {
        super(PhoneBean.class,true);
    }

    /**
     * Compares two keys by brand only.
     *
     * @param a first key, expected to be a {@link PhoneBean}
     * @param b second key, expected to be a {@link PhoneBean}
     * @return the result of comparing the two brand names as strings
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        PhoneBean left = (PhoneBean) a;
        PhoneBean right = (PhoneBean) b;

        // Only the brand decides group membership; model and price are ignored.
        String leftBrand = left.getPhoneName();
        String rightBrand = right.getPhoneName();
        return leftBrand.compareTo(rightBrand);
    }
}
PhoneGroupCompartor.java

 5、PhoneReducer.java

    
package com.jh.work02;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Emits one record per group: the key the reduce call is invoked with.
 *
 * <p>Keys are ordered by brand and then by price descending (see
 * {@code PhoneBean.compareTo}) and grouped by brand, so the key passed in is the
 * most expensive phone of its brand.
 */
public class PhoneReducer extends Reducer<PhoneBean,NullWritable,PhoneBean,NullWritable> {

    /**
     * @param key     first key of the group, i.e. the most expensive phone of the brand
     * @param values  the group's values; all are {@link NullWritable} and are not read
     * @param context job context used to emit the output pair
     */
    @Override
    protected void reduce(PhoneBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Write only the first key of the group. The value carries no data, so the
        // NullWritable singleton is written directly without touching the iterator.
        context.write(key, NullWritable.get());
    }
}
PhoneReducer.java

 6、PhoneDriverWork02.java

    
package com.jh.work02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver that configures and submits the "most expensive phone per brand" job.
 *
 * <p>Usage: {@code PhoneDriverWork02 <input path> <output path>}
 */
public class PhoneDriverWork02 {
    /**
     * Builds the job, waits for it to finish and exits the JVM.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail early with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: PhoneDriverWork02 <input path> <output path>");
            System.exit(2);
            return;
        }

        // 1. Create the configuration and the job.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Locate the jar through the driver class.
        job.setJarByClass(PhoneDriverWork02.class);

        // 3. Set the mapper and reducer classes.
        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReducer.class);

        // 4. Set the mapper output key/value types.
        job.setMapOutputKeyClass(PhoneBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5. Set the final (reducer) output key/value types.
        job.setOutputKeyClass(PhoneBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Group keys by brand on the reduce side.
        job.setGroupingComparatorClass(PhoneGroupCompartor.class);

        // Set the input and output paths.
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        // Submit the job and wait for it to complete.
        boolean result = job.waitForCompletion(true);

        // Exit code 0 on success, 1 on failure.
        System.exit(result ? 0:1);
    }
}
PhoneDriverWork02.java

 7、輸出檔案為

  

 8、如果想統計同一品牌下,前兩名價格