Hadoop_26_MapReduce_Reduce Side: Using a GroupingComparator to Find the Largest-Amount Transaction in Each Order
阿新 • Published: 2018-07-02
1. Custom GroupingComparator
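1.1. Requirement: given a set of order records (order id, product id, transaction amount; the sample input file is listed at the end of this post), find the transaction with the largest amount in each order.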
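1.2. Analysis:
1. Use a bean holding the order id and the transaction amount as the map output key. All order records read in the map phase can then be partitioned by id, sorted by id and then by amount in descending order, and sent to reduce.
2. On the reduce side, use a GroupingComparator to aggregate the kv pairs that share an order id into one group; the first record in each group is then the maximum.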
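The two analysis steps can be checked without a cluster. The following is a minimal plain-Java sketch, not the Hadoop job itself: `SecondarySortSketch` and its nested `Order` class are hypothetical stand-ins for `OrderBean`, and `String` comparison is assumed to match `Text` comparison, which holds for ASCII ids like the ones in the sample data. It sorts by id ascending and amount descending, then keeps the first record of each id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch (no Hadoop): sort by (id asc, amount desc), then keep the
// first record of each id. Order is a hypothetical stand-in for OrderBean.
final class SecondarySortSketch {

    static final class Order {
        final String id;
        final double amount;

        Order(String id, double amount) {
            this.id = id;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return id + "\t" + amount; // same layout as OrderBean.toString()
        }
    }

    // Mirrors OrderBean.compareTo: id ascending, then amount descending
    static final Comparator<Order> SORT_ORDER =
            Comparator.comparing((Order o) -> o.id)
                      .thenComparing(Comparator.comparingDouble((Order o) -> o.amount).reversed());

    // Sorts a copy of the input, then emits the first record of every id run
    static List<Order> maxPerOrder(List<Order> input) {
        List<Order> sorted = new ArrayList<>(input);
        sorted.sort(SORT_ORDER);
        List<Order> result = new ArrayList<>();
        String currentId = null;
        for (Order o : sorted) {
            if (!o.id.equals(currentId)) { // first record of a new id group
                result.add(o);
                currentId = o.id;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order("Order_0000001", 222.8),
                new Order("Order_0000001", 25.8),
                new Order("Order_0000002", 325.8),
                new Order("Order_0000002", 522.8),
                new Order("Order_0000002", 122.4),
                new Order("Order_0000003", 222.8));
        for (Order o : maxPerOrder(orders)) {
            System.out.println(o);
        }
    }
}
```

On the sample data this prints `Order_0000001 222.8`, `Order_0000002 522.8` and `Order_0000003 222.8` (tab-separated). In the real job with two reduce tasks, these lines would be spread across the two output files rather than printed together.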
Define the order information bean and implement compareTo() for sorting:
```java
package cn.bigdata.hdfs.secondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Order information bean that implements Hadoop's serialization mechanism.
 */
public class OrderBean implements WritableComparable<OrderBean> {

    private Text itemid;
    private DoubleWritable amount;

    // Hadoop creates key instances via reflection, so a no-arg constructor is required
    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    // Required by WritableComparable; used for comparison and sorting.
    // In MapReduce the framework calls compareTo itself while sorting map output
    // during the shuffle, much as Collections.sort(list) or Arrays.sort(a) do in plain Java.
    @Override
    public int compareTo(OrderBean o) {
        // Sort by id first
        int cmp = this.itemid.compareTo(o.getItemid());
        // If the ids are equal, sort by amount in descending order (note the minus sign)
        if (cmp == 0) {
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    // Fields must be read in exactly the order write() wrote them
    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}
```
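The write/readFields pair is what lets the bean travel through the shuffle: Hadoop creates an empty key with the no-arg constructor and then fills it by calling readFields on the serialized bytes. A minimal sketch of that round trip, assuming plain `String` and `double` fields in place of `Text` and `DoubleWritable` so it runs without Hadoop on the classpath (`OrderRecordSketch` is a hypothetical name), uses the same `writeUTF`/`writeDouble` calls as `OrderBean`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical plain-Java stand-in for OrderBean's serialization, using
// String/double instead of Text/DoubleWritable so it runs without Hadoop.
final class OrderRecordSketch {
    String itemid;
    double amount;

    void write(DataOutput out) throws IOException {
        out.writeUTF(itemid);     // field 1
        out.writeDouble(amount);  // field 2
    }

    void readFields(DataInput in) throws IOException {
        itemid = in.readUTF();    // read back in the same order as written
        amount = in.readDouble();
    }

    // Serializes src to bytes, then rebuilds a fresh object from those bytes,
    // roughly the way keys are rebuilt on the reduce side.
    static OrderRecordSketch roundTrip(OrderRecordSketch src) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        src.write(out);
        out.flush();
        OrderRecordSketch copy = new OrderRecordSketch(); // empty instance first
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        OrderRecordSketch src = new OrderRecordSketch();
        src.itemid = "Order_0000002";
        src.amount = 522.8;
        OrderRecordSketch copy = roundTrip(src);
        System.out.println(copy.itemid + "\t" + copy.amount);
    }
}
```

Swapping the two `read` calls in readFields would still compile but would decode garbage, which is why the order of fields matters more than their names.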
Define a custom Partitioner that partitions by order id:
```java
package cn.bigdata.hdfs.secondarySort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
        // Order beans with the same id are sent to the same partition.
        // The number of partitions equals the number of reduce tasks set by the user.
        // Masking with Integer.MAX_VALUE keeps the result non-negative.
        return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```
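The mask in `getPartition` matters because `hashCode()` can be negative and Java's `%` keeps the sign of the dividend. The sketch below applies the same formula to raw `int` hash values, next to two hypothetical alternatives for contrast. This is also the formula Hadoop's default `HashPartitioner` uses; the difference here is that only the item id is hashed. Since `OrderBean` does not override `hashCode()`, hashing the whole bean would not reliably send equal ids to the same reducer.

```java
// Plain-Java sketch of the partition formula used by ItemIdPartitioner, applied
// to raw int hash values (no Hadoop). The two alternatives are hypothetical.
final class PartitionSketch {

    // Same formula as ItemIdPartitioner: clear the sign bit, then take the modulus
    static int partitionFor(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Naive version: a negative hash yields a negative (invalid) partition index
    static int naivePartitionFor(int hash, int numReduceTasks) {
        return hash % numReduceTasks;
    }

    // Math.abs version: still negative for Integer.MIN_VALUE, because
    // Math.abs(Integer.MIN_VALUE) overflows back to Integer.MIN_VALUE
    static int absPartitionFor(int hash, int numReduceTasks) {
        return Math.abs(hash) % numReduceTasks;
    }

    public static void main(String[] args) {
        int[] hashes = {42, -7, Integer.MIN_VALUE};
        for (int h : hashes) {
            System.out.println(h + ": masked=" + partitionFor(h, 3)
                    + " naive=" + naivePartitionFor(h, 3)
                    + " abs=" + absPartitionFor(h, 3));
        }
    }
}
```

With three reduce tasks, a hash of -7 gives masked=1 but naive=-1, and `Integer.MIN_VALUE` gives -2 for both the naive and the `Math.abs` versions, while the masked formula always stays in `[0, numReduceTasks)`.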
Define a custom GroupingComparator that groups the data when reduce is called:
```java
package cn.bigdata.hdfs.secondarySort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Controls how the reduce side aggregates kv pairs during the shuffle:
 * the reduce-side GroupingComparator makes a set of beans count as the same key.
 */
public class ItemidGroupingComparator extends WritableComparator {

    // Pass in the class of the key bean, and set createInstances = true so the
    // framework creates instances via reflection when deserializing keys
    public ItemidGroupingComparator() {
        super(OrderBean.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // Treat beans with the same itemid as equal so they are aggregated into one group:
        // only the itemid (the order id) is compared, the amount is ignored
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
```
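To see what the grouping comparator changes, here is a simplified plain-Java model of the reduce side, not Hadoop's actual code: keys arrive already sorted by the full key, and a new reduce group starts whenever the grouping comparator says the current key differs from the previous one. `GroupingSketch` and `Key` are hypothetical names. With the full key as the grouping comparator, every record is its own group. With the id-only comparator, there is one group per order, and the group's first key is the largest amount.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch (no Hadoop) of reduce-side grouping over an already sorted
// key stream. Key is a hypothetical stand-in for OrderBean.
final class GroupingSketch {

    static final class Key {
        final String id;
        final double amount;

        Key(String id, double amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    // Sort comparator: the full key (id asc, amount desc), like OrderBean.compareTo
    static final Comparator<Key> FULL_KEY =
            Comparator.comparing((Key k) -> k.id)
                      .thenComparing(Comparator.comparingDouble((Key k) -> k.amount).reversed());

    // Grouping comparator: id only, like ItemidGroupingComparator
    static final Comparator<Key> ID_ONLY = Comparator.comparing((Key k) -> k.id);

    // Walks the sorted keys and starts a new group whenever the grouping
    // comparator reports that a key differs from the one before it
    static List<List<Key>> group(List<Key> sorted, Comparator<Key> grouping) {
        List<List<Key>> groups = new ArrayList<>();
        List<Key> current = null;
        Key previous = null;
        for (Key k : sorted) {
            if (previous == null || grouping.compare(previous, k) != 0) {
                current = new ArrayList<>();
                groups.add(current);
            }
            current.add(k);
            previous = k;
        }
        return groups;
    }

    // The sample records, sorted the way the shuffle sorts OrderBean keys
    static List<Key> sortedSample() {
        List<Key> keys = new ArrayList<>(Arrays.asList(
                new Key("Order_0000001", 222.8), new Key("Order_0000001", 25.8),
                new Key("Order_0000002", 325.8), new Key("Order_0000002", 522.8),
                new Key("Order_0000002", 122.4), new Key("Order_0000003", 222.8)));
        keys.sort(FULL_KEY);
        return keys;
    }

    public static void main(String[] args) {
        List<Key> keys = sortedSample();
        System.out.println("groups with full key: " + group(keys, FULL_KEY).size());
        for (List<Key> g : group(keys, ID_ONLY)) {
            Key first = g.get(0); // corresponds to the key reduce() sees first
            System.out.println(first.id + "\t" + first.amount + " (group size " + g.size() + ")");
        }
    }
}
```

One caveat about the real framework: Hadoop reuses the key object and updates it as the values iterator advances. The reducer in this post reads `key` without iterating `values`, so it still holds the first (largest) record of the group.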
Write the MapReduce job (mapper, reducer, and driver):
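```java
package cn.bigdata.hdfs.secondarySort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Sample input (order id, product id, amount):
 * Order_0000001,Pdt_01,222.8
 * Order_0000001,Pdt_05,25.8
 * Order_0000002,Pdt_05,325.8
 * Order_0000002,Pdt_03,522.8
 * Order_0000002,Pdt_04,122.4
 * Order_0000003,Pdt_01,222.8
 */
public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            if (fields.length < 3) {
                return; // skip blank or malformed lines
            }
            // fields[0] = order id, fields[2] = amount
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
            context.write(bean, NullWritable.get());
        }
    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
        // When data reaches reduce, all beans with the same id are treated as one group,
        // and the one with the largest amount comes first.
        // With the grouping comparator set, the kv data received here looks like:
        // <1001 87.6>,null <1001 76.5>,null ...
        // The reduce key parameter is then the key of the first kv in the group: <1001 87.6>
        // To output the largest-amount order for each id, just output this key.
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);
        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("F:/secondary"));
        FileOutputFormat.setOutputPath(job, new Path("F:/secondaryOut"));
        // Set the custom GroupingComparator class
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // Set the custom Partitioner class
        job.setPartitionerClass(ItemIdPartitioner.class);
        // Set the number of reduce tasks
        job.setNumReduceTasks(2);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```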
Input file (order id, product id, amount):
```text
Order_0000001,Pdt_01,222.8
Order_0000001,Pdt_05,25.8
Order_0000002,Pdt_05,325.8
Order_0000002,Pdt_03,522.8
Order_0000002,Pdt_04,122.4
Order_0000003,Pdt_01,222.8
```