Custom GroupingComparator: finding the highest-value transaction in each order
阿新 • Published: 2018-11-20
Source code:
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/secondarySort
Order ID | Product ID | Transaction amount |
---|---|---|
Order_0000001 | Pdt_01 | 222.8 |
Order_0000001 | Pdt_05 | 25.8 |
Order_0000002 | Pdt_03 | 522.8 |
Order_0000002 | Pdt_04 | 122.4 |
Order_0000002 | Pdt_05 | 722.4 |
Order_0000003 | Pdt_01 | 222.8 |
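The goal is to find, for each order, the transaction with the largest amount.
Analysis:
All records with the same order ID must reach the same reducer, so that the reducer can pick the transaction with the largest amount for that order.
To achieve this, write a custom Partitioner that sends records with the same order ID to the same reducer.
However, the records that arrive at the reducer for one order share an order ID but are still distinct beans (three of them for Order_0000002, for example), and by default the framework does not treat distinct beans as one key. A custom GroupingComparator is what makes the reducer treat them as a single key.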
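The idea behind the analysis can be tried out without Hadoop. The following is only a minimal sketch in plain Java: `TopPerOrderSketch` and `Trade` are hypothetical names invented for illustration, and the sort and the loop merely imitate what the shuffle sort and the grouping step do in the real job. Records are sorted by order ID ascending and amount descending, then the first record of each run of equal order IDs is kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TopPerOrderSketch {
    // Hypothetical plain-Java stand-in for one input row (not the Hadoop OrderBean).
    static final class Trade {
        final String orderId;
        final String productId;
        final double amount;

        Trade(String orderId, String productId, double amount) {
            this.orderId = orderId;
            this.productId = productId;
            this.amount = amount;
        }
    }

    // The six rows from the table above.
    static List<Trade> sample() {
        return Arrays.asList(
                new Trade("Order_0000001", "Pdt_01", 222.8),
                new Trade("Order_0000001", "Pdt_05", 25.8),
                new Trade("Order_0000002", "Pdt_03", 522.8),
                new Trade("Order_0000002", "Pdt_04", 122.4),
                new Trade("Order_0000002", "Pdt_05", 722.4),
                new Trade("Order_0000003", "Pdt_01", 222.8));
    }

    static List<Trade> topPerOrder(List<Trade> trades) {
        List<Trade> sorted = new ArrayList<>(trades);
        // Order ID ascending, then amount descending (imitates the shuffle sort).
        sorted.sort((a, b) -> {
            int cmp = a.orderId.compareTo(b.orderId);
            return cmp != 0 ? cmp : Double.compare(b.amount, a.amount);
        });
        // Keep only the first record of each run of equal order IDs
        // (imitates one reduce call per group).
        List<Trade> result = new ArrayList<>();
        String previousId = null;
        for (Trade t : sorted) {
            if (!t.orderId.equals(previousId)) {
                result.add(t);
                previousId = t.orderId;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Trade t : topPerOrder(sample())) {
            System.out.println(t.orderId + "\t" + t.productId + "\t" + t.amount);
        }
    }
}
```

For the table above, this keeps Pdt_01 (222.8) for Order_0000001, Pdt_05 (722.4) for Order_0000002, and Pdt_01 (222.8) for Order_0000003.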
OrderBean:
package com.thp.bigdata.secondarySort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/**
 * An order record.
 * @author 湯小萌
 *
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private Text itemId; // order ID
    private DoubleWritable mount; // transaction amount
    public OrderBean() {}
    public OrderBean(Text itemId, DoubleWritable mount) {
        set(itemId, mount);
    }
    public void set(Text itemId, DoubleWritable mount) {
        this.itemId = itemId;
        this.mount = mount;
    }
    public Text getItemId() {
        return itemId;
    }
    public void setItemId(Text itemId) {
        this.itemId = itemId;
    }
    public DoubleWritable getMount() {
        return mount;
    }
    public void setMount(DoubleWritable mount) {
        this.mount = mount;
    }
    @Override
    public String toString() {
        return itemId + "\t" + mount.get();
    }
    // Serialization: the fields are written in the same order in which readFields reads them.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemId.toString());
        out.writeDouble(mount.get());
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.itemId = new Text(in.readUTF());
        this.mount = new DoubleWritable(in.readDouble());
    }
    // Note: this method defines the sort order.
    /**
     * The framework calls it to sort map output when the in-memory buffer spills to disk,
     * and again when the spill files are merged.
     */
    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemId.compareTo(o.getItemId());
        if (cmp == 0) {
            // The minus sign reverses the order, so amounts sort from largest to smallest.
            cmp = -this.mount.compareTo(o.mount);
        }
        return cmp;
    }
}
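The `write` and `readFields` methods have to agree on the order of the fields. A rough way to see this without Hadoop on the classpath is to round-trip the same two values through `java.io` data streams. This is a sketch only: `WritableRoundTripSketch` is a hypothetical helper, and it handles a plain `String` and `double` in place of `Text` and `DoubleWritable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableRoundTripSketch {
    // Serializes the two values the way OrderBean.write does, reads them back
    // the way OrderBean.readFields does, and formats them like OrderBean.toString.
    static String roundTrip(String itemId, double amount) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(itemId);     // first field
                out.writeDouble(amount);  // second field
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                String readId = in.readUTF();        // must be read first
                double readAmount = in.readDouble(); // then the amount
                return readId + "\t" + readAmount;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Order_0000001", 222.8));
    }
}
```

Swapping the two reads would make `readUTF` interpret the bytes of the double as a string length, which is why the order in `readFields` mirrors the order in `write`.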
ItemIdPartitioner:
package com.thp.bigdata.secondarySort;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import com.thp.bigdata.secondarySort.OrderBean;
/**
 * Custom Partitioner:
 * sends records with the same id to the same partition.
 * @author 湯小萌
 *
 */
public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {
    /**
     * OrderBeans with the same id are sent to the same partition.
     * The number of partitions matches the number of reduce tasks set by the user;
     * numPartitions is that number of reduce tasks.
     */
    @Override
    public int getPartition(OrderBean bean, NullWritable value, int numPartitions) {
        // Mask off the sign bit so that the result is never negative.
        return (bean.getItemId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
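The partition arithmetic can be checked in isolation. In the sketch below, `PartitionSketch` is a hypothetical class, and `String.hashCode()` stands in for `Text.hashCode()`, so the partition numbers it prints will not match those of the real job. It only illustrates that equal IDs always map to the same partition and that the `& Integer.MAX_VALUE` mask keeps the result non-negative.

```java
class PartitionSketch {
    // Same arithmetic as ItemIdPartitioner.getPartition, applied to a raw hash code.
    static int partitionForHash(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    // String.hashCode() stands in for Text.hashCode() here (an assumption made
    // to keep the sketch free of Hadoop types; the actual hash values differ).
    static int partitionFor(String itemId, int numPartitions) {
        return partitionForHash(itemId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int reducers = 3; // same value the job passes to setNumReduceTasks
        for (String id : new String[] {"Order_0000001", "Order_0000002", "Order_0000003"}) {
            System.out.println(id + " -> partition " + partitionFor(id, reducers));
        }
        // Without the mask, a negative hash code gives a negative remainder.
        System.out.println(Integer.MIN_VALUE % reducers);                  // prints -2
        System.out.println(partitionForHash(Integer.MIN_VALUE, reducers)); // prints 0
    }
}
```

Several order IDs may still share one partition. The partitioner only guarantees that a given ID is never split across partitions.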
ItemIdGroupingComparator:
package com.thp.bigdata.secondarySort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import com.thp.bigdata.secondarySort.OrderBean;
/**
 * Used on the reduce side so that OrderBeans with the same id are treated as the same key.
 * @author 湯小萌
 *
 */
public class ItemIdGroupingComparator extends WritableComparator {
    // This constructor is required.
    // It passes in the class of the key bean, and true tells the framework to create key instances by reflection.
    protected ItemIdGroupingComparator() {
        super(OrderBean.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        // Beans with the same order id count as the same key.
        return aBean.getItemId().compareTo(bBean.getItemId());
    }
}
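The difference between the sort order and the grouping order can be seen by counting how many groups each one produces over the same sorted keys. This is a simplified sketch, not the framework's code: `GroupingVsSortSketch` and `Key` are hypothetical, all keys are assumed to sit in a single partition, and adjacent keys are compared in the same spirit as the reduce side does when deciding whether a new group starts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class GroupingVsSortSketch {
    // Hypothetical stand-in for OrderBean, holding only the two compared fields.
    static final class Key {
        final String itemId;
        final double amount;

        Key(String itemId, double amount) {
            this.itemId = itemId;
            this.amount = amount;
        }
    }

    // Mirrors OrderBean.compareTo: id ascending, then amount descending.
    static final Comparator<Key> SORT = (a, b) -> {
        int cmp = a.itemId.compareTo(b.itemId);
        return cmp != 0 ? cmp : Double.compare(b.amount, a.amount);
    };

    // Mirrors ItemIdGroupingComparator.compare: only the id matters.
    static final Comparator<Key> GROUP = (a, b) -> a.itemId.compareTo(b.itemId);

    // Counts runs of adjacent keys that the given comparator reports as equal.
    static int countGroups(List<Key> sortedKeys, Comparator<Key> comparator) {
        int groups = 0;
        Key previous = null;
        for (Key k : sortedKeys) {
            if (previous == null || comparator.compare(previous, k) != 0) {
                groups++;
            }
            previous = k;
        }
        return groups;
    }

    // The six rows of the sample table, sorted with SORT.
    static List<Key> sortedSample() {
        List<Key> keys = new ArrayList<>(Arrays.asList(
                new Key("Order_0000001", 222.8),
                new Key("Order_0000001", 25.8),
                new Key("Order_0000002", 522.8),
                new Key("Order_0000002", 122.4),
                new Key("Order_0000002", 722.4),
                new Key("Order_0000003", 222.8)));
        keys.sort(SORT);
        return keys;
    }

    public static void main(String[] args) {
        List<Key> keys = sortedSample();
        System.out.println(countGroups(keys, SORT));  // prints 6
        System.out.println(countGroups(keys, GROUP)); // prints 3
    }
}
```

With the sort comparator alone, every record is its own group, so the reducer would be called six times. With the grouping comparator there are three groups, one per order, and the first key of each group carries the largest amount.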
The MapReduce job
package com.thp.bigdata.secondarySort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SecondarySort {
    /**
     * Order_0000001,Pdt_01,222.8
     * Order_0000001,Pdt_05,25.8
     * Order_0000002,Pdt_05,325.8
     * Order_0000002,Pdt_03,522.8
     * Order_0000002,Pdt_04,122.4
     * Order_0000003,Pdt_01,222.8
     *
     * Because OrderBean defines compareTo, the keys are sorted during the shuffle phase.
     * The custom partitioner then sends every OrderBean with the same id to the same partition.
     * Within a partition, OrderBeans with the same id are still different keys,
     * so the grouping comparator is used to make the reducer treat them as the same key.
     * The reducer then sees only the first key of each group, which, thanks to the sort order,
     * is the OrderBean with the largest amount for that id.
     *
     */
    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        OrderBean bean = new OrderBean();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println(line);
            String[] fields = line.split(",");
            // System.out.println(fields[0] + " -- " + fields[2]);
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
            // System.out.println(bean.getItemId());
            context.write(bean, NullWritable.get());
        }
    }
    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // The key passed in is the first key of the group, i.e. the largest amount for this order.
            context.write(key, NullWritable.get());
        }
    }
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);
        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("f:/order/input"));
        FileOutputFormat.setOutputPath(job, new Path("f:/order/output"));
        job.setGroupingComparatorClass(ItemIdGroupingComparator.class);
        job.setPartitionerClass(ItemIdPartitioner.class);
        job.setNumReduceTasks(3);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}