Hadoop: Hand-Written Native MapReduce Sorting
阿新 · Published 2018-11-10
Test data:
2030 59
1976 68
2030 19
1997 5
Each line pairs a year with a temperature; the data can be generated with Java code.
Code to generate 10,000 records:
public void makeData() throws IOException {
    FileWriter fw = new FileWriter("e:/mr/tmp/temp.txt");
    Random rand = new Random();                // reuse a single Random instance
    for (int i = 0; i < 10000; i++) {
        int year = 1970 + rand.nextInt(100);   // years 1970-2069
        int temp = -30 + rand.nextInt(100);    // temperatures -30 to 69
        fw.write(year + " " + temp + "\r\n");
    }
    fw.close();
}
MapReduce Total Order Sort
1. Use case
When a maximum or minimum has to be pulled out of a large dataset, the data must be sorted first; sorted output cuts retrieval time and makes the program more efficient.
2. Implementation approaches
1. Use a single reducer
2. Write a custom partition function (a sketch follows this list)
3. Use Hadoop's sampling mechanism (the code in section 3 below)
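For approach 2, a hand-rolled range partitioner is enough when the key distribution is known up front. A minimal sketch, assuming IntWritable year keys; the class name and the cut points 2003 and 2036 are hypothetical, chosen to split the 1970-2069 range roughly into thirds:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical range partitioner: fixed cut points route key ranges to
// reducers in order, so concatenating the reducer outputs gives a total sort.
public class YearRangePartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int year = key.get();
        if (year < 2003) return 0;
        if (year < 2036) return Math.min(1, numPartitions - 1);
        return Math.min(2, numPartitions - 1);
    }
}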
3. Code
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    Job job = Job.getInstance(conf);
    // Configure the job
    job.setJobName("MaxTempApp");                              // job name
    job.setJarByClass(MaxTempApp.class);                       // class used to locate the jar
    job.setInputFormatClass(SequenceFileInputFormat.class);    // input format
    // Output format class
    //job.setOutputFormatClass(SequenceFileOutputFormat.class);
    // Add the input path
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // Set the output path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Maximum split size
    //FileInputFormat.setMaxInputSplitSize(job, 1024);
    // Minimum split size
    //FileInputFormat.setMinInputSplitSize(job, 1);
    // Combiner class -- fine for max/min, but never for averages
    //job.setCombinerClass(MaxTempReducer.class);
    job.setMapperClass(MaxTempMapper.class);                   // mapper class
    job.setReducerClass(MaxTempReducer.class);                 // reducer class
    // The reducer count could also be set to 1 (approach 1)
    job.setNumReduceTasks(3);                                  // number of reducers
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    // Create the random sampler:
    //   freq: probability of selecting each key
    //   numSamples: total number of samples to draw
    //   maxSplitsSampled: maximum number of splits to sample (partitions)
    InputSampler.Sampler<IntWritable, IntWritable> sampler =
            new InputSampler.RandomSampler<IntWritable, IntWritable>(0.1, 6000, 3);
    // The partition file's values are empty; its keys are the sampled cut
    // points, e.g. this test run produced the boundaries 2002 and 2036.
    // setPartitionFile(conf, path): don't pass the original conf; use the
    // job's conf (Job.getInstance makes its own copy internally).
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path("d:/mr/par.lst"));
    // Use the total order partitioner
    job.setPartitionerClass(TotalOrderPartitioner.class);
    // Write the sampled cut points to the partition file
    InputSampler.writePartitionFile(job, sampler);
    job.waitForCompletion(true);
}
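The MaxTempMapper and MaxTempReducer wired into this job are not listed in the original post. A minimal sketch of what they could look like, assuming the input SequenceFile holds (IntWritable year, IntWritable temperature) records (package-private so both fit in one file):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Identity mapper: forwards (year, temp) pairs into the sorted shuffle.
class MaxTempMapper extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
    @Override
    protected void map(IntWritable key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

// Keeps only the maximum temperature per year; also safe as a combiner,
// because a max (unlike an average) can be computed in stages.
class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            max = Math.max(max, v.get());
        }
        context.write(key, new IntWritable(max));
    }
}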
MapReduce Secondary Sort
1. Use case
MapReduce sorts only by key. When the goal is to find the maximum or minimum value, the value itself has to take part in the ordering; sorting on the value as well is called a secondary sort.
2. Implementation
1. Define a custom key
Implement the org.apache.hadoop.io.WritableComparable interface
2. Define a custom partitioner
Extend the org.apache.hadoop.mapreduce.Partitioner class
3. Define a grouping comparator
Extend the org.apache.hadoop.io.WritableComparator class
4. Define a sort comparator for the custom key
Extend the org.apache.hadoop.io.WritableComparator class
3. Code
The custom key
public class ComboKey implements WritableComparable<ComboKey> {
    private int year;
    private int temp;

    public int getYear() {
        return year;
    }
    public void setYear(int year) {
        this.year = year;
    }
    public int getTemp() {
        return temp;
    }
    public void setTemp(int temp) {
        this.temp = temp;
    }

    /**
     * Comparison logic for the key: years ascending, then temperatures descending.
     */
    public int compareTo(ComboKey o) {
        System.out.println("ComboKey.compareTo " + o.toString());
        int y0 = o.getYear();
        int t0 = o.getTemp();
        // Same year: order by temperature, descending
        if (year == y0) {
            return -(temp - t0);
        } else {
            // Different years: ascending
            return year - y0;
        }
    }

    /**
     * Serialization
     */
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);   // year
        out.writeInt(temp);   // temperature
    }

    /**
     * Deserialization
     */
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        temp = in.readInt();
    }

    public String toString() {
        return year + ":" + temp;
    }
}
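Before wiring ComboKey into a job, its serialization and ordering can be checked locally. A quick sketch; this test harness is not part of the original post:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class ComboKeyTest {
    public static void main(String[] args) throws Exception {
        ComboKey hot = new ComboKey();
        hot.setYear(2030);
        hot.setTemp(59);

        // Round-trip the key through write()/readFields().
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        hot.write(new DataOutputStream(bos));
        ComboKey copy = new ComboKey();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy);                    // prints 2030:59

        // Same year: the higher temperature must sort first.
        ComboKey cold = new ComboKey();
        cold.setYear(2030);
        cold.setTemp(19);
        System.out.println(hot.compareTo(cold) < 0); // prints true
    }
}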
The custom partitioner
public class YearPartitioner extends Partitioner<ComboKey, NullWritable> {
    public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions) {
        // Years are positive, so the modulo is non-negative
        int year = key.getYear();
        return year % numPartitions;
    }
}
The grouping comparator
public class YearGroupComparator extends WritableComparator {
    protected YearGroupComparator() {
        // true: instantiate ComboKey objects for comparison
        super(ComboKey.class, true);
    }
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("YearGroupComparator " + a + "," + b);
        ComboKey k1 = (ComboKey) a;
        ComboKey k2 = (ComboKey) b;
        // Keys with the same year fall into the same reduce group
        return k1.getYear() - k2.getYear();
    }
}
The sort comparator for the custom key
public class ComboKeyComparator extends WritableComparator {
    protected ComboKeyComparator() {
        super(ComboKey.class, true);
    }
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("ComboKeyComparator " + a + "," + b);
        ComboKey k1 = (ComboKey) a;
        ComboKey k2 = (ComboKey) b;
        // The comparison logic lives in the custom key class
        return k1.compareTo(k2);
    }
}
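As an aside, instead of calling job.setSortComparatorClass in the driver, Hadoop lets a Writable register its comparator once in a static block. A sketch of that variant, not used in this post (it assumes ComboKey and ComboKeyComparator sit in the same package, since the comparator's constructor is protected):

// Placed inside ComboKey: registers ComboKeyComparator as the default
// comparator for ComboKey, picked up automatically during the sort phase.
static {
    WritableComparator.define(ComboKey.class, new ComboKeyComparator());
}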
The Mapper
public class MaxTempMapper extends Mapper<LongWritable, Text, ComboKey, NullWritable> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("MaxTempMapper.map");
        String line = value.toString();
        // Each input line is "year temperature"
        String[] arr = line.split(" ");
        ComboKey keyOut = new ComboKey();
        keyOut.setYear(Integer.parseInt(arr[0]));
        keyOut.setTemp(Integer.parseInt(arr[1]));
        context.write(keyOut, NullWritable.get());
    }
}
The Reducer
public class MaxTempReducer extends Reducer<ComboKey, NullWritable, IntWritable, IntWritable> {
    protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Within a year group, keys arrive temperature-descending, so the
        // first key of the group already carries the year's maximum.
        int year = key.getYear();
        int temp = key.getTemp();
        context.write(new IntWritable(year), new IntWritable(temp));
    }
}
The driver (App)
public class MaxTempApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        // Configure the job
        job.setJobName("SecondarySortApp");              // job name
        job.setJarByClass(MaxTempApp.class);             // class used to locate the jar
        job.setInputFormatClass(TextInputFormat.class);  // input format
        // Add the input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Set the output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTempMapper.class);         // mapper class
        job.setReducerClass(MaxTempReducer.class);       // reducer class
        // Map output types
        job.setMapOutputKeyClass(ComboKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Reduce output types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // Custom partitioner
        job.setPartitionerClass(YearPartitioner.class);
        // Grouping comparator
        job.setGroupingComparatorClass(YearGroupComparator.class);
        // Sort comparator
        job.setSortComparatorClass(ComboKeyComparator.class);
        job.setNumReduceTasks(3);                        // number of reducers
        job.waitForCompletion(true);
    }
}
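Because fs.defaultFS is set to file:///, the job runs locally from an IDE. A usage sketch with hypothetical paths (the output directory must not exist before the run). With three reducers and YearPartitioner, part-r-00000 collects the years divisible by 3, part-r-00001 the years with remainder 1, and so on; within each file the years ascend, one line per year with its maximum temperature:

// Hypothetical local launcher: args[0] = input (the generated temp.txt),
// args[1] = output directory.
public class MaxTempLocalRunner {
    public static void main(String[] args) throws Exception {
        MaxTempApp.main(new String[]{"e:/mr/tmp/temp.txt", "e:/mr/out"});
    }
}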