
Hadoop Basics (22): The Shuffle Mechanism (3)

7 Combiner Merging

6) Steps to implement a custom Combiner

(a) Define a custom Combiner class that extends Reducer and overrides the reduce method:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // 1 Sum the counts for this key
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }

        // 2 Write out the partial sum
        context.write(key, new IntWritable(count));
    }
}

(b) In the Job driver class, set:

job.setCombinerClass(WordcountCombiner.class);

8 Combiner Merging Hands-On Case

1. Requirement

During the word-count process, perform local aggregation on the output of each MapTask to reduce the amount of data transferred over the network, i.e., apply the Combiner feature.

1) Input data

2) Expected output data

The Combiner should receive a large number of input records and, after merging, emit a reduced number of output records.

2. Requirement analysis

Figure 4-15: Combiner merging case analysis

3. Hands-On Case - Solution 1

1) Add a WordcountCombiner class that extends Reducer:

package com.atguigu.mr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // 1 Sum the counts for this key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);

        // 2 Write out the partial sum
        context.write(key, v);
    }
}

2) Specify the Combiner in the WordcountDriver driver class:

// Specify that a combiner should be used, and which class provides the combiner logic
job.setCombinerClass(WordcountCombiner.class);
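Note that a Combiner is only safe when the aggregation is associative and commutative, as summing word counts is; the framework may invoke it zero, one, or several times per MapTask, so it must not change the final result.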

4. Hands-On Case - Solution 2

1) Use WordcountReducer as the Combiner, specifying it in the WordcountDriver driver class. This works because the reducer's input and output key/value types are identical (Text/IntWritable), so it can serve both roles:

// Specify that a Combiner should be used, and which class provides the Combiner logic
job.setCombinerClass(WordcountReducer.class);

Run the program; the results before and after are shown in Figures 4-16 and 4-17.

Figure 4-16: Before using the Combiner

Figure 4-17: After using the Combiner
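The two figures are screenshots of the console output; the effect can be verified in the Map-Reduce Framework counter group printed at the end of the job, where the Combine counters become non-zero and the reduce input shrinks. Roughly like this (the values here are illustrative, not taken from the original figures):

    Map-Reduce Framework
            Map input records=4
            Map output records=12
            Combine input records=12
            Combine output records=5
            Reduce input records=5
            Reduce output records=5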

9 GroupingComparator Grouping (Auxiliary Sorting)

Groups the data arriving at the Reduce phase by one or more fields of the key.

Grouping/sorting steps:

1) Define a custom class that extends WritableComparator

2) Override the compare() method:

@Override
public int compare(WritableComparable a, WritableComparable b) {
        // business logic of the comparison
        return result;
}

3) Create a constructor that passes the class of the objects being compared to the superclass:

protected OrderGroupingComparator() {
        super(OrderBean.class, true);
}
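For the grouping to take effect, the comparator must also be registered in the driver class, as done in step 7 of the driver code in the case study below:

job.setGroupingComparatorClass(OrderGroupingComparator.class);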

10 GroupingComparator Grouping Hands-On Case

1. Requirement

Given the following order data:

Table 4-2: Order data

Order id    Product id    Transaction amount
0000001     Pdt_01        222.8
0000001     Pdt_02        33.8
0000002     Pdt_03        522.8
0000002     Pdt_04        122.4
0000002     Pdt_05        722.4
0000003     Pdt_06        232.8
0000003     Pdt_02        33.8

We now need to find the most expensive product in each order.

1) Input data

2) Expected output data

1 222.8

2 722.4

3 232.8

2. Requirement analysis

1) Using "order id and transaction amount" together as the key, all order records read in the Map phase can be sorted by id in ascending order and, where ids are equal, by amount in descending order, before being sent to Reduce.

2) On the Reduce side, use a GroupingComparator to gather kv pairs with the same order id into one group; taking the first pair of each group then yields the most expensive product of that order, as shown in Figure 4-18.

Figure 4-18: Process analysis
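Figure 4-18 is an image; traced on the sample data above, the process looks like this (keys written as (order id, price)):

Map output after the shuffle sort (id ascending, price descending):
(1, 222.8) (1, 33.8) (2, 722.4) (2, 522.8) (2, 122.4) (3, 232.8) (3, 33.8)

Groups formed by the grouping comparator (id only):
[(1, 222.8), (1, 33.8)]  [(2, 722.4), (2, 522.8), (2, 122.4)]  [(3, 232.8), (3, 33.8)]

First key of each group written out:
1   222.8
2   722.4
3   232.8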

3. Code implementation

(1) Define the order information class OrderBean

package com.atguigu.mapreduce.order;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

    private int order_id; // order id
    private double price; // price

    public OrderBean() {
        super();
    }

    public OrderBean(int order_id, double price) {
        super();
        this.order_id = order_id;
        this.price = price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        price = in.readDouble();
    }

    @Override
    public String toString() {
        return order_id + "\t" + price;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    // Secondary sort: ascending by order id, then descending by price
    @Override
    public int compareTo(OrderBean o) {

        int result;

        if (order_id > o.getOrder_id()) {
            result = 1;
        } else if (order_id < o.getOrder_id()) {
            result = -1;
        } else {
            // Sort by price in descending order; Double.compare also returns 0
            // for equal prices, which honors the compareTo contract
            result = Double.compare(o.getPrice(), price);
        }

        return result;
    }
}
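As a quick, hypothetical sanity check of the sort order (this helper class is not part of the original tutorial):

package com.atguigu.mapreduce.order;

// Hypothetical helper for checking OrderBean's sort order
public class OrderBeanSortCheck {
    public static void main(String[] args) {
        OrderBean a = new OrderBean(1, 222.8);
        OrderBean b = new OrderBean(1, 33.8);
        OrderBean c = new OrderBean(2, 722.4);
        System.out.println(a.compareTo(b)); // negative: same order id, higher price sorts first
        System.out.println(b.compareTo(c)); // negative: lower order id sorts first
    }
}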

(2) Write the OrderMapper class

package com.atguigu.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    OrderBean k = new OrderBean();
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Read one line
        String line = value.toString();

        // 2 Split it into fields: order id, product id, price
        String[] fields = line.split("\t");

        // 3 Populate the key object (the product id in fields[1] is not needed)
        k.setOrder_id(Integer.parseInt(fields[0]));
        k.setPrice(Double.parseDouble(fields[2]));

        // 4 Write out
        context.write(k, NullWritable.get());
    }
}
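The mapper assumes tab-separated lines with the order id repeated on every record, matching Table 4-2. The input file under e:/input/inputorder (its exact name is not given in the text) would therefore look like:

0000001	Pdt_01	222.8
0000001	Pdt_02	33.8
0000002	Pdt_03	522.8
0000002	Pdt_04	122.4
0000002	Pdt_05	722.4
0000003	Pdt_06	232.8
0000003	Pdt_02	33.8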

(3) Write the OrderGroupingComparator class

package com.atguigu.mapreduce.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    // Group solely by order id: beans with the same id compare as equal,
    // so all records of one order end up in the same reduce() call
    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;

        int result;
        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            result = 1;
        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            result = -1;
        } else {
            result = 0;
        }

        return result;
    }
}
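In the constructor, super(OrderBean.class, true) tells WritableComparator which key class it compares; the true flag makes it create OrderBean instances so that serialized keys can be deserialized and compared field by field.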

(4) Write the OrderReducer class

package com.atguigu.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        // Keys in a group arrive sorted by price in descending order, so the
        // first key passed to reduce() is the most expensive product of the order
        context.write(key, NullWritable.get());
    }
}

(5) Write the OrderDriver class

package com.atguigu.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

    public static void main(String[] args) throws Exception {

        // The input and output paths need to match the actual paths on your machine
        args = new String[]{"e:/input/inputorder", "e:/output1"};

        // 1 Get configuration information
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar load path
        job.setJarByClass(OrderDriver.class);

        // 3 Set the Mapper and Reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // 4 Set the key and value types of the map output
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5 Set the key and value types of the final output
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Set the reduce-side grouping comparator
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        // 8 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
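If the job completes successfully, the reduce output file e:/output1/part-r-00000 should contain exactly the expected output listed in the requirement:

1	222.8
2	722.4
3	232.8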