Hadoop基礎(二十二):Shuffle機制(三)
7 Combiner合併
(6)自定義Combiner實現步驟
(a)自定義一個Combiner繼承Reducer,重寫Reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {// 1 彙總操作 int count = 0; for(IntWritable v :values){ count += v.get(); } // 2 寫出 context.write(key, new IntWritable(count)); } }
(b)在Job驅動類中設定:
job.setCombinerClass(WordcountCombiner.class);
8Combiner合併案例實操
1.需求
統計過程中對每一個MapTask的輸出進行區域性彙總,以減小網路傳輸量即採用
(1)資料輸入
(2)期望輸出資料
期望:Combine輸入資料多,輸出時經過合併,輸出資料降低。
2.需求分析
圖4-15Combiner的合併案例
3.案例實操-方案一
1)增加一個WordcountCombiner類繼承Reducer
package com.atguigu.mr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;View Codepublic class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 1 彙總 int sum = 0; for(IntWritable value :values){ sum += value.get(); } v.set(sum); // 2 寫出 context.write(key, v); } }
2)在WordcountDriver驅動類中指定Combiner
// 指定需要使用combiner,以及用哪個類作為combiner的邏輯 job.setCombinerClass(WordcountCombiner.class);
4.案例實操-方案二
1)將WordcountReducer作為Combiner在WordcountDriver驅動類中指定
// 指定需要使用Combiner,以及用哪個類作為Combiner的邏輯 job.setCombinerClass(WordcountReducer.class);
執行程式,如圖4-16,4-17所示
圖4-16未使用前
圖4-17使用後
9 GroupingComparator分組(輔助排序)
對Reduce階段的資料根據某一個或幾個欄位進行分組。
分組排序步驟:
(1)自定義類繼承WritableComparator
(2)重寫compare()方法
@Override public int compare(WritableComparable a, WritableComparable b) { // 比較的業務邏輯 return result; }
(3)建立一個構造將比較物件的類傳給父類
protected OrderGroupingComparator() { super(OrderBean.class, true); }
10GroupingComparator分組案例實操
1.需求
有如下訂單資料
表4-2 訂單資料
訂單id |
商品id |
成交金額 |
0000001 |
Pdt_01 |
222.8 |
Pdt_02 |
33.8 |
|
0000002 |
Pdt_03 |
522.8 |
Pdt_04 |
122.4 |
|
Pdt_05 |
722.4 |
|
0000003 |
Pdt_06 |
232.8 |
Pdt_02 |
33.8 |
現在需要求出每一個訂單中最貴的商品。
(1)輸入資料
(2)期望輸出資料
1 222.8
2 722.4
3 232.8
2.需求分析
(1)利用“訂單id和成交金額”作為key,可以將Map階段讀取到的所有訂單資料按照id升序排序,如果id相同再按照金額降序排序,傳送到Reduce。
(2)在Reduce端利用groupingComparator將訂單id相同的kv聚合成組,然後取第一個即是該訂單中最貴商品,如圖4-18所示。
圖4-18過程分析
3.程式碼實現
(1)定義訂單資訊OrderBean類
package com.atguigu.mapreduce.order; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable<OrderBean> { private int order_id; // 訂單id號 private double price; // 價格 public OrderBean() { super(); } public OrderBean(int order_id, double price) { super(); this.order_id = order_id; this.price = price; } @Override public void write(DataOutput out) throws IOException { out.writeInt(order_id); out.writeDouble(price); } @Override public void readFields(DataInput in) throws IOException { order_id = in.readInt(); price = in.readDouble(); } @Override public String toString() { return order_id + "\t" + price; } public int getOrder_id() { return order_id; } public void setOrder_id(int order_id) { this.order_id = order_id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } // 二次排序 @Override public int compareTo(OrderBean o) { int result; if (order_id > o.getOrder_id()) { result = 1; } else if (order_id < o.getOrder_id()) { result = -1; } else { // 價格倒序排序 result = price > o.getPrice() ? -1 : 1; } return result; } }View Code
(2)編寫OrderSortMapper類
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> { OrderBean k = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 獲取一行 String line = value.toString(); // 2 擷取 String[] fields = line.split("\t"); // 3 封裝物件 k.setOrder_id(Integer.parseInt(fields[0])); k.setPrice(Double.parseDouble(fields[2])); // 4 寫出 context.write(k, NullWritable.get()); } }View Code
(3)編寫OrderSortGroupingComparator類
package com.atguigu.mapreduce.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderGroupingComparator extends WritableComparator { protected OrderGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aBean = (OrderBean) a; OrderBean bBean = (OrderBean) b; int result; if (aBean.getOrder_id() > bBean.getOrder_id()) { result = 1; } else if (aBean.getOrder_id() < bBean.getOrder_id()) { result = -1; } else { result = 0; } return result; } }
(4)編寫OrderSortReducer類
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> { @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } }View Code
(5)編寫OrderSortDriver類
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderDriver { public static void main(String[] args) throws Exception, IOException { // 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定 args = new String[]{"e:/input/inputorder" , "e:/output1"}; // 1 獲取配置資訊 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 設定jar包載入路徑 job.setJarByClass(OrderDriver.class); // 3 載入map/reduce類 job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); // 4 設定map輸出資料key和value型別 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5 設定最終輸出資料的key和value型別 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6 設定輸入資料和輸出資料路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 8 設定reduce端的分組 job.setGroupingComparatorClass(OrderGroupingComparator.class); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }