1. 程式人生 > MapReduce序列化、分區、排序、分組

MapReduce序列化、分區、排序、分組

package com.cxy.flow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Job driver: wires {@link FlowMap}, {@link FlowReduce}, {@link FlowPartitioner} and
 * {@link FlowGroup} into one MapReduce job.
 *
 * <p>Usage: {@code Flow <input path> <output path>}. An existing output directory is deleted
 * before the job is submitted. The process exits with 0 on job success, 1 on job failure and
 * 2 on wrong usage.
 */
public class Flow {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		if (args.length < 2) {
			System.err.println("Usage: Flow <input path> <output path>");
			System.exit(2);
		}
		Configuration conf = new Configuration();
		Path inputPath = new Path(args[0]);
		Path outputPath = new Path(args[1]);

		// FileOutputFormat rejects an existing output directory, so remove a previous run's
		// output first. Resolve the FileSystem from the path itself so a fully-qualified
		// output URI on a non-default file system is handled too.
		FileSystem fs = outputPath.getFileSystem(conf);
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}

		Job job = Job.getInstance(conf);
		job.setJarByClass(Flow.class);
		job.setJobName("flow");

		job.setMapperClass(FlowMap.class);
		job.setReducerClass(FlowReduce.class);

		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		job.setOutputKeyClass(FlowBean.class);
		job.setOutputValueClass(NullWritable.class);

		// Records sharing a phone number form one reduce group.
		job.setGroupingComparatorClass(FlowGroup.class);
		job.setPartitionerClass(FlowPartitioner.class);
		// FlowPartitioner only produces partitions 0-4, so reducer 5 receives no records.
		// NOTE(review): 5 reducers may have been intended; kept at 6 to preserve output layout.
		job.setNumReduceTasks(6);

		FileInputFormat.addInputPath(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		// Propagate the job result as the process exit status.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

2.Mapper類

package com.cxy.flow;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * Parses one {@code num|upflow|downflow} text line into a {@link FlowBean} key with a
 * {@link NullWritable} value.
 *
 * <p>Blank lines are skipped. Lines with fewer than three '|'-separated fields, or whose flow
 * fields are not integers, are skipped and counted under counter group "FlowMap" /
 * "MALFORMED_RECORDS" instead of failing the whole task.
 */
public class FlowMap extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
	private static final String COUNTER_GROUP = "FlowMap";
	private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		String line = value.toString().trim();
		if (line.isEmpty()) {
			return;
		}
		String[] values = line.split("\\|");
		if (values.length < 3) {
			context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
			return;
		}
		int upflow;
		int downflow;
		try {
			upflow = Integer.parseInt(values[1].trim());
			downflow = Integer.parseInt(values[2].trim());
		} catch (NumberFormatException e) {
			context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
			return;
		}
		// Trim the number so padding never leaks into the key's sort/group comparisons.
		context.write(new FlowBean(values[0].trim(), upflow, downflow), NullWritable.get());
	}
}

3.Reducer類

