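Hadoop MapReduce 輔助排序（GroupingComparator）
輔助排序
需求：給定訂單資料（每行以 Tab 分隔，第 1 欄為訂單 id，第 3 欄為成交金額），
求出每個訂單中最貴的商品。
排序規則：訂單 id 升序（正序），同一訂單內成交金額降序（倒序）。
輸出三個結果檔案（3 個 reduce task）；每個結果檔案中，每個訂單只保留一筆資料，即該訂單中最貴的商品。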
public class OrderBean implements WritableComparable<OrderBean>{
private int order_id;
private double price;
public OrderBean(){}
public OrderBean(int order_id,double price){
this.order_id = order_id;
this.price = price;
}
getter;
setter;
//序列化
@Override
public void write(DataOutput out) throws IOException{
out.write(order_id);
out.write(price)
}
//反序列化
@Override
public void readFields(DataInput in) throws IOException{
order_id = in.readInt();
price = in.readDouble()
}
@Override
public String toString(){
return order_id +"\t"+ price;
}
//排序id,在比較價格
@Override
public int compareTo(OrderBean o){
int rs;
if (order_id>0.order_id) {
//id大的往下排
rs = 1;
} else if(order_id<0.order_id){
rs = -1;
} else {
//id相等,價格高的往上排
rs = price>o.getPrice()?-1:1;
}
}
}
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
public void map(LongWritable key, Text value, Context context)
throws IOException,InterruptException{
String line = value.toString();
String[] fields = line.split("\t");
Integer order_id = Integer.parseInt(fields[0]);
Double price = Double.parseDouble(fields[2]);
context.write(new OrderBean(order_id, price),NullWritable.get());
}
}
/**
 * Sends every record with the same order id to the same reduce task. The partition depends only
 * on the id, not the price, so all items of one order meet in a single reducer.
 */
public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    /**
     * @param key           map-output key; only its order id is used
     * @param value         unused placeholder value
     * @param numPartitions number of reduce tasks
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
        // Clear the sign bit so a negative id still maps to a non-negative index.
        return (key.getOrder_id() & Integer.MAX_VALUE) % numPartitions;
    }
}
//輔助排序
public class OrderGroupingComparator extends WritableComparator{
//構造必須加
protected OrderGroupingComparator(){
super(OrderBean.class,true);
}
public int compare(WritableComparable a,WritableComparable b){
OrderBean aBean = (OrderBean)a;
OrderBean bBean = (OrderBean)b;
int rs;
//id不同不是同一個物件
if(aBean.getOrder_id()>bBean.getOrder_id()){
rs = 1;
}else if(aBean.getOrder_id()<bBean.getOrder_id()){
rs = -1;
}else {
rs = 0;
}
return rs;
}
}
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
public void reduce<OrdeBean key, Interable<NullWritable> values, Context context>
throws Exception{
context.write(key, NullWritable.get());
}
}
/**
 * Configures and submits the "most expensive item per order" job.
 *
 * <p>Usage: {@code OrderDriver [inputPath [outputPath]]}. Paths default to {@code /c.txt} and
 * {@code /out}. The JVM exit code is 0 on job success and 1 on failure.
 */
public class OrderDriver {
    public static void main(String[] args) throws Exception {
        // Optional command-line overrides; the defaults are the original hard-coded paths.
        String inputPath = args.length > 0 ? args[0] : "/c.txt";
        String outputPath = args.length > 1 ? args[1] : "/out";

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Reduce-side grouping: keys with the same order id go to one reduce() call.
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        // Partition by order id, then set the number of reduce tasks (one output file each).
        job.setPartitionerClass(OrderPartitioner.class);
        job.setNumReduceTasks(3);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}