
Implementing Grouped Top-N with MapReduce

Using an order dataset as an example, this article demonstrates how to group records and retrieve the top two records of each group.

1. Case requirements

Given the following order data:

| Order id      | Product id | Transaction amount |
|---------------|------------|--------------------|
| Order_0000001 | Pdt_01     | 222.8              |
| Order_0000001 | Pdt_05     | 25.8               |
| Order_0000002 | Pdt_03     | 522.8              |
| Order_0000002 | Pdt_04     | 122.4              |
| Order_0000002 | Pdt_05     | 722.4              |
| Order_0000003 | Pdt_01     | 222.8              |

We now need to find, within each order, the two transactions with the largest amounts.

2. Analysis of the implementation

Each order record is wrapped in an OrderBean that implements the WritableComparable interface, so that records sharing the same order id can be sorted by price. The OrderBean serves as k2 and the price as v2. The job then proceeds as follows:

- Partitioning: records with the same order id must land in the same partition so that a single reduceTask processes them; this requires a custom Partitioner implementation.
- In the reduce phase, each reduceTask starts threads to copy its partition's map output to local disk. A custom grouping class (a subclass of WritableComparator) then decides which records belong to the same group by comparing order ids in its compare method, while OrderBean's compareTo method has already sorted the records within each group by price.
- Records with the same order id are merged into one group: the first OrderBean of the group becomes k2, and v2 is the collection of prices for that order id. Because compareTo sorted the records, the prices in this collection arrive in order.
- The custom reduce method can therefore simply take the first two (largest) prices of each group and emit them as k3, v3.

The driver class must register the grouping class:

job.setGroupingComparatorClass(MyGroupComparator.class);

Set the partitioner:

job.setPartitionerClass(GroupPartition.class);

Set the number of reduce tasks:

job.setNumReduceTasks(2);

3. Custom Mapper

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line format: orderId \t productId \t price
        String[] split = value.toString().split("\t");
        double price = Double.parseDouble(split[2]);
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(split[0]);
        orderBean.setPrice(price);
        context.write(orderBean, new DoubleWritable(price));
    }
}
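The tab-separated parsing can be checked in isolation. The sketch below (plain Java, no Hadoop dependency; the class name `ParseDemo` is only for illustration) mirrors the mapper's split logic on one sample line from the table above:

```java
public class ParseDemo {
    // Mirrors the mapper's parsing of one input line: orderId \t productId \t price
    public static String[] parse(String line) {
        String[] split = line.split("\t");
        // split[0] = order id, split[1] = product id, split[2] = price
        return split;
    }

    public static void main(String[] args) {
        String[] fields = parse("Order_0000001\tPdt_01\t222.8");
        System.out.println(fields[0] + " -> " + Double.parseDouble(fields[2]));
    }
}
```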

4. Custom Reducer: take the top two records of each order group


import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean, DoubleWritable, OrderBean, DoubleWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        // The values arrive sorted by price in descending order,
        // so the first two are the two largest transactions of the order.
        int i = 0;
        for (DoubleWritable value : values) {
            i++;
            if (i <= 2) {
                context.write(key, value);
            } else {
                break;
            }
        }
    }
}
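The top-two selection itself is independent of Hadoop. A minimal plain-Java sketch of the same loop, applied to a list of prices already sorted in descending order (the `TopTwoDemo` class is illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

public class TopTwoDemo {
    // Takes the first two entries of a list already sorted in descending order,
    // just like the reducer's loop over the grouped values.
    public static List<Double> topTwo(List<Double> sortedDesc) {
        List<Double> result = new ArrayList<>();
        for (double price : sortedDesc) {
            if (result.size() >= 2) break;
            result.add(price);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prices of Order_0000002 from the sample data, in descending order
        System.out.println(topTwo(List.of(722.4, 522.8, 122.4)));  // [722.4, 522.8]
    }
}
```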

5. The custom partitioner class

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class GroupPartition extends Partitioner<OrderBean, DoubleWritable> {
    /**
     * Put records with the same order id into the same partition,
     * so that they are processed by the same reduceTask.
     */
    @Override
    public int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int numPartitions) {
        // Mask off the sign bit so a negative hashCode cannot yield a negative partition index.
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
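The `& Integer.MAX_VALUE` mask matters because `String.hashCode()` can be negative. The following plain-Java sketch of the same formula (class name `PartitionDemo` is illustrative) shows the result is always a valid partition index and is stable per order id:

```java
public class PartitionDemo {
    // Same formula as GroupPartition.getPartition: clear the sign bit, then mod.
    public static int partition(String orderId, int numPartitions) {
        return (orderId.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        for (String id : new String[]{"Order_0000001", "Order_0000002", "Order_0000003"}) {
            System.out.println(id + " -> partition " + partition(id, 2));
        }
    }
}
```

The same order id always maps to the same partition, which is exactly what lets one reduceTask see all records of an order.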

6. The custom grouping class


import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupComparator extends WritableComparator {


    public MyGroupComparator() {
        // Register OrderBean and let the framework create instances for comparison.
        super(OrderBean.class, true);
    }

    /**
     * Treat records with the same order id as one group: only the order id is
     * compared here, so all records of an order reach a single reduce() call.
     * (The price ordering inside each group comes from OrderBean.compareTo.)
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean first = (OrderBean) a;
        OrderBean second = (OrderBean) b;
        return first.getOrderId().compareTo(second.getOrderId());
    }
}
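The grouping rule can be sketched without Hadoop: compare only the order id, so two records that differ only in price are considered equal, i.e. members of the same group (the `GroupCompareDemo` class and its nested record are illustrative, not part of the job):

```java
import java.util.Comparator;

public class GroupCompareDemo {
    // A minimal (orderId, price) pair; Java 16+ record syntax
    record Order(String orderId, double price) {}

    // Mirrors MyGroupComparator: group membership depends on the orderId alone.
    static final Comparator<Order> BY_ORDER_ID = Comparator.comparing(Order::orderId);

    public static void main(String[] args) {
        Order a = new Order("Order_0000002", 522.8);
        Order b = new Order("Order_0000002", 722.4);
        System.out.println(BY_ORDER_ID.compare(a, b));  // 0: same group despite different prices
    }
}
```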

7. The OrderBean wrapper class


import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;
    private Double price;

    /**
     * Sort by order id first; for equal order ids, sort by price in descending order.
     */
    @Override
    public int compareTo(OrderBean o) {
        // Compare order ids first; records of different orders are not comparable by price.
        int result = this.orderId.compareTo(o.orderId);
        if (result == 0) {
            // Same order id: compare prices, negated so larger prices sort first.
            result = -this.price.compareTo(o.price);
        }
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    // getters/setters/toString omitted
}
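The ordering that compareTo produces (order id ascending, price descending within an order) can be sketched in plain Java on the sample data. The `SortDemo` class below replicates the logic and is not part of the Hadoop job:

```java
import java.util.ArrayList;
import java.util.List;

public class SortDemo {
    record Order(String orderId, double price) implements Comparable<Order> {
        // Same rule as OrderBean.compareTo: orderId ascending, then price descending.
        public int compareTo(Order o) {
            int result = this.orderId.compareTo(o.orderId);
            if (result == 0) {
                result = -Double.compare(this.price, o.price);
            }
            return result;
        }
    }

    public static List<Order> sorted() {
        // The three Order_0000002 records from the sample data, initially unordered
        List<Order> orders = new ArrayList<>(List.of(
            new Order("Order_0000002", 522.8),
            new Order("Order_0000002", 722.4),
            new Order("Order_0000002", 122.4)));
        orders.sort(null);  // natural ordering via compareTo
        return orders;
    }

    public static void main(String[] args) {
        sorted().forEach(System.out::println);  // prices come out 722.4, 522.8, 122.4
    }
}
```

After sorting, the first two entries are exactly the two records the reducer emits for that order.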

8. The driver class


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GroupMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "group");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///F:\\input"));

        job.setMapperClass(GroupMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setPartitionerClass(GroupPartition.class);
        job.setNumReduceTasks(2);

        // Register our custom grouping class
        job.setGroupingComparatorClass(MyGroupComparator.class);
        job.setReducerClass(GroupReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///F:\\output_topN"));
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
        System.exit(run);
    }
}

Test results: