How to Implement a Global Sort Elegantly in MapReduce
Thinking
When global sorting comes up, is your first idea to collect the data on the map side, shuffle it to a single reduce task, and sort it there? Clearly that is no different from sorting on one machine. Recall that the MapReduce framework sorts by key by default; you can also move the value into the key position to sort by value, then swap the pair back in the reducer. Note too that sorting happens within each partition, that is, per reduce task, so a single reducer cannot exploit the cluster's parallelism at all. How, then, can we implement a global sort more elegantly?
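The value-to-key trick mentioned above can be sketched in plain Java, without Hadoop (a hypothetical simulation; the `ValueToKeySwap` class and its `int[]{key, value}` pair representation are illustrative only):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch of the value-to-key trick: to sort records by value, the "map" step
// emits (value, key); the framework's shuffle then sorts by that new key,
// and the "reduce" step swaps the pair back to (key, value).
public class ValueToKeySwap {
    public static List<int[]> sortByValue(List<int[]> records) {
        // "map" phase: swap the value into the key position
        List<int[]> swapped = new ArrayList<>();
        for (int[] kv : records) swapped.add(new int[]{kv[1], kv[0]});
        // the shuffle sorts by key; simulate that here
        swapped.sort(Comparator.comparingInt(kv -> kv[0]));
        // "reduce" phase: swap back to (key, value)
        List<int[]> result = new ArrayList<>();
        for (int[] kv : swapped) result.add(new int[]{kv[1], kv[0]});
        return result;
    }

    public static void main(String[] args) {
        List<int[]> out = sortByValue(Arrays.asList(
                new int[]{1, 30}, new int[]{2, 10}, new int[]{3, 20}));
        for (int[] kv : out) System.out.println(kv[0] + " " + kv[1]);
    }
}
```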
Summary
Sorting in Hadoop comes in several forms: partial sort, total (global) sort, grouping (auxiliary) sort, secondary sort, and so on. This article focuses on how to implement a global sort by key. There are three approaches:
- Use a single reduce task
- Use a custom partitioner to route data, in range order, into multiple partitions
- Use the framework's built-in TotalOrderPartitioner
Implementation
First, prepare some input data (https://github.com/hulichao/bigdata-code/tree/master/data/job), for example:
/data/job/file.txt
2
32
654
32
15
756
65223
Global sort with a single reduce task
Implementing a global sort with one reduce task requires nothing special. The mapper, reducer, and driver look like this:
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class JobMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
        context.write(intWritable, intWritable);
    }
}
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class JobReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private int index = 0; // global rank counter
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(new IntWritable(++index), value);
    }
}
package com.hoult.mr.job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("input-path output-path");
            System.exit(1);
        }
        Job job = Job.getInstance(getConf());
        job.setJarByClass(JobDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(JobMapper.class);
        job.setReducerClass(JobReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class); // the reducer emits (rank, value) pairs
        // use a single reduce task so all keys are sorted together
        job.setNumReduceTasks(1);
        job.setJobName("JobDriver");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // int exitCode = ToolRunner.run(new JobDriver(), args);
        int exitCode = ToolRunner.run(new JobDriver(), new String[] {"data/job/", "data/job/output"});
        System.exit(exitCode);
    }
}
With the rank index added, the job produces a single output file with the following content:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
PS: The jobs above are launched with Hadoop's built-in ToolRunner utility. Code that repeats in later sections is not listed again; only the differing parts are shown.
Global sort with a custom partitioner that routes data into ordered ranges
How can a custom partitioner guarantee global order? We know keys are partitioned: by default the HashPartitioner decides which reduce task each key range goes to. We can exploit this by writing a range partitioner. For example, if the data is spread over 1 to 100 million, let reduce1 handle 1 to 10 million, reduce2 handle 10,000,001 to 20 million, and so on. Concatenating the resulting output files in order then yields all the data globally sorted, since each partition is sorted internally. Because our data set here is small, we use 11 partitions: 1-1000, 1001-2000, and so on. Two points differ from the first approach:
//partitioner implementation
package com.hoult.mr.job;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author hulichao
* @date 20-9-20
**/
public class JobPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int keyValue = key.get();
        for (int i = 0; i < 10; i++) {
            if (keyValue >= 1000 * i && keyValue < 1000 * (i + 1)) {
                System.out.println("key:" + keyValue + ", part:" + i);
                return i;
            }
        }
        return 10; // keys of 10000 and above all go to the last partition
    }
}
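As a side note, the range lookup in the loop above collapses to a single integer division. A hypothetical plain-Java equivalent (the `RangeBucket` class is illustrative only, assuming non-negative keys and the same 1000-wide buckets):

```java
// Equivalent bucket computation for the partitioner above: keys are grouped
// into 1000-wide ranges, and everything from 10000 up lands in the last partition.
public class RangeBucket {
    public static int partitionFor(int keyValue) {
        // same result as looping over the 11 ranges, assuming keyValue >= 0
        return Math.min(keyValue / 1000, 10);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(15));    // partition 0
        System.out.println(partitionFor(1500));  // partition 1
        System.out.println(partitionFor(65223)); // partition 10
    }
}
```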
//additions needed in the driver:
//set the custom partitioner
job.setPartitionerClass(JobPartitioner.class);
//the partitioner defines 11 partitions (indices 0-10), so set 11 reduce tasks
job.setNumReduceTasks(11);
Running the program produces 11 output files (part-r-00000 through part-r-00010), each internally sorted; concatenated in order they form the globally sorted result.
Note: every partition that actually receives data must have an index smaller than the number of reduce tasks, otherwise the job fails with an "Illegal partition" error. Another drawback: if you know little about the data distribution, a hand-written range partitioner like this can cause data skew and OOM problems.
Global sort with the framework's built-in TotalOrderPartitioner
The second, custom approach can indeed avoid most skew. In practice, though, before you understand the data distribution, the only way to estimate it is to run the job, inspect the actual values, and then write a partitioner by hand. That is exactly sampling. Hadoop has already implemented this sample-then-partition scheme for us, and it solves the data-skew and OOM problems: the TotalOrderPartitioner class. It comes with input samplers that take a partial sample of the keys, then choose the best split points from that sample so the keys end up evenly distributed across the partitions.
TotalOrderPartitioner works with three samplers:
- SplitSampler: a per-split sampler that takes records from the input splits; it is not suitable for data that is already sorted
- RandomSampler: samples the whole data set at a configured sampling frequency
- IntervalSampler: samples records at fixed intervals within each split; it works very well on already-sorted data
A sampler implements the K[] getSample(InputFormat<K,V> inf, Job job) method, which returns an array of sampled keys (InputFormat is the input helper that runs before the map phase). From the sorted sample, numPartitions - 1 split points are chosen, and each record is then sent to the partition whose key range contains it.
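To make the split-point mechanism concrete, here is a plain-Java sketch of the lookup (a simplified illustration, not Hadoop's actual implementation, which can also use a trie for byte-comparable keys; the `SplitPointLookup` class is hypothetical):

```java
import java.util.Arrays;

// Sketch of split-point routing as used by a total-order partitioner:
// given numPartitions - 1 sorted split points derived from the sample,
// a key's partition is the range it falls into.
public class SplitPointLookup {
    public static int partitionFor(int key, int[] splitPoints) {
        // Arrays.binarySearch returns the index if found, otherwise
        // -(insertionPoint) - 1; a key equal to a split point goes to
        // the partition above it, so both cases map to a partition index.
        int pos = Arrays.binarySearch(splitPoints, key);
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        int[] splits = {1000, 2000}; // defines 3 partitions
        System.out.println(partitionFor(500, splits));  // partition 0
        System.out.println(partitionFor(1500, splits)); // partition 1
        System.out.println(partitionFor(2500, splits)); // partition 2
    }
}
```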
The code:
//the mapper and driver types differ slightly from before
package com.hoult.mr.job.totalsort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author hulichao
* @date 20-9-20
**/
public class TotalMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("key:" + key.toString() + ", value:" + value.toString());
        context.write(key, new IntWritable(Integer.parseInt(key.toString())));
    }
}
package com.hoult.mr.job.totalsort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author hulichao
* @date 20-9-20
**/
public class TotalReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(value, NullWritable.get());
    }
}
//comparator
package com.hoult.mr.job.totalsort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Custom comparator that orders keys numerically
 * @author hulichao
 * @date 20-9-20
 **/
public class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        int num1 = Integer.valueOf(w1.toString());
        int num2 = Integer.valueOf(w2.toString());
        return num1 - num2;
    }
}
package com.hoult.mr.job.totalsort;
//driver implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author hulichao
* @date 20-9-20
**/
public class TotalDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // do not assume natural (byte) key order; use the configured comparator for split points
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        Job job = Job.getInstance(conf, "Total Driver");
        job.setJarByClass(TotalDriver.class);
        // input path, read from HDFS; passed in when the job is launched
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output path for the MapReduce result
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // comparator used to order the data; here it compares two keys numerically
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(10); // number of reduce tasks
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // path where the partition (split-point) file is stored
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
        // sample the keys: 0.1 sampling frequency, at most 3 samples, from at most 100 splits
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 3, 100);
        // write the sampled split points to the partition file
        InputSampler.writePartitionFile(job, sampler);
        job.setMapperClass(TotalMapper.class);
        job.setReducerClass(TotalReducer.class);
        // use the framework's total-order partitioner
        job.setPartitionerClass(TotalOrderPartitioner.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // int exitCode = ToolRunner.run(new TotalDriver(), new String[] {"data/job/input", "data/job/output", "data/job/partition","data/job/partitio2"});
        int exitCode = ToolRunner.run(new TotalDriver(), args);
        System.exit(exitCode);
    }
}
The result is similar to the second approach. Note that this only works when run on a cluster; a local run may fail with errors like:
2020-09-20 16:36:10,664 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
at com.hoult.mr.job.totalsort.TotalDriver.run(TotalDriver.java:32)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
at com.hoult.mr.job.totalsort.TotalDriver.main(TotalDriver.java:60)