The Role of GroupingComparator (Auxiliary Sort) and a Hands-On GroupingComparator Grouping Example

Problem analysis:

The Partitioner runs in the MapTask phase, as records are written into the circular buffer: it decides which partition each record belongs to, and therefore how many result files are produced (one per ReduceTask; the number of partitions must not exceed the number of ReduceTasks). But what decides which set of records is handed to a single call of the reduce() method in the Reducer class? Put differently, inside reduce() the key is "the same" and there are multiple values; under what conditions are keys treated as the same, and can that rule be customized? This is exactly the job of the GroupingComparator (auxiliary sort).
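For contrast, here is a minimal sketch of the other half of that question. The class name OrderIdPartitioner is hypothetical (it is not part of the case study below, which uses only one ReduceTask); it only shows that a Partitioner decides which ReduceTask a record is sent to, whereas a GroupingComparator decides which keys end up in the same reduce() call.

// Illustrative only: routes all records with the same order id to the same ReduceTask.
// Assumes it sits in the same package as the OrderBean class defined later in this article.
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderIdPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
        // Mask the sign bit so the result is never negative, then take the modulus
        return (key.getOrder_id() & Integer.MAX_VALUE) % numPartitions;
    }
}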

GroupingComparator grouping (auxiliary sort)

Groups the data that reaches the Reduce phase by one or more fields of the key.

Steps to implement the grouping:

(1) Define a custom class that extends WritableComparator.

(2) Override the compare() method:

@Override
public int compare(WritableComparable a, WritableComparable b) {
      // comparison business logic
      return result;
}

(3) Add a constructor that passes the class of the objects being compared to the superclass:

protected OrderGroupingComparator() {
      super(OrderBean.class, true);
}
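The comparator only takes effect once it is registered on the Job; the driver in the case study below does this with:

job.setGroupingComparatorClass(OrderGroupingComparator.class);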

GroupingComparator grouping: a hands-on example

1. Requirement

We have the following order data.

Table 4-2 Order data

Order id    Product id    Amount
0000001     Pdt_01        222.8
0000001     Pdt_02        33.8
0000002     Pdt_03        522.8
0000002     Pdt_04        122.4
0000002     Pdt_05        722.4
0000003     Pdt_06        232.8
0000003     Pdt_02        33.8

Now we need to find the most expensive product in each order.

(1) Input data

0000001    Pdt_01    222.8
0000002    Pdt_05    722.4
0000001    Pdt_02    33.8
0000003    Pdt_06    232.8
0000003    Pdt_02    33.8
0000002    Pdt_03    522.8
0000002    Pdt_04    122.4

(2) Expected output data

1	222.8
2	722.4
3	232.8

2. Requirement analysis

(1) Use the combination of "order id and amount" as the key. The Map-phase output can then be sorted by order id in ascending order and, within the same id, by amount in descending order before it is sent to the Reduce phase.

(2) On the Reduce side, use a GroupingComparator to group the key-value pairs that share the same order id; the first key in each group is then the most expensive product of that order.
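To make the two steps concrete, this is the key order the Reducer sees for the sample input after the map-side sort (order id ascending, price descending), together with the group boundaries produced by grouping on order id only:

0000001  222.8   <- group 1, first key = answer for order 0000001
0000001  33.8
0000002  722.4   <- group 2, first key = answer for order 0000002
0000002  522.8
0000002  122.4
0000003  232.8   <- group 3, first key = answer for order 0000003
0000003  33.8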

3. Code implementation

(1) Define the order bean class OrderBean

package com.demo.mapreduce.order;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

   private int order_id; // order id
   private double price; // price

   public OrderBean() {
      super();
   }

   public OrderBean(int order_id, double price) {
      super();
      this.order_id = order_id;
      this.price = price;
   }

   @Override
   public void write(DataOutput out) throws IOException {
      out.writeInt(order_id);
      out.writeDouble(price);
   }

   @Override
   public void readFields(DataInput in) throws IOException {
      order_id = in.readInt();
      price = in.readDouble();
   }

   @Override
   public String toString() {
      return order_id + "\t" + price;
   }

   public int getOrder_id() {
      return order_id;
   }

   public void setOrder_id(int order_id) {
      this.order_id = order_id;
   }

   public double getPrice() {
      return price;
   }

   public void setPrice(double price) {
      this.price = price;
   }

   // Secondary sort: order id ascending, then price descending
   @Override
   public int compareTo(OrderBean o) {

      int result;

      if (order_id > o.getOrder_id()) {
          result = 1;
      } else if (order_id < o.getOrder_id()) {
          result = -1;
      } else {
          // Same order id: sort by price in descending order
          result = -Double.compare(price, o.getPrice());
      }

      return result;
   }
}
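A quick standalone check of the ordering (illustrative only, not part of the MapReduce job): within the same order id the higher price compares as "smaller", so it sorts first.

OrderBean a = new OrderBean(1, 222.8);
OrderBean b = new OrderBean(1, 33.8);
System.out.println(a.compareTo(b)); // negative: a sorts before b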

(2) Write the OrderMapper class

package com.demo.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

   OrderBean k = new OrderBean();

   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

      // 1 Read one line
      String line = value.toString();

      // 2 Split the line into fields
      String[] fields = line.split("\t");

      // 3 Fill the key object (fields[0] = order id, fields[2] = amount; the product id is not needed)
      k.setOrder_id(Integer.parseInt(fields[0]));
      k.setPrice(Double.parseDouble(fields[2]));

      // 4 Emit the key; the value carries no information
      context.write(k, NullWritable.get());
   }
}

(3) Write the OrderGroupingComparator class

package com.demo.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

   protected OrderGroupingComparator() {
      // The second argument (true) makes WritableComparator create OrderBean instances,
      // so compare() receives deserialized objects rather than raw bytes
      super(OrderBean.class, true);
   }

   @Override
   public int compare(WritableComparable a, WritableComparable b) {

      // Group only by order id: keys with the same id go to the same reduce() call
      OrderBean aBean = (OrderBean) a;
      OrderBean bBean = (OrderBean) b;

      int result;
      if (aBean.getOrder_id() > bBean.getOrder_id()) {
          result = 1;
      } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
          result = -1;
      } else {
          result = 0;
      }

      return result;
   }
}

(4) Write the OrderReducer class

package com.demo.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

   @Override
   protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
         throws IOException, InterruptedException {

      // Within one group the keys are sorted by price descending, so the key seen on
      // entry to reduce() is the most expensive product of the order; write it once
      context.write(key, NullWritable.get());
   }
}

(5) Write the OrderDriver class

package com.demo.mapreduce.order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

   public static void main(String[] args) throws Exception {

      // Adjust the input/output paths to match your own machine
      args = new String[] { "e:/input/inputorder", "e:/output1" };

      // 1 Get the configuration and create the job
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

      // 2 Set the jar load path
      job.setJarByClass(OrderDriver.class);

      // 3 Set the map/reduce classes
      job.setMapperClass(OrderMapper.class);
      job.setReducerClass(OrderReducer.class);

      // 4 Set the map output key and value types
      job.setMapOutputKeyClass(OrderBean.class);
      job.setMapOutputValueClass(NullWritable.class);

      // 5 Set the final output key and value types
      job.setOutputKeyClass(OrderBean.class);
      job.setOutputValueClass(NullWritable.class);

      // 6 Set the input and output paths
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      // 7 Set the reduce-side grouping comparator
      job.setGroupingComparatorClass(OrderGroupingComparator.class);

      // 8 Submit the job
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
   }
}
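With the sample input above and a single ReduceTask, the result file part-r-00000 under e:/output1 should contain exactly the three lines shown in the expected output: OrderBean.toString() prints the order id and the price separated by a tab, one line per order group.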