hadoop mr 全域性排序
全域性排序
需求:根據每位使用者每月使用的流量,按總流量由多到少(倒序)排序。
介面->WritableComparable
排序是 Hadoop MapReduce 的預設行為:框架會對 key 進行排序,預設按照字典順序。
排序的分類:
1)部分排序
2)全排序
3)輔助排序
4)二次排序
封裝類,直接完成排序
public class FlowBean implements WritalbeComparable<FlowBean>{
private long upFlow;
private long dfFlow;
private long flowsum;
pulibc FlowBean(){
}
public FlowBean(long upFlow, long dfFlow){
this.upFlow = upFlow;
this.dfFlow = dfFlow;
this.flowsum = upFlow+dfFlow;
}
setter
getter
@Override
public void readFields(DataInput in) throws Exception{
//反序列化
upFlow = in.readLong();
dfFlow = in.readLong();
flowsum = in.readLong();
}
@Override
public void write(DataOutput out) throws Exception{
//序列化
out.writeLong(upFlow);
out.writeLong(dfFlow);
out.writeLong(flowsum);
}
@Override
public int compareTo(FlowBean o){
return this.flowsum >o.getFolwSum()?-1:1;
}
@Override
public String toString(){
//倒序
return upFlow+"\t"+dfFlow+"\t"+flowsum;
}
}
public class FlowSortReducer extends Reducer<FlowBean, Text, Text,FlowBean>{
@Override
public void reduce<FlowBean key, Interater<Text> value, Context context>
throws IOException,InterruptException{
context.write(value.iterator().next(),key)
}
}
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptException{
//1、獲取一行資料
String line = value.toString();
//2、切割
String[] fields = line.split("\t");
//3、取出關鍵欄位
long upFlow = Long.parseLong(fields[1]);
long dfFlow = Long.parseLong(fields[1]);
//4、寫出到reduce 階段
context.write(new FlowBean(upFlow,dfFlow), new Text(field[0]))
}
}
public class FlowSortDriver{
public static void main(String[] args){
Configuration conf = new Configuration();
Job job = Job.getInstance();
job.setJarByClass(FlowSortDriver.class);
job.setMapperClass(FlowSortMapper.class);
job.setReducerClass(FlowSortReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job,new Path("/wc/in"));
FileOutputFormat.setOutputPaths(job,new Paht("wc/out"));
System.out.pritnln(job.waitForCompletion(true)?0:1)
}
}
分割槽加排序
public class PhonenumPartitioner extends Partitioner<FlowBean, Text>{
@Override
public int getPartition(FlowBean key, Text value, int numPartitions){
//1、獲取手機號前三位
String phoneNum = value.toString().substring(0,3);
//2、分割槽
int partitioner = 4;
if("135".equals(phoneNum)){
return 0;
}else if("137".equals(phoneNum)){
return 1;
}else ("138".equals(phoneNum)){
return 2;
}
return partitioner;
}
}
// In the Driver class, add the following to the job configuration:
// Use the custom partitioner instead of the default one.
job.setPartitionerClass(PhonenumPartitioner.class);
// The number of reduce tasks must cover every partition index the partitioner
// can return (0..3 here), i.e. be at least the number of custom partitions.
job.setNumReduceTasks(4);