Hadoop MapReduce多表關聯查詢-案例
案例:將工廠名和所在地點輸出
原始資料為factory.txt工廠庫和address.txt地址庫(兩個檔案的欄位皆以Tab字元分隔,程式以"\t"切分每一行)
【factory.txt】如下:
factoryname addressId
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1
【address.txt】如下:
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
思路分析:
a.關聯鍵ID
Mapper:<k2,v2>--> <1,"1Beijing Red Star">、<1,"1Beijing Rising">、<1,"1Back of Beijing">(來自factory.txt,value開頭的"1"為表標誌)
<1,"2Beijing">(來自address.txt,value開頭的"2"為表標誌)
Reducer:<k2,[v2...]>--> <1,["1Beijing Red Star","1Beijing Rising","1Back of Beijing","2Beijing"]>,再依表標誌拆成工廠名與地址名兩組,做笛卡兒積後輸出
程式碼如下:
Mapper端:
package com.hyxy.hadoop.join.multi;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Map side of the factory/address join.
 *
 * <p>Each input line is split on a tab into exactly two columns. The join
 * column (the address id) becomes the output key; the remaining column,
 * prefixed with a one-character table tag, becomes the output value:
 * <ul>
 *   <li>{@code factory.txt} (factory name, address id) emits
 *       {@code <addressId, "1" + factoryName>}</li>
 *   <li>{@code address.txt} (address id, address name) emits
 *       {@code <addressId, "2" + addressName>}</li>
 * </ul>
 * Lines that do not have exactly two tab-separated columns, and lines from any
 * other file, are skipped.
 */
public class MultiJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Name of the input file holding (factory name, address id) rows. */
    private static final String FACTORY_FILE = "factory.txt";

    /** Name of the input file holding (address id, address name) rows. */
    private static final String ADDRESS_FILE = "address.txt";

    /** Tag prepended to values that come from the factory table. */
    private static final String FACTORY_TAG = "1";

    /** Tag prepended to values that come from the address table. */
    private static final String ADDRESS_TAG = "2";

    // Writable instances reused for every record written by this mapper.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /**
     * Tags one input line with its source table and emits it keyed by address id.
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one line of {@code factory.txt} or {@code address.txt}
     * @param context task context used to look up the source file and emit output
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");
        if (columns.length != 2) {
            // Not a two-column row; nothing to join on.
            return;
        }

        // The source table is identified by the name of the file behind the split.
        // NOTE(review): the cast assumes the job's input format produces FileSplit
        // instances - confirm if a different input format is ever configured.
        String filename = ((FileSplit) context.getInputSplit()).getPath().getName();

        if (FACTORY_FILE.equals(filename)) {
            // factory.txt: column 0 = factory name, column 1 = address id
            outKey.set(columns[1]);
            outValue.set(FACTORY_TAG + columns[0]);
        } else if (ADDRESS_FILE.equals(filename)) {
            // address.txt: column 0 = address id, column 1 = address name
            outKey.set(columns[0]);
            outValue.set(ADDRESS_TAG + columns[1]);
        } else {
            // Unknown file: ignore its rows.
            return;
        }

        context.write(outKey, outValue);
    }
}
Reducer端:
package com.hyxy.hadoop.join.multi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce side of the factory/address join.
 *
 * <p>For one join key the incoming values look like
 * {@code <1, ["1Beijing Red Star", "1Beijing Rising", "1Back of Beijing", "2Beijing"]>}.
 * Values starting with {@code "1"} are treated as factory names; every other
 * value is treated as an address name. The first character (the table tag) is
 * stripped and one {@code <factoryName, addressName>} record is written for
 * every factory/address combination under the key.
 */
public class MultiJoinReducer extends Reducer<Text, Text, Text, Text> {

    // Writable instances reused for every joined record written by this reducer.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /**
     * Writes the header record {@code <"factoryname", "addressname">} before any
     * key is reduced.
     *
     * @param context task context used to emit the header
     * @throws IOException          if writing the header fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        context.write(new Text("factoryname"), new Text("addressname"));
    }

    /**
     * Joins the factory names and address names that share one address id.
     *
     * @param key     the join key emitted by the mapper
     * @param values  tagged values for this key
     * @param context task context used to emit joined records
     * @throws IOException          if writing an output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Per-call buffers: nothing is shared between keys or between reducer
        // instances, and nothing needs to be cleared afterwards.
        List<String> names = new ArrayList<String>();
        List<String> addresses = new ArrayList<String>();

        for (Text value : values) {
            // Copy to a String right away and strip the one-character table tag.
            String tagged = value.toString();
            if (tagged.startsWith("1")) {
                names.add(tagged.substring(1));
            } else {
                addresses.add(tagged.substring(1));
            }
        }

        // Cross product of both sides; an empty side yields no output for the key.
        for (String name : names) {
            outKey.set(name);
            for (String address : addresses) {
                outValue.set(address);
                context.write(outKey, outValue);
            }
        }
    }
}
Driver端:
package com.hyxy.hadoop.join.multi;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Entry point that configures and runs the factory/address join job.
 *
 * <p>The output directory is removed first if it already exists, then the job
 * is submitted with {@link MultiJoinMapper} and {@link MultiJoinReducer}, and
 * the JVM exits with 0 when the job succeeds and 1 otherwise.
 */
public class MultiJoinDriver {

    /**
     * Builds the join job, runs it and exits with the job's status.
     *
     * @param args command-line arguments (not used)
     * @throws IOException            if a file-system or job-submission call fails
     * @throws ClassNotFoundException if a job class cannot be found
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final Configuration configuration = new Configuration();
        final Path outputDir = new Path("file:///G:/joinOut");

        // The job cannot start when its output directory already exists.
        removeIfPresent(outputDir, configuration);

        final Job joinJob = Job.getInstance(configuration);
        joinJob.setJarByClass(MultiJoinDriver.class);
        joinJob.setJobName("Join Demo");

        joinJob.setMapperClass(MultiJoinMapper.class);
        joinJob.setReducerClass(MultiJoinReducer.class);

        joinJob.setOutputKeyClass(Text.class);
        joinJob.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(joinJob, new Path("file:///G:/測試資料/多表關聯/"));
        FileOutputFormat.setOutputPath(joinJob, outputDir);

        final boolean succeeded = joinJob.waitForCompletion(true);
        if (succeeded) {
            System.exit(0);
        } else {
            System.exit(1);
        }
    }

    /** Recursively deletes {@code target} on its own file system when it exists. */
    private static void removeIfPresent(Path target, Configuration configuration) throws IOException {
        final FileSystem fileSystem = target.getFileSystem(configuration);
        if (!fileSystem.exists(target)) {
            return;
        }
        fileSystem.delete(target, true);
    }
}
輸出結果樣式:
factoryname addressname
Back of Beijing Beijing
Beijing Rising Beijing
Beijing Red Star Beijing
Guangzhou Development Bank Guangzhou
Guangzhou Honda Guangzhou
Tencent Shenzhen
Shenzhen Thunder Shenzhen