1. 程式人生 > >Mapreduce之多表關聯Join---(附例子)

Mapreduce之多表關聯Join---(附例子)

需求:

 

address.txt:  

1    Beijing
2    Guangzhou
3    Shenzhen
4    Xian    
factory.txt:
Beijing Red Star    1
Shenzhen Thunder    3
Guangzhou Honda    2
Beijing Rising    1
Guangzhou Development Bank    2
Tencent    3
Back of Beijing    1

結果:
             factory                   city

             Beijing Red Star    Beijing
             Shenzhen Thunder    Shenzhen
             Guangzhou Honda    Guangzhou
             Beijing Rising    Beijing
             Guangzhou Development Bank    Guangzhou
              Tencent    Shenzhen
             Back of Beijing    Beijing

分析:
              map函式:<k2,v2>------<1,"0,Beijing Red Star">  ,  <1,"0,Back of Beijing">  ,  <1,"1,Beijing">
              reducer函式:<k2,v2>----<1,[ "0,Beijing Red Star" , "0,Back of Beijing" , "1,Beijing" ]>

             關聯查詢需要一個標識位,我們需要利用標識位去得到所對應的value值。
1.Mapper.class
 

/**
 * Map side of a reduce-side join between address.txt and factory.txt.
 *
 * <p>Every emitted pair is keyed by the address id so that matching rows from
 * both files reach the same reduce call. The value carries a source tag before
 * the first comma: {@code "1,"} for a row read from address.txt and
 * {@code "0,"} for a row read from any other input file.
 */
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

	/**
	 * Tags one input line with its source file and emits it under the address id.
	 *
	 * @param key     input key supplied by the input format (not used here)
	 * @param value   one line of input text
	 * @param context task context used to look up the input split and to emit output
	 * @throws IOException          if writing the output pair fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		FileSplit inputSplit = (FileSplit) context.getInputSplit();
		String filename = inputSplit.getPath().toString(); // full path of the file being read

		// Skip caption lines that only name the data file.
		if (line.contains("address.txt") || line.contains("factory.txt")) {
			return;
		}

		String[] fields = line.split("\t");
		// A blank or malformed line has fewer than two tab-separated fields;
		// skip it instead of failing the task with ArrayIndexOutOfBoundsException.
		if (fields.length < 2) {
			return;
		}

		if (filename.endsWith("address.txt")) {
			// address row: <id, city>  ->  key = id, value = "1," + city
			context.write(new Text(fields[0]), new Text("1," + fields[1]));
		} else {
			// factory row: <factory, id>  ->  key = id, value = "0," + factory
			context.write(new Text(fields[1]), new Text("0," + fields[0]));
		}
	}
}

2.Reducer.class

/**
 * Reduce side of the join: for one address id, pairs every factory name with
 * every city name that arrived under that id.
 *
 * <p>Incoming values are expected in the form {@code "<tag>,<payload>"}, where a
 * value starting with {@code "1,"} is a city name and any other value is
 * treated as a factory name.
 */
public class JoinReduce extends Reducer<Text, Text, Text, Text> {

	/**
	 * Writes the header row before any joined rows are written by this reducer.
	 *
	 * @param context task context used to emit the header
	 * @throws IOException          if writing the header fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void setup(Context context)
			throws IOException, InterruptedException {
		context.write(new Text("工廠名"), new Text("城市"));
	}

	/**
	 * Splits the values of one key into cities and factories by their tag and
	 * emits one {@code (factory, city)} row per combination.
	 *
	 * @param key     the join key shared by all values
	 * @param values  tagged values collected for this key
	 * @param context task context used to emit joined rows
	 * @throws IOException          if writing an output pair fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		ArrayList<String> cities = new ArrayList<String>();
		ArrayList<String> factories = new ArrayList<String>();

		for (Text v : values) {
			String tagged = v.toString();
			// Take everything after the FIRST comma so that names which
			// themselves contain commas are not truncated.
			String payload = tagged.substring(tagged.indexOf(',') + 1);
			// Test the tag prefix only: a digit 1 occurring inside a name
			// (e.g. "Plant 1") must not be mistaken for the city tag.
			if (tagged.startsWith("1,")) {
				cities.add(payload);
			} else {
				factories.add(payload);
			}
		}

		// Cross product of both sides: factory name + city name.
		for (String city : cities) {
			for (String factory : factories) {
				context.write(new Text(factory), new Text(city));
			}
		}
	}
}

3.Driver.class

/**
 * Entry point that configures and submits the address/factory join job.
 */
public class JoinDriver {

	/**
	 * Deletes any previous output directory, wires {@link JoinMapper} and
	 * {@link JoinReduce} into a job, runs it, and exits with status 0 when the
	 * job succeeds or 1 otherwise.
	 *
	 * @param args command-line arguments (not used; paths are hard-coded)
	 */
	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();
		conf.set("mapred.job.queue.name", "order");

		// Remove the output directory from an earlier run, if there is one.
		Path outfile = new Path("file:///D:/輸出結果/joinout");
		FileSystem fs = outfile.getFileSystem(conf);
		if (fs.exists(outfile)) {
			fs.delete(outfile, true);
		}

		Job job = Job.getInstance(conf);
		job.setJarByClass(JoinDriver.class);
		// Previously "Sencondary Sort", a misspelled leftover that mislabelled this join job.
		job.setJobName("Reduce-side Join");
		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReduce.class);

		// The mapper and the reducer both emit <Text, Text>; state both explicitly.
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// The input directory holds both address.txt and factory.txt.
		FileInputFormat.addInputPath(job, new Path("file:///D:/測試資料/join連線/"));
		FileOutputFormat.setOutputPath(job, outfile);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

4.執行結果

address.txt:  

        1    Beijing
        2    Guangzhou
        3    Shenzhen
        4    Xian    
factory.txt:
        Beijing Red Star    1
        Shenzhen Thunder    3
        Guangzhou Honda    2
        Beijing Rising    1
        Guangzhou Development Bank    2
        Tencent    3
        Back of Beijing    1

 結果:
             factory             city

             Beijing Red Star    Beijing
             Shenzhen Thunder    Shenzhen
             Guangzhou Honda    Guangzhou
             Beijing Rising    Beijing
             Guangzhou Development Bank    Guangzhou
              Tencent    Shenzhen
             Back of Beijing    Beijing

總結:用join解決表關聯查詢時,要先確定兩張表共同的關聯欄位(這裡是地址編號)作為key,並在value前加上標識位以區分資料來自哪張表;reducer再依標識位篩選value,最後用context.write()寫出結果。