
How to Implement Global Sorting Elegantly in MapReduce

First thoughts

When global sorting comes up, is the first idea that springs to mind to collect the data on the map side, shuffle it all to a single reducer, and sort it there? That is hardly different from sorting on one machine. Remember that the MapReduce framework already sorts by key by default; you can also sort by value by moving the value into the key position and swapping the two back in the reducer (a minimal sketch of that trick follows below). Sorting, however, only happens within a partition, i.e. within a single reducer, so a one-reducer job cannot exploit the parallelism of the cluster. How, then, can global sorting be implemented more elegantly?
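
As an aside, here is a minimal sketch of that value-as-key trick; the tab-separated "id score" line format and the class names are illustrative assumptions, not part of the jobs discussed below:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical mapper: promote the value (score) to the map output key so the framework sorts on it.
class SortByValueMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // assume each input line looks like "id<TAB>score"
        String[] fields = line.toString().split("\t");
        context.write(new IntWritable(Integer.parseInt(fields[1])), new Text(fields[0]));
    }
}

// Hypothetical reducer: swap the pair back so the output reads "id<TAB>score" again, now ordered by score.
class SortByValueReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
    @Override
    protected void reduce(IntWritable score, Iterable<Text> ids, Context context)
            throws IOException, InterruptedException {
        for (Text id : ids) {
            context.write(id, score);
        }
    }
}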

Summary

Sorting in Hadoop comes in several flavors: partial sort, total (global) sort, auxiliary (grouping) sort, secondary sort, and so on. This post focuses on global sorting by key and covers three implementations:

  1. Use a single reducer
  2. Use a custom Partitioner to route contiguous key ranges to separate partitions, in order
  3. Use the framework's built-in TotalOrderPartitioner

Implementation

First, prepare some input data: https://github.com/hulichao/bigdata-code/tree/master/data/job, for example:

/data/job/file.txt
2
32
654
32
15
756
65223

Global sorting with a single reducer

With a single reducer nothing special is required; the mapper, reducer, and driver look like this:

package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class JobMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value,
                       Context context) throws IOException, InterruptedException {
        IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
        context.write(intWritable, intWritable);
    }
}
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class JobReducer  extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    private int index = 0; // global rank counter
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(new IntWritable(++index), value);
    }
}
package com.hoult.mr.job;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("input-path output-path");
            System.exit(1);
        }

        Job job = Job.getInstance(getConf());
        job.setJarByClass(JobDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(JobMapper.class);
        job.setReducerClass(JobReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class); // the reducer emits (rank, value) pairs
        // use a single reducer so that all keys end up in one sorted file
        job.setNumReduceTasks(1);
        job.setJobName("JobDriver");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args)throws Exception{

//        int exitCode = ToolRunner.run(new JobDriver(), args);
        int exitCode = ToolRunner.run(new JobDriver(), new String[] {"data/job/", "data/job/output"});
        System.exit(exitCode);
    }
}

// With a rank index added as the key, the job produces a single output file with the following content:
1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223

PS: the jobs above are launched with Hadoop's ToolRunner utility. Code that repeats what has already been shown is omitted below; only the parts that differ are listed.

Using a custom Partitioner to route key ranges to multiple partitions in order

How can a custom partitioner guarantee a globally ordered result? By default, HashPartitioner decides which reducer a key is sent to; if we replace it with a partitioner that sends contiguous key ranges to consecutive reducers, every reducer's output is sorted and the outputs themselves are in order. For example, with keys spread across 1 to 100 million, keys 1 to 10 million could go to reducer 1, keys 10,000,001 to 20 million to reducer 2, and so on; concatenating those ten result files in order then gives the globally sorted data. Since the sample data here is small, we use 10 partitions: 0-999, 1000-1999, ..., with every key from 9000 upwards going to the last partition. Compared with the first approach, two things change:

// Partitioner implementation
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class JobPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int keyValue = key.get();

        // route keys in [1000 * i, 1000 * (i + 1)) to partition i, for i = 0..8
        for (int i = 0; i < 9; i++) {
            if (keyValue < 1000 * (i + 1)) {
                System.out.println("key:" + keyValue + ", part:" + i);
                return i;
            }
        }

        // everything from 9000 upwards goes to the last partition
        return 9;
    }
}

// In the driver, add:
        // set the custom partitioner
        job.setPartitionerClass(JobPartitioner.class);

// and set the number of reducers to match the number of partitions:
        job.setNumReduceTasks(10);

Running the program produces 10 output files, each sorted internally; because the key ranges are disjoint and increasing, concatenating them in order gives the globally sorted result (see the merge sketch after the listing).

part-r-00000
part-r-00001
part-r-00002
part-r-00003
part-r-00004
part-r-00005
part-r-00006
part-r-00007
part-r-00008
part-r-00009
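
Concatenating the part files in name order yields one globally sorted file; hadoop fs -getmerge does this for you, and a minimal Java sketch of the same idea follows (the class name and argument handling are illustrative assumptions):

import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Hypothetical helper: concatenate the sorted part files into one globally sorted file.
public class MergeParts {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path outputDir = new Path(args[0]); // e.g. data/job/output
        Path merged = new Path(args[1]);    // e.g. data/job/sorted.txt

        // globStatus returns matches sorted by name, so part-r-00000 ... part-r-00009
        // come back in partition order, which is also global key order here
        FileStatus[] parts = fs.globStatus(new Path(outputDir, "part-r-*"));

        try (FSDataOutputStream out = fs.create(merged)) {
            for (FileStatus part : parts) {
                try (InputStream in = fs.open(part.getPath())) {
                    IOUtils.copyBytes(in, out, 4096, false);
                }
            }
        }
    }
}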

Note: the partitions that actually receive data must fit within the number of reducers (every partition index returned must be smaller than the reducer count), otherwise the job fails with an "Illegal partition" error. Another drawback: if you know little about the data distribution, a hand-written partitioner like this can easily lead to data skew and even OOM problems.

Using the framework's built-in TotalOrderPartitioner

The hand-rolled second approach can indeed avoid most skew, but before you understand the data distribution you can only guess at split points, inspect the results, and adjust, which is really just sampling done by hand. Hadoop already implements this sample-then-partition strategy, and it takes care of the skew and OOM concerns for us: the TotalOrderPartitioner class. It ships with input samplers that take a partial sample of the keys, derive the best split points from that sample, and thereby spread the keys evenly across the partitions.

TotalOrderPartitioner can be fed by three samplers (constructor sketches follow the list):

  • SplitSampler: a split sampler that takes records from the beginning of each split; not suitable for data that is already sorted
  • RandomSampler: a random sampler that samples the whole data set at a configured sampling frequency
  • IntervalSampler: an interval sampler that takes records at fixed intervals within each split; works very well on data that is already sorted
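
For orientation, the three samplers can be constructed roughly as follows; the parameter values are illustrative assumptions, not tuned for any particular data set:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;

// Illustrative constructor calls; the generic types must match the map input key/value types.
public class SamplerExamples {
    public static void main(String[] args) {
        // SplitSampler: read the first records of at most 10 splits, about 1000 samples in total;
        // cheap, but biased if the input is already sorted
        InputSampler.Sampler<Text, Text> splitSampler =
                new InputSampler.SplitSampler<>(1000, 10);

        // RandomSampler: keep each record with probability 0.1,
        // collecting at most 1000 samples from at most 10 splits
        InputSampler.Sampler<Text, Text> randomSampler =
                new InputSampler.RandomSampler<>(0.1, 1000, 10);

        // IntervalSampler: take roughly one record in ten (frequency 0.1)
        // from at most 10 splits; works well on sorted input
        InputSampler.Sampler<Text, Text> intervalSampler =
                new InputSampler.IntervalSampler<>(0.1, 10);
    }
}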

Every sampler implements the K[] getSample(InputFormat<K,V> inf, Job job) method, which returns an array of sampled keys; InputFormat here is the same input class that feeds the map phase. From that sample, one split point fewer than the number of partitions is selected and written to a partition file, and each record is then sent to the partition whose split-point range contains its key.

Code:

// the mapper and driver use slightly different types this time
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value,
                       Context context) throws IOException, InterruptedException {
        System.out.println("key:" + key.toString() + ", value:" + value.toString());
        context.write(key, new IntWritable(Integer.parseInt(key.toString())));
    }
}
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(value, NullWritable.get());
    }
}
// Comparator
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom comparator that orders the textual keys numerically
 * @author hulichao
 * @date 20-9-20
 **/
public class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        int num1 = Integer.parseInt(w1.toString());
        int num2 = Integer.parseInt(w2.toString());
        // Integer.compare avoids the overflow that plain subtraction can suffer
        return Integer.compare(num1, num2);
    }
}
// Driver implementation
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() returns the Configuration that ToolRunner has already set up
        Configuration conf = getConf();
        // tell TotalOrderPartitioner to use the job's sort comparator
        // (rather than the natural byte order of Text) when locating split points
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        Job job = Job.getInstance(conf, "Total Driver");
        job.setJarByClass(TotalDriver.class);

        // input path (on HDFS), passed in as the first argument
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output path for the job's results, passed in as the second argument
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // sort comparator: orders the textual keys numerically
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(10); // number of reducers = number of partitions

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // path of the partition file that will hold the sampled split points
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
        // sample the keys with RandomSampler: frequency 0.1, at most 3 samples, from at most 100 splits
        // (for real data the sample count should comfortably exceed the number of reducers)
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 3, 100);
        // derive the split points from the sample and write them to the partition file
        InputSampler.writePartitionFile(job, sampler);

        job.setMapperClass(TotalMapper.class);
        job.setReducerClass(TotalReducer.class);
        // use the framework's TotalOrderPartitioner
        job.setPartitionerClass(TotalOrderPartitioner.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args)throws Exception{
//        int exitCode = ToolRunner.run(new TotalDriver(), new String[] {"data/job/input", "data/job/output", "data/job/partition","data/job/partitio2"});
        int exitCode = ToolRunner.run(new TotalDriver(), args);
        System.exit(exitCode);
    }
}

The result is similar to the second approach. Note that this only works properly on a cluster; a local run may fail with an error such as:

2020-09-20 16:36:10,664 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
	at com.hoult.mr.job.totalsort.TotalDriver.run(TotalDriver.java:32)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
	at com.hoult.mr.job.totalsort.TotalDriver.main(TotalDriver.java:60)