Implementing a join in Hadoop with secondary sort
阿新 • Published 2019-02-19
package join;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Reduce-side join of two comma-separated inputs, implemented with a secondary sort on (join key, input tag).
public class j {
public static class TextPair implements WritableComparable<TextPair>{
public Text first;
public Text second;
public TextPair(){
this.first=new Text();
this.second=new Text();
}
public TextPair(String f,String s) {
this.first=new Text(f);
this.second=new Text(s);
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int compareTo(TextPair t) {
int i=first.compareTo(t.first);
if(i==0){
return second.compareTo(t.second);
}
return i;
}
}
public static class map1 extends Mapper<LongWritable, Text, TextPair, Text> {
// Records from the first input are tagged "0" so they sort ahead of the second input's records for the same key.
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] s=value.toString().split(",");
context.write(new TextPair(s[0],"0"),new Text(s[1]));
}
}
public static class map2 extends Mapper<LongWritable, Text, TextPair, Text> {
// Records from the second input are tagged "1".
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] s=value.toString().split(",");
context.write(new TextPair(s[0],"1"),new Text(s[1]));
}
}
public static class reduce extends Reducer<TextPair, Text, Text, Text> {
@Override
public void reduce(TextPair key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
Iterator<Text> i = value.iterator();
// Because of the secondary sort, the first value of each group is the record tagged "0" (from the first input).
// It must be copied with new Text(...): Hadoop reuses the value object, so t1 would otherwise change during
// the iteration below and no longer hold the first value.
Text t1 = new Text(i.next());
Text t2;
while(i.hasNext()){
t2=i.next();
context.write(key.first,new Text(t2.toString()+"\t"+t1.toString()));
System.out.println(t1.toString());
}
}
}
public static class UDF_partition extends Partitioner<TextPair, Text> {
// Partitioning happens at the end of the map phase: it assigns each key/value pair a partition number
// based only on the join key (first), so both inputs' records for the same key reach the same reducer.
@Override
public int getPartition(TextPair key, Text value, int numpartition) {
return (key.first.hashCode() & Integer.MAX_VALUE) % numpartition;
}
}
public static class UDF_group extends WritableComparator {
// Grouping happens just before reduce: pairs such as ((1,2),a),((1,3),b),((1,2),a),((2,2),d),((2,2),a)
// are compared on the first field only, so they collapse into k -> list(v) groups; the key passed to
// reduce() is the first key of each group.
public UDF_group() {
super(TextPair.class, true); // the true argument is required: it tells the parent constructor to create key instances for comparison
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a instanceof TextPair && b instanceof TextPair) {
return ((TextPair) a).first.compareTo(((TextPair) b).first);
}
return super.compare(a, b);
}
}
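// Sketch (not part of the original post, added for illustration): a quick local check of the order that
// TextPair.compareTo produces. For the same join key, the "0"-tagged record (first input) sorts ahead of
// the "1"-tagged records (second input), which is why reduce() can treat the first value of each group as
// the first-input record.
public static void demoOrdering() {
TextPair[] pairs = { new TextPair("1", "1"), new TextPair("1", "0"), new TextPair("2", "0") };
java.util.Arrays.sort(pairs); // uses TextPair.compareTo: join key first, then the input tag
for (TextPair p : pairs) {
System.out.println(p.first + "," + p.second); // prints: 1,0  1,1  2,0
}
}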
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration con = new Configuration();
Job job = Job.getInstance(con); // new Job(Configuration) is deprecated in current Hadoop releases
job.setJarByClass(j.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,map1.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class,map2.class);
FileOutputFormat.setOutputPath(job,new Path(args[2]));
job.setReducerClass(reduce.class);
job.setPartitionerClass(UDF_partition.class);
job.setGroupingComparatorClass(UDF_group.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)?0:1);
}
}
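A hypothetical end-to-end run for reference; the file names, paths, jar name, and values below are illustrative and not from the original post. Both inputs are comma-separated key,value files, and the join key is the first field.

input1 (args[0], read by map1 and tagged "0"):
1,apple
2,banana

input2 (args[1], read by map2 and tagged "1"):
1,100
2,300

hadoop jar join.jar join.j /input/input1 /input/input2 /output

Each output line holds the join key, the value from the second input, and the value from the first input, tab-separated, e.g. in /output/part-r-00000:
1    100    apple
2    300    banana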