1. 程式人生 > >hadoop mr 輔助排序

hadoop mr 輔助排序

輔助排序
    需求:訂單資料
    求出每個訂單中最貴的商品?

    訂單id正序,成交金額倒序。
    結果檔案三個,每個結果檔案只要一條資料


    public class OrderBean implements WritableComparable<OrderBean>{
        private int order_id;
        private double price;

        public OrderBean(){}

        public OrderBean(int order_id,double price){
            this.order_id = order_id;
            this.price = price;
        }

        getter;
        setter;

        //序列化
        @Override
        public void write(DataOutput out) throws IOException{
            out.write(order_id);
            out.write(price)
        }
        //反序列化
        @Override
        public void readFields(DataInput in) throws IOException{
            order_id = in.readInt();
            price = in.readDouble()
        }
        @Override
        public String toString(){
            return order_id +"\t"+ price;
        }
        //排序id,在比較價格
        @Override
        public int compareTo(OrderBean o){
            int rs;

            if (order_id>0.order_id) {
                //id大的往下排
                rs = 1;
            } else if(order_id<0.order_id){
                rs = -1;
            } else {
                //id相等,價格高的往上排
                rs = price>o.getPrice()?-1:1;
            }
        }
    }

    public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
        public void map(LongWritable key, Text value, Context context)
                throws IOException,InterruptException{

            String line = value.toString();

            String[] fields = line.split("\t");

            Integer order_id = Integer.parseInt(fields[0]);
            Double price = Double.parseDouble(fields[2]);

            context.write(new OrderBean(order_id, price),NullWritable.get());
        } 
    }
    public class OrderPartitioner extends Partitioner<OrderBean, NullWritable>{
        @Override
        public int getPartition(OrderBean key, NullWritable value, int numPartitions){
            return (key.getOrder_id & Integer.MAX_VALUE) % numPartitions;
        }
    }
    //輔助排序
    public class OrderGroupingComparator extends WritableComparator{
        //構造必須加
        protected OrderGroupingComparator(){
            super(OrderBean.class,true);
        }

        public int compare(WritableComparable a,WritableComparable b){
            OrderBean aBean = (OrderBean)a;
            OrderBean bBean = (OrderBean)b;
            int rs;

            //id不同不是同一個物件
            if(aBean.getOrder_id()>bBean.getOrder_id()){
                rs = 1;
            }else if(aBean.getOrder_id()<bBean.getOrder_id()){
                rs = -1;
            }else {
                rs = 0;
            }
            return rs;
        }


    }
    public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
        public void reduce<OrdeBean key, Interable<NullWritable> values, Context context>
                throws Exception{

            context.write(key, NullWritable.get());
        }
    }
    public class OrderDriver{
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            Job job = Job.getInstance();

            job.setJarByClass(OrderDriver.class);

            job.setMapperClass(OrderMapper.class);
            job.setReducerClass(OrderReducer.class);

            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);

            //設定reducer段段分組
            job.setGroupingComparatorClass(OrderGroupingComparator.class);

            //分割槽,然後reduceTask個數
            job.setPartitionerClass(OrderPartitioner.class);
            job.setNumReduceTask(3);

            FileInputFormat.setInputPaths(job, new Path("/c.txt"));
            FileOutputFormat.setOutputPath(job, new Path("/out"));

            Syso(job.waitForCompletion(true)?0:1);
        }
    }