Mapreduce之多表關聯Join---(附例子)
阿新 • • 發佈:2019-01-24
需求:
address.txt: 1 Beijing 2 Guangzhou 3 Shenzhen 4 Xian factory.txt: Beijing Red Star 1 Shenzhen Thunder 3 Guangzhou Honda 2 Beijing Rising 1 Guangzhou Development Bank 2 Tencent 3 Back of Beijing 1 結果: factory city Beijing Red Star Beijing Shenzhen Thunder Shenzhen Guangzhou Honda Guangzhou Beijing Rising Beijing Guangzhou Development Bank Guangzhou Tencent Shenzhen Back of Beijing Beijing
分析:
map函式輸出:<k2,v2>------<1,"0,Beijing Red Star"> , <1,"0,Back of Beijing"> , <1,"1,Beijing">
reducer函式輸入:<k2,[v2...]>----<1,["0,Beijing Red Star","0,Back of Beijing","1,Beijing"]>
關聯查詢需要一個標識位:value 以 "1," 開頭表示該記錄來自 address.txt(城市名),以 "0," 開頭表示來自 factory.txt(工廠名)。reducer 依此標識位區分兩張表的記錄,再把同一個 key 下的工廠名與城市名配對輸出。
1.Mapper.class
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line=value.toString(); FileSplit inputsplit=(FileSplit)context.getInputSplit(); String filename= inputsplit.getPath().toString();//得到檔案路徑名 if(line.contains("address.txt")| line.contains("factory.txt")){ return ; } String[] _str=line.split("\t"); //切分 if(filename.endsWith("address.txt")){ context.write(new Text(_str[0]), new Text("1,"+_str[1])); }else{ // key作為標識位 //標識位 context.write(new Text(_str[1]),new Text("0,"+_str[0])); } } }
2.Reducer.class
/**
 * Reduce side of the join: pairs every factory name with every city name that
 * arrived under the same key.
 *
 * <p>Each incoming value is expected to look like {@code "<tag>,<payload>"}. A tag of
 * {@code "1"} marks a city name; any other tag is treated as a factory name.
 */
public class JoinReduce extends Reducer<Text, Text, Text, Text> {

    /**
     * Writes the header row before any joined records.
     *
     * <p>Hadoop calls {@code setup} once per reduce task, so with more than one
     * reducer each output file would start with this header.
     *
     * @param context task context used to emit the header
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        context.write(new Text("工廠名"), new Text("城市"));
    }

    /**
     * Splits the values of one key into cities and factories by their tag and
     * emits one {@code <factory, city>} pair per combination.
     *
     * @param key     join key shared by all values
     * @param values  tagged records of the form {@code "<tag>,<payload>"}
     * @param context task context used to emit joined rows
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        ArrayList<String> cities = new ArrayList<String>();
        ArrayList<String> factories = new ArrayList<String>();

        for (Text v : values) {
            String record = v.toString();
            // Split at the FIRST comma only, so a payload that itself contains a comma
            // is kept whole.
            int comma = record.indexOf(',');
            if (comma < 0) {
                // No tag separator: not a record in the expected format, skip it.
                continue;
            }
            String tag = record.substring(0, comma);
            String payload = record.substring(comma + 1);

            // Compare the tag itself rather than searching the whole record for "1";
            // otherwise a factory name containing the digit 1 would be filed as a city.
            if ("1".equals(tag)) {
                cities.add(payload);    // city name
            } else {
                factories.add(payload); // factory name
            }
        }

        // Cross product of the two sides for this key: output is <factory, city>.
        for (String city : cities) {
            for (String factory : factories) {
                context.write(new Text(factory), new Text(city));
            }
        }
    }
}
3.Driver.class
/**
 * Entry point that configures and submits the join job built from
 * {@link JoinMapper} and {@link JoinReduce}.
 */
public class JoinDriver {

    /**
     * Configures the job, removes any previous output directory, runs the job and
     * exits with status 0 on success or 1 on failure.
     *
     * @param args command-line arguments (not used; input and output paths are hard-coded)
     * @throws IOException            if the file system or job submission fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.queue.name", "order");

        // Delete a previous run's output first; Hadoop's FileOutputFormat refuses to
        // start a job whose output directory already exists.
        Path outfile = new Path("file:///D:/輸出結果/joinout");
        FileSystem fs = outfile.getFileSystem(conf);
        if (fs.exists(outfile)) {
            fs.delete(outfile, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinDriver.class);
        // Was "Sencondary Sort", a misspelled leftover from another job; name the job
        // after what it actually does so it is identifiable in job listings.
        job.setJobName("Reduce Side Join");
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // The input is a directory; the mapper tells the files in it apart by path.
        FileInputFormat.addInputPath(job, new Path("file:///D:/測試資料/join連線/"));
        FileOutputFormat.setOutputPath(job, outfile);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
4.執行結果
address.txt:
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
factory.txt:
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1
結果:
factory city
Beijing Red Star Beijing
Shenzhen Thunder Shenzhen
Guangzhou Honda Guangzhou
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Tencent Shenzhen
Back of Beijing Beijing