MapReduce:控制map輸出分區、reduce端排序與分組,實現TopN
阿新 • • 發佈:2018-12-15
實現一個JavaBean類,並實現WritableComparable介面
public class OrderBean implements WritableComparable<OrderBean>{ private String orderId; private String userId; private String pdtName; private float price; private int number; private float amountFee; public void set(String orderId, String userId, String pdtName, float price, int number) { this.orderId = orderId; this.userId = userId; this.pdtName = pdtName; this.price = price; this.number = number; this.amountFee = price * number; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public String getPdtName() { return pdtName; } public void setPdtName(String pdtName) { this.pdtName = pdtName; } public float getPrice() { return price; } public void setPrice(float price) { this.price = price; } public int getNumber() { return number; } public void setNumber(int number) { this.number = number; } public float getAmountFee() { return amountFee; } public void setAmountFee(float amountFee) { this.amountFee = amountFee; } @Override public String toString() { return this.orderId + "," + this.userId + "," + this.pdtName + "," + this.price + "," + this.number + "," + this.amountFee; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.orderId); out.writeUTF(this.userId); out.writeUTF(this.pdtName); out.writeFloat(this.price); out.writeInt(this.number); } @Override public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.userId = in.readUTF(); this.pdtName = in.readUTF(); this.price = in.readFloat(); this.number = in.readInt(); this.amountFee = this.price * this.number; } // 比較規則:先比總金額,如果相同,再比商品名稱 @Override public int compareTo(OrderBean o) { return this.orderId.compareTo(o.getOrderId())==0?Float.compare(o.getAmountFee(), this.getAmountFee()):this.orderId.compareTo(o.getOrderId()); } }
MapReduce主程式
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderTopn { public static class OrderTopnMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{ OrderBean orderBean = new OrderBean(); NullWritable v = NullWritable.get(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); orderBean.set(fields[0], fields[1], fields[2], Float.parseFloat(fields[3]), Integer.parseInt(fields[4])); context.write(orderBean,v); } } public static class OrderTopnReducer extends Reducer< OrderBean, NullWritable, OrderBean, NullWritable>{ /** * 雖然reduce方法中的引數key只有一個,但是隻要迭代器迭代一次,key中的值就會變 */ @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException { int i=0; for (NullWritable v : values) { context.write(key, v); if(++i==3) return; } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 預設只加載core-default.xml core-site.xml conf.setInt("order.top.n", 2); Job job = Job.getInstance(conf); job.setJarByClass(OrderTopn.class); job.setMapperClass(OrderTopnMapper.class); job.setReducerClass(OrderTopnReducer.class); job.setPartitionerClass(OrderIdPartitioner.class); job.setGroupingComparatorClass(OrderIdGroupingComparator.class); job.setNumReduceTasks(2); job.setMapOutputKeyClass(OrderBean.class); 
job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\order\\input")); FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\order\\out-3")); job.waitForCompletion(true); } }
OrderIdPartitioner控制分區:繼承Partitioner父類,重寫getPartition方法
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class OrderIdPartitioner extends Partitioner<OrderBean, NullWritable>{ @Override public int getPartition(OrderBean key, NullWritable value, int numPartitions) { // 按照訂單中的orderid來分發資料 return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions; } }
控制reduce端的分組規則(GroupingComparator:order id相同的key進入同一次reduce呼叫)
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Reduce-side grouping comparator. Two keys fall into the same reduce() call when their order
 * ids are equal, regardless of amount.
 *
 * <p>Combined with the key sort order of {@link OrderBean#compareTo} (order id, then amount
 * descending), each reduce() call receives one whole order with the largest amount first.
 *
 * <p>NOTE(review): the driver in this article registers {@code OrderIdGroupingComparator}.
 * Confirm which class the job actually uses.
 */
public class GroupingCompatrion extends WritableComparator {

    /**
     * Registers {@link OrderBean} as the key type. {@code createInstances = true} lets the
     * parent deserialize raw bytes into OrderBean objects before calling
     * {@link #compare(WritableComparable, WritableComparable)}.
     */
    public GroupingCompatrion() {
        super(OrderBean.class, true);
    }

    /** Compares two keys by order id only. */
    @Override
    @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean o1 = (OrderBean) a;
        OrderBean o2 = (OrderBean) b;
        return o1.getOrderId().compareTo(o2.getOrderId());
    }
}