GroupingComparator分組(輔助排序)的作用以及GroupingComparator分組案例實操
問題分析:
partioner是在MapTask階段將資料寫入環形緩衝區中進行的分割槽操作,其目的是為了劃分出幾個結果檔案(ReduceTask,但是partioner必須小於ReduceTask個數),而是什麼決定將一組資料傳送給一次Reduce類中的reduce方法中呢?換句話說,Reduce類中的reduce方法中key一樣,values有多個,是什麼情況下的key是一樣的,能不能自定義。其實這就是 GroupingComparator分組(輔助排序)的作用。
GroupingComparator分組(輔助排序)
對Reduce階段的資料根據某一個或幾個欄位進行分組。
分組排序步驟:
(1)自定義類繼承WritableComparator
(2)重寫compare()方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比較的業務邏輯
return result;
}
(3)建立一個構造將比較物件的類傳給父類
protected OrderGroupingComparator() { super(OrderBean.class, true); }
GroupingComparator分組案例實操
1.需求
有如下訂單資料
表4-2 訂單資料
訂單id |
商品id |
成交金額 |
0000001 |
Pdt_01 |
222.8 |
Pdt_02 |
33.8 |
|
0000002 |
Pdt_03 |
522.8 |
Pdt_04 |
122.4 |
|
Pdt_05 |
722.4 |
|
0000003 |
Pdt_06 |
232.8 |
Pdt_02 |
33.8 |
現在需要求出每一個訂單中最貴的商品。
1)輸入資料
0000001 Pdt_01 222.8
0000002 Pdt_05 722.4
0000001 Pdt_02 33.8
0000003 Pdt_06 232.8
0000003 Pdt_02 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
(2)期望輸出資料
1 222.8
2 722.4
3 232.8
2.需求分析
(1)利用“訂單id和成交金額”作為key,可以將Map階段讀取到的所有訂單資料按照id升序排序,如果id相同再按照金額降序排序,傳送到Reduce。
(2)在Reduce端利用groupingComparator將訂單id相同的kv聚合成組,然後取第一個即是該訂單中最貴商品,如圖所示。
3.程式碼實現
(1)定義訂單資訊OrderBean類
package com.demo.mapreduce.order; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable;
public class OrderBean implements WritableComparable<OrderBean> {
private int order_id; // 訂單id號 private double price; // 價格
public OrderBean() { super(); }
public OrderBean(int order_id, double price) { super(); this.order_id = order_id; this.price = price; }
@Override public void write(DataOutput out) throws IOException { out.writeInt(order_id); out.writeDouble(price); }
@Override public void readFields(DataInput in) throws IOException { order_id = in.readInt(); price = in.readDouble(); }
@Override public String toString() { return order_id + "\t" + price; }
public int getOrder_id() { return order_id; }
public void setOrder_id(int order_id) { this.order_id = order_id; }
public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }
// 二次排序 @Override public int compareTo(OrderBean o) {
int result;
if (order_id > o.getOrder_id()) { result = 1; } else if (order_id < o.getOrder_id()) { result = -1; } else { // 價格倒序排序 result = price > o.getPrice() ? -1 : 1; }
return result; } } |
(2)編寫OrderSortMapper類
package com.demo.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
OrderBean k = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取一行
String line = value.toString();
// 2 擷取
String[] fields = line.split("\t");
// 3 封裝物件
k.setOrder_id(Integer.parseInt(fields[0]));
k.setPrice(Double.parseDouble(fields[2]));
// 4 寫出
context.write(k, NullWritable.get());
}
}
(3)編寫OrderSortGroupingComparator類
package com.demo.mapreduce.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderGroupingComparator extends WritableComparator {
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = 1;
} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = -1;
} else {
result = 0;
}
return result;
}
}
(4)編寫OrderSortReducer類
package com.demo.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
(5)編寫OrderSortDriver類
package com.demo.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OrderDriver {
public static void main(String[] args) throws Exception, IOException {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
args = new String[]{"e:/input/inputorder" , "e:/output1"};
// 1 獲取配置資訊
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 設定jar包載入路徑
job.setJarByClass(OrderDriver.class);
// 3 載入map/reduce類
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
// 4 設定map輸出資料key和value型別
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 設定最終輸出資料的key和value型別
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
// 6 設定輸入資料和輸出資料路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8 設定reduce端的分組
job.setGroupingComparatorClass(OrderGroupingComparator.class);
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}