Implementing Grouped Top-N with MapReduce
This article uses an order dataset as an example to demonstrate how to group records in MapReduce and extract the top two records from each group.
1. Case requirements
We have the following order data:
Order ID      | Product ID | Amount
Order_0000001 | Pdt_01     | 222.8
Order_0000001 | Pdt_05     | 25.8
Order_0000002 | Pdt_03     | 522.8
Order_0000002 | Pdt_04     | 122.4
Order_0000002 | Pdt_05     | 722.4
Order_0000003 | Pdt_01     | 222.8
We now need to find, within each order, the two transactions with the largest amounts.
2. Implementation analysis
Each order record is wrapped in an OrderBean that implements the WritableComparable interface, with the bean as k2 and the price as v2. Three pieces of shuffle machinery then cooperate:
- Partitioning: a custom Partitioner (GroupPartition) sends all records with the same order id to the same partition, so a single reduceTask receives the whole order after its copy threads pull the map output to local disk.
- Sorting: OrderBean.compareTo orders records by order id first and then by price in descending order, so every partition arrives at the reducer fully sorted.
- Grouping: a custom class extending WritableComparator (MyGroupComparator) compares only the order id, so all records of one order are merged into a single reduce() call. The first OrderBean of the group becomes the key, and the values iterator yields that order's prices, already in descending order thanks to the sort.
Because the values are sorted, the reducer simply emits the first two prices of each group as k3/v3.
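As a concrete trace of this flow, consider the three Order_0000002 records from the sample data:

map output (k2 = OrderBean, v2 = price):
  (Order_0000002, 522.8) -> 522.8
  (Order_0000002, 122.4) -> 122.4
  (Order_0000002, 722.4) -> 722.4
partition: one order id hashes to one partition, so all three reach the same reduceTask
sort (OrderBean.compareTo): 722.4, 522.8, 122.4   (price descending within the order)
group (MyGroupComparator):  one reduce() call with values [722.4, 522.8, 122.4]
reduce: emits the first two values, 722.4 and 522.8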
In the driver class, the grouping comparator must be set:
job.setGroupingComparatorClass(MyGroupComparator.class);
Set the partitioner:
job.setPartitionerClass(GroupPartition.class);
Set the number of reduce tasks:
job.setNumReduceTasks(2);
3. Custom Mapper
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input lines are tab-separated: orderId, productId, amount
        String[] split = value.toString().split("\t");
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(split[0]);
        orderBean.setPrice(Double.valueOf(split[2]));
        DoubleWritable doubleWritable = new DoubleWritable(Double.valueOf(split[2]));
        // k2 = OrderBean (order id + price), v2 = price
        context.write(orderBean, doubleWritable);
    }
}
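A minimal standalone sketch (a hypothetical SampleParse class, not part of the job) of what the mapper's split logic extracts from one tab-separated input line:

public class SampleParse {
    public static void main(String[] args) {
        // One line of the input file, tab-separated: orderId, productId, amount
        String line = "Order_0000002\tPdt_05\t722.4";
        String[] split = line.split("\t");
        // split[0] is the order id, split[2] is the amount; split[1] (product id) is unused
        System.out.println("orderId = " + split[0] + ", price = " + Double.valueOf(split[2]));
        // prints: orderId = Order_0000002, price = 722.4
    }
}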
4. Custom Reducer: emit the first two records of each order group
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean, DoubleWritable, OrderBean, DoubleWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        // The values arrive sorted by price (descending), so the first two are the top two.
        int i = 0;
        for (DoubleWritable value : values) {
            i++;
            if (i <= 2) {
                context.write(key, value);
            } else {
                break;
            }
        }
    }
}
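One subtlety worth noting: Hadoop reuses the key object during reduce() and updates it as the values iterator advances, so context.write(key, value) pairs each price with the OrderBean it came from. For the Order_0000002 group, the call effectively sees:

key:     Order_0000002 (first OrderBean of the group)
values:  722.4, 522.8, 122.4   (already descending from the sort phase)
output:  (Order_0000002, 722.4) and (Order_0000002, 522.8)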
5. Custom Partitioner
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class GroupPartition extends Partitioner<OrderBean, DoubleWritable> {
    /**
     * Send records with the same order id to the same partition,
     * so that a single reduceTask processes the whole order.
     */
    @Override
    public int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int numPartitions) {
        // Mask off the sign bit so a negative hashCode cannot yield a negative partition.
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
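The formula can be sanity-checked locally with a quick sketch (a hypothetical PartitionCheck class; numPartitions fixed at 2 to match the driver's setNumReduceTasks(2)):

public class PartitionCheck {
    public static void main(String[] args) {
        String[] orderIds = {"Order_0000001", "Order_0000002", "Order_0000003"};
        for (String id : orderIds) {
            // Same arithmetic as GroupPartition.getPartition
            int partition = (id.hashCode() & Integer.MAX_VALUE) % 2;
            System.out.println(id + " -> partition " + partition);
        }
    }
}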
6. Custom grouping comparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupComparator extends WritableComparator {
    public MyGroupComparator() {
        // true: instantiate OrderBean objects so compare() receives deserialized beans
        super(OrderBean.class, true);
    }

    /**
     * Group records by order id only: beans that compare equal here are fed
     * to the same reduce() call. The price ordering within the group comes
     * from OrderBean.compareTo during the sort phase, not from this class.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean first = (OrderBean) a;
        OrderBean second = (OrderBean) b;
        return first.getOrderId().compareTo(second.getOrderId());
    }
}
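A small sketch (a hypothetical GroupingDemo class) contrasting the two comparisons: the same pair of beans is unequal to the sort comparison (compareTo) but equal to the grouping comparator, which is exactly why both land in one reduce() call:

public class GroupingDemo {
    public static void main(String[] args) {
        OrderBean a = new OrderBean();
        a.setOrderId("Order_0000002");
        a.setPrice(722.4);
        OrderBean b = new OrderBean();
        b.setOrderId("Order_0000002");
        b.setPrice(122.4);
        // Sort comparison: negative, so a (the higher price) sorts before b
        System.out.println(a.compareTo(b));
        // Grouping comparison: 0, so a and b belong to the same reduce group
        System.out.println(new MyGroupComparator().compare(a, b));
    }
}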
7. The OrderBean class
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;
    private Double price;

    /**
     * Sort key for the shuffle: first by order id, then by price in
     * descending order. Prices are only compared within the same order;
     * across different orders they are not comparable.
     */
    @Override
    public int compareTo(OrderBean o) {
        int result = this.orderId.compareTo(o.orderId);
        if (result == 0) {
            // Same order id: compare by price, negated to sort descending.
            result = this.price.compareTo(o.price);
            return -result;
        }
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    // getters, setters, and toString omitted
}
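For reference, a minimal version of the omitted accessors (the names match the calls in GroupMapper and GroupPartition; the toString format is an assumption, and it determines how the key is rendered by TextOutputFormat):

public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public Double getPrice() { return price; }
public void setPrice(Double price) { this.price = price; }

@Override
public String toString() {
    // Assumed format: order id and price, tab-separated
    return orderId + "\t" + price;
}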
8. Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GroupMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "group");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///F:\\input"));
        job.setMapperClass(GroupMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Custom partitioner: same order id -> same partition
        job.setPartitionerClass(GroupPartition.class);
        // Custom grouping comparator: same order id -> same reduce() call
        job.setGroupingComparatorClass(MyGroupComparator.class);
        // Two reduce tasks, as discussed in section 2
        job.setNumReduceTasks(2);
        job.setReducerClass(GroupReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///F:\\output_topN"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
        System.exit(run);
    }
}
Test results:
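No run output was captured here, but given the sample input, the two part files (part-r-00000 and part-r-00001, split by the partitioner) should together contain the top two prices of each order. Assuming the toString format sketched above, the combined lines would be (key's toString, then the DoubleWritable value, hence the repeated amount):

Order_0000001	222.8	222.8
Order_0000001	25.8	25.8
Order_0000002	722.4	722.4
Order_0000002	522.8	522.8
Order_0000003	222.8	222.8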