MapReduce Secondary Sort (GroupingComparator)
阿新 • Published 2018-11-04
Requirement: order data.
Find the most expensive item in each order.
Sort by order id in ascending order and by transaction amount in descending order.
Produce three result files, each containing exactly one record.
The approach: use a composite key (OrderBean, holding order id and price) whose sort order is id ascending then price descending, a custom partitioner that sends every record of an order to the same reduce task, and a grouping comparator that groups reduce input by order id only, so the first key of each group is the most expensive item. A plain-Java sketch of the target result is shown below.
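For orientation, here is a minimal non-MapReduce sketch of the same computation, run against the sample records from section 7. The class name and the hard-coded data are illustrative only; they are not part of the job itself.

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch: what the MapReduce job below computes, expressed as plain Java.
public class MostExpensivePerOrderSketch {
    public static void main(String[] args) {
        String[] lines = {
            "1001\tTmall_01\t998", "1001\tTmall_06\t88.8", "1001\tTmall_03\t522.8",
            "1002\tTmall_03\t522.8", "1002\tTmall_04\t132.4", "1002\tTmall_05\t372.4",
            "1003\tTmall_01\t998", "1003\tTmall_02\t8.5", "1003\tTmall_04\t132.4"
        };
        Map<Integer, Double> maxPricePerOrder = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            int orderId = Integer.parseInt(fields[0]);
            double price = Double.parseDouble(fields[2]);
            // Keep only the highest price seen for each order id
            maxPricePerOrder.merge(orderId, price, Math::max);
        }
        // Prints one line per order: 1001 998.0, 1002 522.8, 1003 998.0
        maxPricePerOrder.forEach((id, max) -> System.out.println(id + "\t" + max));
    }
}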
1. Mapper class

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split the line on tabs
        String[] fields = line.split("\t");
        // Extract the order id and the price
        Integer order_id = Integer.parseInt(fields[0]);
        Double price = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(order_id, price);
        // Emit the composite key; no value is needed
        context.write(orderBean, NullWritable.get());
    }
}
2. Reducer class

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Within each group the keys arrive sorted by price descending and are grouped
        // by order_id, so the key seen here is the most expensive record of the order.
        context.write(key, NullWritable.get());
    }
}
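The original reducer emits only the first key of each group. As a hedged aside (not from the original post): because Hadoop's reduce-side iterator refills the same key instance as it advances through the values, the identical job setup would also support a top-N variant, sketched below with an arbitrary cutoff of two records per order.

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical variant (not in the original post): emit the two most expensive items per order.
public class OrderTopNReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        // As the iterator advances, Hadoop refills the same key instance with the next
        // record of the group, so writing `key` inside the loop emits successive records.
        for (NullWritable v : values) {
            context.write(key, NullWritable.get());
            if (++count == 2) {
                break;
            }
        }
    }
}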
3. Key wrapper class (OrderBean)

package com.css.order.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

    // Order id
    private int order_id;
    // Price of the item
    private double price;

    public OrderBean() {
    }

    public OrderBean(int order_id, double price) {
        super();
        this.order_id = order_id;
        this.price = price;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeDouble(price);
    }

    // Deserialization (fields must be read in the order they were written)
    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        price = in.readDouble();
    }

    @Override
    public String toString() {
        return order_id + "\t" + price;
    }

    // Sort order: order_id ascending, then price descending
    @Override
    public int compareTo(OrderBean o) {
        int rs;
        if (order_id > o.order_id) {
            // Larger ids sort later
            rs = 1;
        } else if (order_id < o.order_id) {
            // Smaller ids sort earlier
            rs = -1;
        } else {
            // Same id: higher price sorts earlier (descending)
            rs = Double.compare(o.getPrice(), price);
        }
        return rs;
    }
}
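To see the compareTo contract in isolation, the following throwaway check (illustrative, not part of the original post) sorts a few OrderBean instances built from the sample data; the expected order is id ascending and, within an id, price descending.

package com.css.order.mr;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative check of the OrderBean sort order (not part of the original post).
public class OrderBeanSortCheck {
    public static void main(String[] args) {
        List<OrderBean> beans = Arrays.asList(
                new OrderBean(1002, 132.4),
                new OrderBean(1001, 88.8),
                new OrderBean(1001, 998),
                new OrderBean(1002, 522.8));
        Collections.sort(beans);
        // Expected output order: 1001 998.0, 1001 88.8, 1002 522.8, 1002 132.4
        beans.forEach(System.out::println);
    }
}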
4. Custom Partitioner class

package com.css.order.mr;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
        // Partition on order_id only, so every record of an order reaches the same reducer
        return (key.getOrder_id() & Integer.MAX_VALUE) % numPartitions;
    }
}
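With three reduce tasks this expression reduces to order_id % 3 for the positive sample ids, which is exactly why each order lands in the output file listed in section 8 (1001 → partition 2, 1002 → partition 0, 1003 → partition 1). A small illustrative check, not part of the original post:

package com.css.order.mr;

import org.apache.hadoop.io.NullWritable;

// Illustrative check of which partition each sample order id maps to (not part of the original post).
public class OrderPartitionCheck {
    public static void main(String[] args) {
        OrderPartitioner partitioner = new OrderPartitioner();
        for (int orderId : new int[] {1001, 1002, 1003}) {
            int partition = partitioner.getPartition(new OrderBean(orderId, 0), NullWritable.get(), 3);
            // Prints: 1001 -> partition 2, 1002 -> partition 0, 1003 -> partition 1
            System.out.println(orderId + " -> partition " + partition);
        }
    }
}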
5. Custom grouping comparator class

package com.css.order.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

    // The constructor is required: register the key class and ask for instances to be created
    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    // Compare only on order_id: records with the same id belong to the same reduce group
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        int rs;
        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            rs = 1;
        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            rs = -1;
        } else {
            rs = 0;
        }
        return rs;
    }
}
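The grouping comparator is what lets two records with the same order id but different prices fall into the same reduce call, so the reducer only ever sees the highest-priced key of each order. An illustrative check (not part of the original post; it sits in the same package, so the protected constructor is accessible):

package com.css.order.mr;

// Illustrative check of the grouping comparator (not part of the original post).
public class OrderGroupingCheck {
    public static void main(String[] args) {
        OrderGroupingComparator comparator = new OrderGroupingComparator();
        // Same order id, different prices: compare() returns 0, so both records
        // land in the same reduce group and only the first (highest-priced) key is seen.
        System.out.println(comparator.compare(new OrderBean(1001, 998), new OrderBean(1001, 88.8)));   // 0
        // Different order ids: non-zero result, so the records form separate groups.
        System.out.println(comparator.compare(new OrderBean(1001, 998), new OrderBean(1002, 522.8)));  // -1
    }
}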
6. Driver class

package com.css.order.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar by the driver class
        job.setJarByClass(OrderDriver.class);
        // 3. Set the mapper and reducer
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // 4. Mapper output types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5. Reducer output types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. Reduce-side grouping comparator
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        // 7. Custom partitioner
        job.setPartitionerClass(OrderPartitioner.class);
        // 8. Number of reduce tasks (one output file per reducer)
        job.setNumReduceTasks(3);
        // 9. Input and output paths
        FileInputFormat.setInputPaths(job, new Path("c://in1026"));
        FileOutputFormat.setOutputPath(job, new Path("c://out1026"));
        // 10. Submit the job and wait for completion
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}
7. MR input file order.java

1001	Tmall_01	998
1001	Tmall_06	88.8
1001	Tmall_03	522.8
1002	Tmall_03	522.8
1002	Tmall_04	132.4
1002	Tmall_05	372.4
1003	Tmall_01	998
1003	Tmall_02	8.5
1003	Tmall_04	132.4
8. Output files

(1) part-r-00000
1002	522.8
(2) part-r-00001
1003	998.0
(3) part-r-00002
1001	998.0

Because the partitioner maps each order id to order_id % 3, order 1002 lands in partition 0, order 1003 in partition 1, and order 1001 in partition 2, giving one record per output file as required.