Implementing Total Sort with a Custom Partitioner
阿新 • Published: 2019-04-07
Data Preparation
- Create a script named createdatas.sh:
#!/bin/bash
# Emit 1000 random integers, one per line ($RANDOM yields values in 0..32767).
for i in {1..1000}; do
    echo $RANDOM
done
- Generate the data:
$ sh createdatas.sh > data1
$ sh createdatas.sh > data2
$ sh createdatas.sh > data3
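To see the range of generated values before choosing the partition thresholds (a quick sketch, run in the directory containing the three files):
$ sort -n data1 data2 data3 | head -1   # smallest value
$ sort -n data1 data2 data3 | tail -1   # largest value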
Custom Partitioner
By observation the values fall within [0, 32767] (the range of Bash's $RANDOM), so keys greater than 20000 go to partition 0, keys greater than 10000 go to partition 1, and everything else goes to partition 2.
package com.hadoop.totasort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioner. The return value must lie in [0, numReduceTasks),
 * so the driver has to configure at least 3 reduce tasks.
 */
public class MyPartitioner extends Partitioner<IntWritable, IntWritable> {

    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int keyInt = key.get();
        if (keyInt > 20000) {
            return 0;
        } else if (keyInt > 10000) {
            return 1;
        } else {
            return 2;
        }
    }
}
Custom Sort Comparator
The default sort in the shuffle phase is ascending; here we want descending order.
package com.hadoop.totasort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom sort comparator: reverses the comparison so keys are ordered descending.
 */
public class MySort extends WritableComparator {

    public MySort() {
        // Important: register the key class and allow instance creation so that
        // compare() receives deserialized IntWritable objects.
        super(IntWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntWritable v1 = (IntWritable) a;
        IntWritable v2 = (IntWritable) b;
        // Comparing v2 against v1 yields descending order.
        return v2.compareTo(v1);
    }
}
Writing the Mapper Class
package com.hadoop.totasort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TotalSortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        int v = Integer.parseInt(value.toString());
        // Emit v as the key so it drives partitioning and sorting,
        // and as the value so the reducer can write it out.
        context.write(new IntWritable(v), new IntWritable(v));
    }
}
Writing the Reducer Class
package com.hadoop.totasort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TotalSortReducer extends Reducer<IntWritable, IntWritable, NullWritable, IntWritable> {

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}
Writing the Driver Class
package com.hadoop.totasort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TotalSortDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TotalSortDriver.class);
        job.setMapperClass(TotalSortMapper.class);
        job.setReducerClass(TotalSortReducer.class);

        // Partitioner, sort comparator, and number of reduce tasks (one per partition)
        job.setPartitionerClass(MyPartitioner.class);
        job.setSortComparatorClass(MySort.class);
        job.setNumReduceTasks(3);

        // Map and final output types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("input/totalsort"));
        FileOutputFormat.setOutputPath(job, new Path("output/totalsort"));
        job.waitForCompletion(true);
    }
}
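A minimal way to run the job (a sketch: the jar name totalsort.jar is an assumption, and the input files come from the data-preparation step):
$ hdfs dfs -mkdir -p input/totalsort
$ hdfs dfs -put data1 data2 data3 input/totalsort
$ hadoop jar totalsort.jar com.hadoop.totasort.TotalSortDriver
With the default Configuration the job can also be run straight from an IDE in local mode, in which case input/totalsort is simply a local directory.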
Results
Three files are produced: part-r-00000, part-r-00001, and part-r-00002.
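Because partition 0 receives the largest keys and each partition is sorted in descending order, concatenating the three files in order yields the whole data set in descending order. A quick check (a sketch; it assumes the job ran in local mode, or that output/totalsort was fetched with hdfs dfs -get):
$ cat output/totalsort/part-r-00000 output/totalsort/part-r-00001 output/totalsort/part-r-00002 > all_sorted
$ sort -c -n -r all_sorted && echo "globally sorted in descending order"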