package com.cxy.flow;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Writes exactly one record per reduce group: the group's key, with a null value.
 *
 * <p>Values are never iterated, so the key written is the first one of the group in sort
 * order. With {@link FlowBean#compareTo} (number ascending, count descending) and
 * {@link FlowGroup} (grouping by number), that is the record with the largest count for
 * each phone number.
 */
public class FlowReduce extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {

	@Override
	protected void reduce(FlowBean first, Iterable<NullWritable> unused, Context ctx)
			throws IOException, InterruptedException {
		final NullWritable none = NullWritable.get();
		ctx.write(first, none);
	}
}

4.Bean類

package com.cxy.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Map output key holding one phone number's upstream flow, downstream flow and their total.
 *
 * <p>Serialized field order: num, upflow, downflow, count. Sort order ({@link #compareTo}):
 * by {@code num} ascending, numerically when it parses as a {@code long}, then by
 * {@code count} descending. {@link #equals}/{@link #hashCode} agree with that order: two
 * beans are equal when {@code num} and {@code count} are equal.
 */
public class FlowBean implements WritableComparable<FlowBean>{
	private String num;
	private int upflow;
	private int downflow;
	private int count;

	/** Serializes num, upflow, downflow, count in that order; {@code num} must be non-null. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.num);
		out.writeInt(this.upflow);
		out.writeInt(this.downflow);
		out.writeInt(this.count);
	}

	/** Reads the fields back in the same order {@link #write} emits them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.num=in.readUTF();
		this.upflow=in.readInt();
		this.downflow=in.readInt();
		this.count=in.readInt();
	}

	/**
	 * Orders by phone number ascending, then by total flow descending.
	 *
	 * <p>The descending count means the first record the reducer sees for a number is the
	 * one with the largest total.
	 */
	@Override
	public int compareTo(FlowBean o) {
		int byNum = compareNums(this.getNum(), o.getNum());
		if (byNum != 0) {
			return byNum;
		}
		return Integer.compare(o.getCount(), this.getCount());
	}

	/** Equal when {@code num} and {@code count} match, consistent with {@link #compareTo}. */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof FlowBean)) {
			return false;
		}
		FlowBean other = (FlowBean) obj;
		return this.count == other.count
				&& (this.num == null ? other.num == null : this.num.equals(other.num));
	}

	/** Deterministic across JVMs (String.hashCode plus an int), as Hadoop keys should be. */
	@Override
	public int hashCode() {
		return 31 * (this.num == null ? 0 : this.num.hashCode()) + this.count;
	}

	/** Formats as {@code num,upflow,downflow,count}. */
	@Override
	public String toString() {
		return this.num+","+this.upflow+","+this.downflow+","+this.count;
	}

	/** No-arg constructor required by Hadoop to instantiate keys before {@link #readFields}. */
	public FlowBean() {
		super();
	}

	/** Creates a bean whose {@code count} is {@code upflow + downflow}. */
	public FlowBean(String num, int upflow, int downflow) {
		super();
		this.num = num;
		this.upflow = upflow;
		this.downflow = downflow;
		this.count = upflow + downflow;
	}

	/** Creates a bean with only {@code num} set; the flow fields and count stay 0. */
	public FlowBean(String num) {
		super();
		this.num = num;
	}
	public String getNum() {
		return num;
	}
	public void setNum(String num) {
		this.num = num;
	}
	public int getUpflow() {
		return upflow;
	}
	public void setUpflow(int upflow) {
		this.upflow = upflow;
	}
	public int getDownflow() {
		return downflow;
	}
	public void setDownflow(int downflow) {
		this.downflow = downflow;
	}
	public int getCount() {
		return count;
	}
	public void setCount(int count) {
		this.count = count;
	}

	/**
	 * Total order on phone-number strings that never throws.
	 *
	 * <p>Strings that parse as a {@code long} sort first, numerically. Numeric ties between
	 * distinct strings (e.g. "0136" vs "136") are broken lexicographically. All other strings,
	 * such as masked numbers like "136139*****", follow in lexicographic order.
	 */
	private static int compareNums(String a, String b) {
		if (a.equals(b)) {
			return 0;
		}
		Long na = toLong(a);
		Long nb = toLong(b);
		if (na != null && nb != null) {
			int byValue = Long.compare(na, nb);
			return byValue != 0 ? byValue : a.compareTo(b);
		}
		if (na != null) {
			return -1;
		}
		if (nb != null) {
			return 1;
		}
		return a.compareTo(b);
	}

	/** Returns the value Long.parseLong would produce, or null where it would throw. */
	private static Long toLong(String s) {
		int start = (!s.isEmpty() && (s.charAt(0) == '+' || s.charAt(0) == '-')) ? 1 : 0;
		if (start == s.length()) {
			return null;
		}
		// Reject non-digit strings up front so the common failure case doesn't throw.
		for (int i = start; i < s.length(); i++) {
			if (Character.digit(s.charAt(i), 10) < 0) {
				return null;
			}
		}
		try {
			return Long.valueOf(s);
		} catch (NumberFormatException overflow) {
			return null;
		}
	}
}

5.分區類

package com.cxy.flow;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes each record to a reduce task by the first three characters of its phone number.
 *
 * <p>Prefixes 150, 159, 187 and 136 go to partitions 1-4. Every other number, including a
 * null number or one shorter than three characters, goes to partition 0. With at least five
 * reduce tasks each prefix gets its own partition. With fewer, indices are folded into range
 * instead of producing an out-of-range partition that the framework rejects.
 */
public class FlowPartitioner extends Partitioner<FlowBean, NullWritable>{

	/** Phone-number prefix to partition index; prefixes not listed map to 0. */
	private static final Map<String,Integer> PREFIX_PARTITIONS = new HashMap<String,Integer>();
	static{
		PREFIX_PARTITIONS.put("150", 1);
		PREFIX_PARTITIONS.put("159", 2);
		PREFIX_PARTITIONS.put("187", 3);
		PREFIX_PARTITIONS.put("136", 4);
	}

	@Override
	public int getPartition(FlowBean key, NullWritable value, int numPartitions) {
		String num = key.getNum();
		if (num == null || num.length() < 3) {
			return 0;
		}
		Integer partition = PREFIX_PARTITIONS.get(num.substring(0, 3));
		if (partition == null || numPartitions <= 1) {
			return 0;
		}
		return partition % numPartitions;
	}
}

6.分組類

package com.cxy.flow;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: records whose {@code num} strings are equal go to one reduce() call.
 *
 * <p>Its ordering of {@code num} must match the primary key of {@link FlowBean#compareTo},
 * which sorts map output by {@code num} first, so equal numbers are adjacent when grouped.
 * It never throws on non-numeric numbers such as masked "136139*****".
 */
public class FlowGroup extends WritableComparator{
	public FlowGroup() {
		// createInstances = true: WritableComparator allocates FlowBean instances to
		// deserialize raw keys into before calling compare(WritableComparable, ...).
		super(FlowBean.class,true);
	}

	@SuppressWarnings("rawtypes") // raw signature is required to override WritableComparator
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		FlowBean t1 = (FlowBean) a;
		FlowBean t2 = (FlowBean) b;
		return compareNums(t1.getNum(), t2.getNum());
	}

	/**
	 * Total order on phone-number strings that never throws.
	 *
	 * <p>Strings that parse as a {@code long} sort first, numerically. Numeric ties between
	 * distinct strings are broken lexicographically. All other strings follow lexicographically.
	 */
	private static int compareNums(String a, String b) {
		if (a.equals(b)) {
			return 0;
		}
		Long na = toLong(a);
		Long nb = toLong(b);
		if (na != null && nb != null) {
			int byValue = Long.compare(na, nb);
			return byValue != 0 ? byValue : a.compareTo(b);
		}
		if (na != null) {
			return -1;
		}
		if (nb != null) {
			return 1;
		}
		return a.compareTo(b);
	}

	/** Returns the value Long.parseLong would produce, or null where it would throw. */
	private static Long toLong(String s) {
		int start = (!s.isEmpty() && (s.charAt(0) == '+' || s.charAt(0) == '-')) ? 1 : 0;
		if (start == s.length()) {
			return null;
		}
		// Reject non-digit strings up front so the common failure case doesn't throw.
		for (int i = start; i < s.length(); i++) {
			if (Character.digit(s.charAt(i), 10) < 0) {
				return null;
			}
		}
		try {
			return Long.valueOf(s);
		} catch (NumberFormatException overflow) {
			return null;
		}
	}
}

7.資料
data.txt

136139*****|100|100
137139*****|200|500
138139*****|100|300
187139*****|300|100
136139*****|400|200
139139*****|500|100
138139*****|600|200
150139*****|100|100