MapReduce序列化、分割槽、排序、分組
阿新 • • 發佈:2019-01-06
package com.cxy.flow; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Flow { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); FileSystem file = FileSystem.get(conf); if(file.exists(new Path(args[1]))){ file.delete(new Path(args[1]), true); } Job job = Job.getInstance(conf); job.setJarByClass(Flow.class); job.setJobName("flow"); job.setMapperClass(FlowMap.class); job.setReducerClass(FlowReduce.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(NullWritable.class); job.setGroupingComparatorClass(FlowGroup.class); job.setPartitionerClass(FlowPartitioner.class); job.setNumReduceTasks(6);; FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }
2.Mapper類
package com.cxy.flow; import java.io.IOException; public class FlowMap extends Mapper<LongWritable, Text, FlowBean, NullWritable>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] values = value.toString().split("\\|"); context.write(new FlowBean(values[0],Integer.parseInt(values[1]),Integer.parseInt(values[2])),NullWritable.get()); } }
3.Reduce
package com.cxy.flow;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Emits exactly one record per reduce group: the key as it is handed to
 * {@code reduce}, paired with a {@link NullWritable} value.
 *
 * <p>The grouped values are never iterated.
 * NOTE(review): presumably the emitted key is the first key of the group in
 * sort order, i.e. the record with the largest count for that number —
 * confirm against the configured sort and grouping comparators.
 */
public class FlowReduce extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {

    /**
     * Writes the group's key once.
     *
     * @param key     key passed in for this group
     * @param values  grouped values (ignored)
     * @param context task context used to emit the output record
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        final NullWritable emptyValue = NullWritable.get();
        context.write(key, emptyValue);
    }
}
4.Bean類
package com.cxy.flow;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Map output key holding a number together with its upstream flow,
 * downstream flow and their total.
 *
 * <p>Natural ordering: ascending by the numeric value of {@code num}, then
 * descending by {@code count}. {@link #equals(Object)} and
 * {@link #hashCode()} use the same two fields ({@code num} and
 * {@code count}) so they agree with {@link #compareTo(FlowBean)} for numbers
 * written in a single canonical textual form.
 *
 * <p>Note that {@code count} is computed only by the three-argument
 * constructor; the individual setters do not recompute it.
 */
public class FlowBean implements WritableComparable<FlowBean>{
private String num;
private int upflow;
private int downflow;
private int count;

/**
 * Serializes the fields in the order num, upflow, downflow, count.
 * {@link #readFields(DataInput)} must read them back in the same order.
 */
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.num);
out.writeInt(this.upflow);
out.writeInt(this.downflow);
out.writeInt(this.count);
}

/**
 * Restores the fields in the order written by {@link #write(DataOutput)}.
 */
@Override
public void readFields(DataInput in) throws IOException {
this.num=in.readUTF();
this.upflow=in.readInt();
this.downflow=in.readInt();
this.count=in.readInt();
}

/**
 * Orders beans by number ascending (compared numerically), then by count
 * descending.
 *
 * <p>Textually identical numbers are treated as equal without parsing;
 * otherwise both are parsed as {@code long}. The count tie-break is applied
 * whenever the numbers compare equal, which keeps the ordering transitive.
 *
 * @param o the bean to compare with
 * @return negative, zero or positive as this bean sorts before, equal to
 *         or after {@code o}
 * @throws NumberFormatException if the numbers differ textually and either
 *         one is not a valid {@code long}
 */
@Override
public int compareTo(FlowBean o) {
int byNum = this.num.equals(o.num)
        ? 0
        : Long.compare(Long.parseLong(this.num), Long.parseLong(o.num));
if(byNum != 0){
return byNum;
}
// Arguments are swapped on purpose: larger counts sort first.
return Integer.compare(o.count, this.count);
}

/**
 * Two beans are equal when they have the same number text and the same
 * count, mirroring the fields used by {@link #compareTo(FlowBean)}.
 */
@Override
public boolean equals(Object obj) {
if(this == obj){
return true;
}
if(!(obj instanceof FlowBean)){
return false;
}
FlowBean other = (FlowBean) obj;
boolean sameNum = (this.num == null) ? other.num == null : this.num.equals(other.num);
return sameNum && this.count == other.count;
}

/**
 * Hash derived only from field values ({@code num} and {@code count}), so
 * it does not depend on object identity.
 */
