Sorting in Hadoop MapReduce (partial sort, total sort, secondary sort)
1. Partial sort
By default, MapReduce sorts the keys within each partition, so every reducer's output file is internally sorted, though there is no ordering across partitions.
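For example, a word-count job run with two reducers produces two partition files; each file is sorted by key internally, but there is no order across the files (the contents below are illustrative):

part-r-00000: apple 3, pear 1, zebra 7
part-r-00001: banana 2, cherry 5, dog 4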
2. Total sort
The output is ordered across all partitions, i.e. globally sorted. Three ways to achieve this:
1) Use a single reducer (job.setNumReduceTasks(1)); trivially correct, but it does not scale
2) Use a custom partition function
Keys in different ranges are sent to different partitions, each partition is sorted automatically, and reading the partitions in order yields a total sort, as in the class below:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PassPartition extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        String key = text.toString();
        //Boundaries must increase so every partition is reachable ("aaaa" sorts
        //before "xxx"): keys below "aaaa" go to partition 0, keys in
        //["aaaa", "xxx") to partition 1, and the rest to partition 2
        if (key.compareTo("aaaa") < 0) {
            return 0;
        }
        if (key.compareTo("xxx") < 0) {
            return 1;
        }
        return 2;
    }
}
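The partitioner takes effect only once it is registered on the job, with one reducer per key range; a minimal driver fragment:

job.setPartitionerClass(PassPartition.class);
job.setNumReduceTasks(3);  //one reducer per range: partition 0, 1, 2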
3) Sampling //plain-text data is not well supported; with plain-text input, KeyValueTextInputFormat is the recommended input format
//1. Set the partitioner class to TotalOrderPartitioner (this class ships with MapReduce)
//2. Initialize the sampler => InputSampler.RandomSampler<Text,Text> sampler = new InputSampler.RandomSampler<Text,Text>(0.01,10);
//   Alternatives: SplitSampler, IntervalSampler
//3. Set the partition file location => TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path("D:/"));
//4. Write the sample data => InputSampler.writePartitionFile(job,sampler);
//5. Note: steps 1-4 must come after the job configuration and before the job is submitted (the complete driver below shows them in context)
1. Random sampling: relatively expensive in time and resources
2. Split sampling
3. Interval sampling: best performance
(the three samplers' constructors are compared in the sketch below)
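The three samplers differ only in how they pick keys. A quick comparison of their constructors (all nested in org.apache.hadoop.mapreduce.lib.partition.InputSampler; each also offers an overload that caps the number of splits sampled; the parameter values here are illustrative):

//Random: keep each key with probability freq until numSamples keys are collected
InputSampler.RandomSampler<Text, Text> random = new InputSampler.RandomSampler<Text, Text>(0.01, 10000);
//Split: take numSamples keys from the first records of the sampled splits
InputSampler.SplitSampler<Text, Text> split = new InputSampler.SplitSampler<Text, Text>(10000);
//Interval: scan splits sequentially and keep keys at frequency freq
InputSampler.IntervalSampler<Text, Text> interval = new InputSampler.IntervalSampler<Text, Text>(0.01);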
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
public class PassApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        //Initialize the job from the configuration
        Job job = Job.getInstance(conf);
        //Set the job name
        job.setJobName("word count");
        //Set the jar entry class
        job.setJarByClass(PassApp.class);
        //Set the mapper class
        job.setMapperClass(PassMapper.class);
        //Set the reducer class
        job.setReducerClass(PassReducer.class);
        //Use TotalOrderPartitioner for the total sort
        job.setPartitionerClass(TotalOrderPartitioner.class);
        //Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //Set the reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //FileInputFormat.setMaxInputSplitSize(job,10);
        //FileInputFormat.setMinInputSplitSize(job,10);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        //Set the input path
        FileInputFormat.addInputPath(job, new Path("D:/wc/out"));
        //Set the output path (deleted below if it already exists)
        FileOutputFormat.setOutputPath(job, new Path("D:/wc/out4"));
        if (fs.exists(new Path("D:/wc/out4"))) {
            fs.delete(new Path("D:/wc/out4"), true);
        }
        //Use three reducers
        job.setNumReduceTasks(3);
        /**
         * Random sampling is relatively expensive in time and resources.
         * @param freq       probability of each key being selected; should exceed
         *                   the desired sample count (2) / total key count (100)
         * @param numSamples total number of keys to collect across all splits
         */
        //Choose the sampler type
        InputSampler.RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.001, 8800);
        //InputSampler.SplitSampler<Text,Text> sampler = new InputSampler.SplitSampler<Text,Text>(10,3);
        //InputSampler.IntervalSampler<Text,Text> sampler = new InputSampler.IntervalSampler<Text,Text>(0.001);
        //Set the partition file location (where the sampled boundaries are stored)
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path("D:/wc/par/"));
        //Sample the input and write the partition boundaries
        InputSampler.writePartitionFile(job, sampler);
        //Run the job
        boolean b = job.waitForCompletion(true);
    }
}
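PassMapper and PassReducer are referenced above but not shown. A minimal sketch, assuming D:/wc/out holds word-count output in "word<TAB>count" form, which KeyValueTextInputFormat splits into key and value (each class goes in its own file):

//PassMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class PassMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        //KeyValueTextInputFormat has already split the line at the first tab
        context.write(key, new IntWritable(Integer.parseInt(value.toString())));
    }
}

//PassReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class PassReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        //Pass counts straight through; reading the part files in order gives a total sort
        for (IntWritable v : values) {
            context.write(key, v);
        }
    }
}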
3. Secondary sort
On top of MapReduce's sort by key, the values for each key are sorted as well.
Example: finding the highest temperature per year
1901: 10 20 30 50 40
1901: 30 20 10 11 -8
After the years are sorted, the temperatures within each year are sorted too.
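For instance, once the temperatures within 1901 are sorted in descending order, the reducer receives 50 40 30 20 10, so the first value it sees is already the yearly maximum.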
Implementation:
1. Define a composite key that combines year_temperature into a single key: a custom CompKey that implements WritableComparable, providing custom serialization and the comparison logic (the sort order)
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CompKey implements WritableComparable<CompKey> {
    private String year;
    private int temp;

    //Define the sort order
    public int compareTo(CompKey o) {
        String oyear = o.getYear();  //the other key's year
        String tyear = this.getYear();
        int otemp = o.getTemp();
        int ttemp = this.getTemp();
        //If the years are equal, compare temperatures (descending: higher first)
        if (tyear.equals(oyear)) {
            return otemp - ttemp;
        }
        //Otherwise order by year (ascending)
        return tyear.compareTo(oyear);
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(year);
        out.writeInt(temp);
    }

    public void readFields(DataInput in) throws IOException {
        this.setYear(in.readUTF());
        this.setTemp(in.readInt());
    }

    @Override
    public String toString() {
        return "CompKey{" +
                "year='" + year + '\'' +
                ", temp=" + temp +
                '}';
    }

    public CompKey(String year, int temp) {
        this.year = year;
        this.temp = temp;
    }

    public CompKey() {
    }

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }
}
2. Define a grouping comparator so that all keys with the same year are grouped as one key on the reduce side; that is, (1920, 30) and (1920, 40), though different composite keys, are recognized as the same key. Extend WritableComparator and override compare, as in MyGroupComparator below.
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Reduce-side grouping comparator: treats e.g. 1902 20 and 1902 30 as the same key
 */
public class MyGroupComparator extends WritableComparator {
    //Required: pass true so WritableComparator creates instances of the key class
    protected MyGroupComparator() {
        super(CompKey.class, true);
    }

    //Comparison logic: two keys are equal as long as their years are equal
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompKey ck1 = (CompKey) a;
        CompKey ck2 = (CompKey) b;
        return ck1.getYear().compareTo(ck2.getYear());
    }
}
Register the grouping comparator in the main app:
job.setGroupingComparatorClass(MyGroupComparator.class);
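The mapper and reducer for the secondary sort are not shown above. A minimal sketch, assuming input lines of the form "year temperature" and the hypothetical class names MaxTempMapper and MaxTempReducer:

//MaxTempMapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTempMapper extends Mapper<LongWritable, Text, CompKey, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //Each line "year temperature" becomes one composite key
        String[] parts = value.toString().trim().split("\\s+");
        context.write(new CompKey(parts[0], Integer.parseInt(parts[1])), NullWritable.get());
    }
}

//MaxTempReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTempReducer extends Reducer<CompKey, NullWritable, Text, IntWritable> {
    @Override
    protected void reduce(CompKey key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        //The grouping comparator delivers a whole year in one call, with temperatures
        //descending, so the key seen before iterating already holds the maximum
        context.write(new Text(key.getYear()), new IntWritable(key.getTemp()));
    }
}

The driver must also register CompKey and NullWritable as the map output key/value types alongside the grouping comparator.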