hadoop入門7:自定義GroupingComparator進行分組
摘要:
GroupingComparator是在reduce階段分組時使用的:reduce階段中,key相同的一組記錄只取第一個key作為key,迭代所有的values。如果reduce的key是自定義的bean,我們只需要bean裡面的某個屬性相同就認為這樣的key是相同的,這時我們就需要自定義GroupingComparator來“欺騙”reduce了。我們還需要理清楚map階段的幾個自定義:Partitioner中的getPartition()是map階段的自定義分割槽;bean中定義的compareTo()是在溢位和merge時用來排序的。
demo資料:
訂單id 金額 產品名稱
order_234578,4789,筆記本 order_123456,7789,筆記本 order_123456,1789,手機 order_234578,4789,手機 order_123456,3789,筆記本 order_00001,4789,筆記本 order_00002,7789,筆記本 order_00001,5789,洗衣機 order_00002,17789,伺服器
根據上面的訂單資訊需要求出每一個訂單中成交金額最大的一筆交易。
設計思路:
1、利用“訂單id和金額”作為key,可以將map階段讀取到的所有訂單資料按照id分割槽,按照金額排序,傳送到reduce
2、在reduce端利用groupingcomparator將訂單id相同的kv聚合成組,然後取第一個即是最大值
groupingcomparator程式碼:
package com.zsy.mr.groupingcomparator;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Groups reduce input purely by order id, so every OrderBean sharing an
 * itemId is delivered to a single reduce() call regardless of price.
 */
public class ItemIdGroupingComparator extends WritableComparator {

    protected ItemIdGroupingComparator() {
        // true -> have WritableComparator allocate OrderBean instances so
        // the deserialized objects can be handed to compare() below.
        super(OrderBean.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean left = (OrderBean) a;
        OrderBean right = (OrderBean) b;
        // Equal itemIds => same reduce group, even when prices differ.
        return left.getItemId().compareTo(right.getItemId());
    }
}
Partitioner程式碼:
package com.zsy.mr.groupingcomparator;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes every record with the same order id to the same reduce task.
 * The number of partitions produced matches the job's configured reduce
 * task count (numReduceTasks).
 */
public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {
        // Mask off the sign bit so the modulo result is never negative,
        // then spread the ids across the available reducers.
        return (key.getItemId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
OrderBean程式碼:
package com.zsy.mr.groupingcomparator;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key carrying an order id, product name and price.
 *
 * Sort order (applied during spill/merge): ascending by itemId, then
 * DESCENDING by price, so the first record of each order group seen by
 * the reducer is that order's most expensive transaction.
 */
public class OrderBean implements WritableComparable<OrderBean> {

    private String itemId;
    private String productName;
    // Primitive float: the boxed Float used previously could throw a
    // NullPointerException from write()/compareTo() when price was never
    // set, and forced needless boxing on every comparison.
    private float price;

    /** Serializes the fields in a fixed order. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemId);
        out.writeUTF(productName);
        out.writeFloat(price);
    }

    /** Deserializes fields in the exact order written by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.itemId = in.readUTF();
        this.productName = in.readUTF();
        this.price = in.readFloat();
    }

    /**
     * Compares by itemId first; for equal ids, higher prices sort first.
     * Float.compare with swapped operands replaces the fragile negation
     * of a compareTo() result.
     */
    @Override
    public int compareTo(OrderBean o) {
        int result = this.itemId.compareTo(o.getItemId());
        if (result == 0) {
            result = Float.compare(o.price, this.price); // descending price
        }
        return result;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "itemId=" + itemId + ", productName=" + productName + ", price=" + price;
    }
}
GroupingCommparatorSort程式碼:
package com.zsy.mr.groupingcomparator;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class GroupingCommparatorSort {

    /**
     * Parses one CSV line ("orderId,price,productName") into an OrderBean
     * key. The bean instance is reused across calls — safe because the
     * framework serializes the key immediately on context.write().
     */
    static class GroupingCommparatorSortMapper
            extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        private final OrderBean orderBean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            orderBean.setItemId(fields[0]);
            orderBean.setPrice(Float.parseFloat(fields[1]));
            orderBean.setProductName(fields[2]);
            context.write(orderBean, NullWritable.get());
        }
    }

    /**
     * With ItemIdGroupingComparator installed, all beans of one order
     * arrive as a single group sorted by descending price, so the first
     * key of the group is the order's highest-priced transaction.
     *
     * NOTE: this class was previously declared (accidentally) inside the
     * mapper class; it is now a direct static member of the job class.
     */
    static class GroupingCommparatorSortReducer
            extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values,
                Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Emit only the group's first (= maximum-price) key.
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args args[0] = input directory, args[1] = output directory
     * @author zhaoshouyun
     * @since 1.0
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resoucemanger.hostname", "hadoop01");
         */
        Job job = Job.getInstance(conf);
        job.setJarByClass(GroupingCommparatorSort.class);

        // Business classes for this job.
        job.setMapperClass(GroupingCommparatorSortMapper.class);
        job.setReducerClass(GroupingCommparatorSortReducer.class);

        // Map output types match the final (reduce) output types, so the
        // job-level output classes are sufficient; no separate
        // setMapOutputKeyClass/setMapOutputValueClass needed.
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output directories.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Group reduce input by order id only (ignoring price).
        job.setGroupingComparatorClass(ItemIdGroupingComparator.class);
        // Partition by order id so equal ids land on the same reducer.
        job.setPartitionerClass(ItemIdPartitioner.class);
        // Two reduce tasks -> two output files (part-r-00000/00001).
        job.setNumReduceTasks(2);

        // waitForCompletion reports success/failure; job.submit() would
        // return nothing, so it is not used here.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
執行結果-part-00000:
執行結果-part-00001: