1. 程式人生 > >Hadoop MapReduce多表關聯查詢-案例

Hadoop MapReduce多表關聯查詢-案例

案例:將工廠名和所在地點輸出
        原始資料為factory.txt工廠庫和address.txt地址庫
    【factory.txt】如下:
        factoryname    addressId
        Beijing Red Star    1
        Shenzhen Thunder    3
        Guangzhou Honda    2
        Beijing Rising    1
        Guangzhou Development Bank    2
        Tencent    3
        Back of Beijing    1

        【address.txt】如下:
        addressID    addressname
        1    Beijing
        2    Guangzhou
        3    Shenzhen
        4    Xian

思路分析:
       a.關聯鍵ID
       Mapper:<k2,v2>-->  <1,"1,Beijing Red Star","1,Beijing Rising","1,Back of Beijing">
                           <1,"2,Beijing">
       Reducer:<k2,v2>-->  <1,["1,Beijing Red Star","1,Beijing Rising","1,Back of Beijing",1,"2,Beijing"]>

程式碼如下:

Mapper端:

package com.hyxy.hadoop.join.multi;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MultiJoinMapper extends Mapper<LongWritable, Text, Text, Text>{
	private Text _key = new Text();
	private Text _value = new Text();
	@Override
	protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
		/*
		 * 先區分輸入行屬於哪個表,然後對兩列值進行分割,連線列儲存在key值,剩餘的列和表標誌儲存在value中,最後輸出
		 */
		String[] line = value.toString().split("\t");
		if(line.length!=2){
			return ;
		}
		//獲取檔名factory.txt(工廠名字,序號),address.txt(序號,地址)
		String filename = ((FileSplit)context.getInputSplit()).getPath().getName();
		if("factory.txt".equals(filename)){
			_key.set(line[1]);
			_value.set("1"+line[0]);
		}else{
			if("address.txt".equals(filename)){
				_key.set(line[0]);
				_value.set("2"+line[1]);
			}else {
				return ;
			}
		}
		context.write(_key, _value);	
	}

}

Reducer端:

package com.hyxy.hadoop.join.multi;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
 * <1,["1,Beijing Red Star","1,Beijing Rising","1,Back of Beijing",1,"2,Beijing"]>
 */
public class MultiJoinReducer extends Reducer<Text, Text, Text, Text>{
	private static List<String> names = new ArrayList<String>();
	private static List<String> address = new ArrayList<String>();
	private Text _key = new Text();
	private Text _value = new Text();
	@Override
	protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
			throws IOException, InterruptedException {
		context.write(new Text("factoryname"), new Text("addressname"));
	}
	@Override
	protected void reduce(Text key, Iterable<Text> values,Context context)
			throws IOException, InterruptedException {
		 for (Text value : values) {
			 String temp = value.toString();
			 if(temp.startsWith("1")){
				 names.add(temp.substring(1));
			 }else{
				 address.add(temp.substring(1));
			 }
		}
		 for(String n : names){
			 for (String a : address) {
				_key.set(n);
				_value.set(a);
				context.write(_key, _value);
			}
		 }
		 names.clear();
		 address.clear();
	}

}

Driver端:

package com.hyxy.hadoop.join.multi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiJoinDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Path outfile = new Path("file:///G:/joinOut");
		FileSystem fs = outfile.getFileSystem(conf);
		if(fs.exists(outfile)){
			fs.delete(outfile,true);
		}
		Job job = Job.getInstance(conf);
		job.setJarByClass(MultiJoinDriver.class);
		job.setJobName("Join Demo");
		
		job.setMapperClass(MultiJoinMapper.class);
		job.setReducerClass(MultiJoinReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path("file:///G:/測試資料/多表關聯/"));
		FileOutputFormat.setOutputPath(job, outfile);
		System.exit(job.waitForCompletion(true)?0:1);
	}

}

輸出結果樣式:

factoryname	addressname
Back of Beijing	Beijing
Beijing Rising	Beijing
Beijing Red Star	Beijing
Guangzhou Development Bank	Guangzhou
Guangzhou Honda	Guangzhou
Tencent	Shenzhen
Shenzhen Thunder	Shenzhen