
MapReduce Secondary Sort

Requirement: order data

    For each order, find the most expensive item.

    Sort by order id in ascending order and by sale amount in descending order.
    Produce three result files, each containing only one record.

The idea: pack the order id and price into a composite key (OrderBean) whose compareTo sorts by id ascending and price descending; a custom partitioner sends all records of an order to the same reduce task; and a grouping comparator treats keys with the same order id as one reduce group, so the first key of each group is that order's most expensive item.

1. Mapper class

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // read one line of input
        String line = value.toString();
        // split the line on tabs
        String[] fields = line.split("\t");
        // extract the order id and the price
        Integer order_id = Integer.parseInt(fields[0]);
        Double price = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(order_id, price);
        // emit the composite key with a null value
        context.write(orderBean, NullWritable.get());
    }
}
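
The map() above assumes every input line is well formed. As an optional sketch (hypothetical class SafeOrderMapper, not part of the original post), a defensive variant could skip blank or malformed lines instead of letting Integer.parseInt or Double.parseDouble throw and fail the task:

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical defensive variant of OrderMapper: identical output for valid
// lines, but silently drops blank or malformed records.
public class SafeOrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return; // skip blank lines
        }
        String[] fields = line.split("\t");
        if (fields.length < 3) {
            return; // skip records with too few columns
        }
        try {
            int orderId = Integer.parseInt(fields[0]);
            double price = Double.parseDouble(fields[2]);
            context.write(new OrderBean(orderId, price), NullWritable.get());
        } catch (NumberFormatException e) {
            // skip records whose id or price is not numeric
        }
    }
}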

2. Reducer class

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values,
            Context context)throws IOException, InterruptedException {
        // write the first key of the group: the order's most expensive item
        context.write(key, NullWritable.get());
    }
}
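
The reduce() above writes only the first key of each group, which is exactly the highest-priced item. If you ever need the top N items per order instead, a hypothetical variant (TopNOrderReducer below, not part of the original post) can use the fact that Hadoop refills the reused key object as the value iterator advances, so each iteration exposes the next (order_id, price) pair in sort order:

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical variant: emit the TOP_N highest-priced items of each order.
public class TopNOrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    private static final int TOP_N = 2; // assumed value for illustration

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        int emitted = 0;
        // advancing the value iterator also advances the (reused) key to the
        // next record of the group, which is already sorted by price descending
        for (NullWritable ignored : values) {
            context.write(key, NullWritable.get());
            if (++emitted >= TOP_N) {
                break;
            }
        }
    }
}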

3. Key bean class (OrderBean)

package com.css.order.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean>{

    // fields
    private int order_id; // order id
    private double price; // price
    
    public OrderBean(){        
    }
    
    public OrderBean(int order_id, double price) {
        super();
        this.order_id = order_id;
        this.price = price;
    }
    
    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
    
    // serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeDouble(price);
    }

    // deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        price = in.readDouble();
    }

    @Override
    public String toString() {
        return order_id + "\t" + price;
    }

    // sort: order id ascending, then price descending
    @Override
    public int compareTo(OrderBean o) {
        int rs;
        // compare by order id first
        if (order_id > o.order_id) {
            // larger ids sort later
            rs = 1;
        }else if (order_id < o.order_id) {
            // smaller ids sort earlier
            rs = -1;
        }else {
            // same order id: higher price sorts earlier (0 when prices are equal)
            rs = -Double.compare(price, o.getPrice());
        }
        return rs;
    }
    
}
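
As a quick sanity check (a standalone sketch run in a plain JVM; the class name OrderBeanSortCheck is made up), sorting a few beans shows the intended "id ascending, price descending" order:

package com.css.order.mr;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OrderBeanSortCheck {
    public static void main(String[] args) {
        List<OrderBean> beans = new ArrayList<>();
        beans.add(new OrderBean(1002, 522.8));
        beans.add(new OrderBean(1001, 88.8));
        beans.add(new OrderBean(1001, 998.0));
        Collections.sort(beans); // uses OrderBean.compareTo
        for (OrderBean b : beans) {
            System.out.println(b);
        }
        // expected output:
        // 1001    998.0
        // 1001    88.8
        // 1002    522.8
    }
}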

4. Custom partitioner class

package com.css.order.mr;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable>{

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
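        // clear the sign bit so the result is never negative, then take the
        // remainder by the number of reduce tasks: every record of a given
        // order id lands in the same partition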
        return (key.getOrder_id() & Integer.MAX_VALUE) % numPartitions;
    }
}
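
A tiny local check (hypothetical helper class PartitionCheck, run in a plain JVM) shows where the three sample order ids land when numPartitions is 3, which matches the three output files at the end of this post:

package com.css.order.mr;

import org.apache.hadoop.io.NullWritable;

public class PartitionCheck {
    public static void main(String[] args) {
        OrderPartitioner partitioner = new OrderPartitioner();
        for (int id : new int[]{1001, 1002, 1003}) {
            int partition = partitioner.getPartition(new OrderBean(id, 0.0), NullWritable.get(), 3);
            System.out.println(id + " -> part-r-0000" + partition);
        }
        // 1001 -> part-r-00002
        // 1002 -> part-r-00000
        // 1003 -> part-r-00001
    }
}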

5. Custom grouping comparator class

package com.css.order.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator{

    // this constructor is required; true tells the parent to create OrderBean instances for comparison
    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    // override compare: group keys by order id only
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        int rs;
        // keys with different order ids belong to different groups
        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            rs = 1;
        }else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            rs = -1;
        }else {
            rs = 0;
        }
        return rs;
    }
}

6. Driver class

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        // 2. set the jar by the driver class
        job.setJarByClass(OrderDriver.class);
        
        // 3. set the mapper and reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        
        // 4. define the mapper output types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        
        // 5. define the reducer output types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        
        // 6. set the reduce-side grouping comparator
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        
        // 7. set the partitioner
        job.setPartitionerClass(OrderPartitioner.class);
        
        // 8. set the number of reduce tasks
        job.setNumReduceTasks(3);
        
        // 9. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("c://in1026"));
        FileOutputFormat.setOutputPath(job, new Path("c://out1026"));
        
        // 10. submit the job
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}
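
One practical caveat: re-running the driver fails if c://out1026 already exists. A hedged sketch of an optional helper (hypothetical class OutputCleaner, not in the original post) that main() could call right before step 9:

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: delete the output directory if it exists, so the job
// does not abort with FileAlreadyExistsException on a second run.
public class OutputCleaner {
    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        Path outPath = new Path(dir);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true); // true = recursive
        }
    }
}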

7. MR input file order.txt

1001    Tmall_01    998
1001    Tmall_06    88.8
1001    Tmall_03    522.8
1002    Tmall_03    522.8
1002    Tmall_04    132.4
1002    Tmall_05    372.4
1003    Tmall_01    998
1003    Tmall_02    8.5
1003    Tmall_04    132.4

8. Output files

(1) part-r-00000
1002    522.8
(2) part-r-00001
1003    998.0
(3) part-r-00002
1001    998.0