@Override
public int hashCode() {
int result = (this.num == null) ? 0 : this.num.hashCode();
return 31 * result + this.count;
}

/** Renders the bean as {@code num,upflow,downflow,count}. */
@Override
public String toString() {
return this.num+","+this.upflow+","+this.downflow+","+this.count;
}

/** No-argument constructor; leaves all fields at their default values. */
public FlowBean() {
super();
}

/**
 * Creates a bean and sets {@code count} to {@code upflow + downflow}.
 */
public FlowBean(String num, int upflow, int downflow) {
super();
this.num = num;
this.upflow = upflow;
this.downflow = downflow;
this.count = upflow + downflow;
}

/** Creates a bean carrying only the number; flow fields stay at zero. */
public FlowBean(String num) {
super();
this.num = num;
}
public String getNum() {
return num;
}
public void setNum(String num) {
this.num = num;
}
public int getUpflow() {
return upflow;
}
public void setUpflow(int upflow) {
this.upflow = upflow;
}
public int getDownflow() {
return downflow;
}
public void setDownflow(int downflow) {
this.downflow = downflow;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
}
5.分割槽類
package com.cxy.flow;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Routes keys to reduce partitions by the first three characters of the
 * key's number.
 *
 * <p>Known prefixes map to partitions 1-4; every other key (including a
 * missing number or one shorter than three characters) goes to partition 0.
 * The chosen partition is reduced modulo the number of partitions so the
 * result stays in range when the job runs with fewer than five reduce
 * tasks; in that case several prefixes share a partition.
 */
public class FlowPartitioner extends Partitioner<FlowBean, NullWritable>{
/** Number of leading characters of the number used as the lookup key. */
private static final int PREFIX_LENGTH = 3;
/** Partition used for keys whose prefix is not in the table. */
private static final int DEFAULT_PARTITION = 0;
/** Prefix to partition table; populated once and never modified afterwards. */
private static final Map<String,Integer> map = new HashMap<String,Integer>();
static{
map.put("150", 1);
map.put("159", 2);
map.put("187", 3);
map.put("136", 4);
}

/**
 * Selects the partition for a key.
 *
 * @param key      the map output key
 * @param value    the map output value (not used)
 * @param partinum total number of partitions for the job
 * @return a partition index in {@code [0, partinum)}
 */
@Override
public int getPartition(FlowBean key, NullWritable value, int partinum) {
String num = key.getNum();
int partition = DEFAULT_PARTITION;
if(num != null && num.length() >= PREFIX_LENGTH){
// Single lookup; a null result means the prefix is not in the table.
Integer mapped = map.get(num.substring(0, PREFIX_LENGTH));
if(mapped != null){
partition = mapped;
}
}
// Keep the result inside the range of partitions that actually exist.
return partition % partinum;
}
}
6.分組類
package com.cxy.flow;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Grouping comparator that treats two {@link FlowBean} keys as the same
 * group when their numbers are equal, ignoring every other field.
 */
public class FlowGroup extends WritableComparator{

/**
 * Registers {@link FlowBean} as the key class and asks the base class to
 * create key instances ({@code createInstances = true}).
 */
public FlowGroup() {
super(FlowBean.class,true);
}

/**
 * Compares two keys by number only.
 *
 * <p>Textually identical numbers compare equal without parsing; otherwise
 * both are parsed as {@code long} and compared numerically, so the result
 * is antisymmetric: {@code compare(a, b)} and {@code compare(b, a)} always
 * have opposite signs, or are both zero.
 *
 * @param a first key, expected to be a {@link FlowBean}
 * @param b second key, expected to be a {@link FlowBean}
 * @return negative, zero or positive as {@code a}'s number is less than,
 *         equal to or greater than {@code b}'s
 * @throws NumberFormatException if the numbers differ textually and either
 *         one is not a valid {@code long}
 */
@Override
@SuppressWarnings("rawtypes") // raw parameter types are required to override the base method
public int compare(WritableComparable a, WritableComparable b) {
String n1 = ((FlowBean) a).getNum();
String n2 = ((FlowBean) b).getNum();
if(n1.equals(n2)){
return 0;
}
return Long.compare(Long.parseLong(n1), Long.parseLong(n2));
}
}
7.資料
data.txt
136139*****|100|100
137139*****|200|500
138139*****|100|300
187139*****|300|100
136139*****|400|200
139139*****|500|100
138139*****|600|200
150139*****|100|100