Hadoop 多表關聯
阿新 • • 發佈:2018-07-21
結果 inf tro 數據庫 割點 ava other interrupt orm
一、實例描述
多表關聯和單表關聯類似,它也是通過對原始數據進行一定的處理,從其中挖掘出關心的信息。下面進入這個實例。
輸入是兩個文件,一個代表工廠表,包含工廠名列和地址編號列;另一個代表地址列,包含地址名列和地址編號列。要求從輸入數據中找出工廠名和地址名的對應關系,輸出工廠名-地址名表。
樣例輸入:
factory:
factoryname addressed
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1
address:
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
樣例輸出:
二、設計思路
多表關聯和單表關聯類似,都類似於數據庫中的自然連接。相比單表關聯,多表關聯的左右表和連接列更清楚,因此可以采用和單表關聯相同的處理方式。Map識別出輸入的行屬於哪個表之後,對其進行分割,將連接的值保存在key中,另一列和左右表標誌保存在value中,然後輸出。Reduce拿到連接結果後,解析value內容,根據標誌將左右表內容分開存放,然後求笛卡爾積,最後直接輸出。
這個實例的具體分析參考Hadoop 單表關聯博客,下面貼出代碼。
三、程序代碼
程序代碼如下:
1 import java.io.IOException; 2 import java.util.Iterator; 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.Mapper;9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 import org.apache.hadoop.util.GenericOptionsParser; 13 14 15 public class MTjoin { 16 17 public static int time = 0; 18 19 public static class Map extends Mapper<Object, Text, Text, Text>{ 20 // 在Map中先區分輸入行屬於左表還是右表,然後對兩列值進行分割, 21 // 連接列保存在key值,剩余列和左右表標誌保存在value中,最後輸出 22 @Override 23 protected void map(Object key, Text value,Mapper<Object, Text, Text, Text>.Context context) 24 throws IOException, InterruptedException { 25 // super.map(key, value, context); 26 String line = value.toString(); 27 int i=0; 28 // 輸入文件首行,不處理 29 if(line.contains("factoryname")==true || line.contains("addressID")==true){ 30 return ; 31 } 32 // 找出數據中的分割點 33 while(line.charAt(i)>=‘9‘ || line.charAt(i)<=‘0‘){ 34 i++; 35 } 36 if (line.charAt(0)>=‘9‘||line.charAt(0)<=‘0‘) { 37 // 左表 38 int j = i-1; 39 while(line.charAt(j)!=‘ ‘) j--; 40 String [] values = {line.substring(0,j),line.substring(i)}; 41 context.write(new Text(values[1]), new Text("1+"+values[0])); 42 }else { 43 // 右表 44 int j = i+1; 45 while(line.charAt(j)!=‘ ‘) j++; 46 String[] values = {line.substring(0,i+1),line.substring(j)}; 47 context.write(new Text(values[0]), new Text("2"+values[1])); 48 } 49 } 50 } 51 52 public static class Reduce extends Reducer<Text, Text, Text, Text>{ 53 // Reduce解析Map輸出,將value中數據按照左右表分別保存,然後求 // 笛卡爾積,輸出 54 @Override 55 protected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context) 56 throws IOException, InterruptedException { 57 // super.reduce(arg0, arg1, arg2); 58 if (time==0) { 59 // 輸出文件第一行 60 context.write(new Text("factoryname"), new Text("addressname")); 61 time++; 62 } 63 int factorynum = 0; 64 String[] factory = new String[10]; 65 int addressnum = 0; 66 String[] address = new String[10]; 67 Iterator ite = values.iterator(); 68 while (ite.hasNext()) { 69 String record = ite.next().toString(); 70 int len = record.length(); 71 int i = 2; 72 char type = record.charAt(0); 73 String factoryname = new String(); 74 String addressname = new String(); 75 if (type==‘1‘) { 76 // 左表 77 factory[factorynum] = record.substring(2); 78 factorynum++; 79 }else { 80 // 右表 81 address[addressnum] = record.substring(2); 82 addressnum++; 83 } 84 } 85 if (factorynum != 0 && addressnum !=0) { 86 // 求笛卡爾積 87 for(int m=0;m<factorynum;m++){ 88 for(int n=0;n<addressnum;n++){ 89 context.write(new Text(factory[m]), new Text(address[n])); 90 } 91 } 92 } 93 } 94 } 95 96 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 97 98 Configuration conf = new Configuration(); 99 String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); 100 if(otherArgs.length!=2){ 101 System.out.println("Usage:wordcount <in> <out>"); 102 System.exit(2); 103 } 104 Job job = new Job(conf,"multiple table join"); 105 job.setJarByClass(MTjoin.class); 106 job.setMapperClass(Map.class); 107 job.setReducerClass(Reduce.class); 108 job.setOutputKeyClass(Text.class); 109 job.setOutputValueClass(Text.class); 110 FileInputFormat.addInputPath(job,new Path(otherArgs[0])); 111 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 112 System.exit(job.waitForCompletion(true)?0:1); 113 } 114 115 }
Hadoop 多表關聯