Hadoop 多表 join:map side join 範例
在沒有 pig 或者 hive 的環境下,直接在 mapreduce 中自己實現 join 是一件極其蛋疼的事情,MR中的join分為好幾種,比如有最常見的 reduce side join,map side join,semi join 等。今天我們要討論的是第 2 種:map side join,這種 join 在處理多個小表關聯大表時非常有用,而 reduce join 在處理多表關聯時是比較麻煩的,會造成大量的網路IO,效率低下。
1、原理:
之所以存在reduce side join,是因為在map階段不能獲取所有需要的join欄位,即:同一個key對應的欄位可能位於不同map中。但 Reduce side join是非常低效的,因為shuffle階段要進行大量的資料傳輸。Map side join是針對以下場景進行的優化:兩個待連線表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每個map task記憶體中存在一份(比如存放到hash table中),然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查詢是否有相同的key的記錄,如果有,則連線後輸出即可。為了支援檔案的複製,Hadoop提供了一個類DistributedCache,使用該類的方法如下:
(1)使用者使用靜態方法DistributedCache.addCacheFile()指定要複製的檔案,它的引數是檔案的URI(如果是HDFS上的檔案,可以這樣:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的檔案拷貝到各個TaskTracker的本地磁碟上。
(2)使用者使用DistributedCache.getLocalCacheFiles()方法獲取檔案目錄,並使用標準的檔案讀寫API讀取相應的檔案。
2、環境:
本例項需要的測試檔案及 hdfs 檔案存放目錄如下:
hadoop fs -ls /test/decli Found 4 items -rw-r--r-- 2 root supergroup 152 2013-03-06 02:05 /test/decli/login drwxr-xr-x - root supergroup 0 2013-03-06 02:45 /test/decli/output -rw-r--r-- 2 root supergroup 12 2013-03-06 02:12 /test/decli/sex -rw-r--r-- 2 root supergroup 72 2013-03-06 02:44 /test/decli/user
測試檔案內容分別為:
root@master 192.168.120.236 02:58:03 ~/test/table >
cat login # 登入表,需要判斷 uid 列是否有效,並得到對應使用者名稱、性別、訪問次數
1 0 20121213
2 0 20121213
3 1 20121213
4 1 20121213
1 0 20121114
2 0 20121114
3 1 20121114
4 1 20121114
1 0 20121213
1 0 20121114
9 0 20121114
root
測試環境 hadoop 版本:
echo $HADOOP_HOME
/work/hadoop-0.20.203.0
好了,廢話少說,上程式碼:
3、程式碼:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MultiTableJoin extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// 用於快取 sex、user 檔案中的資料
private Map<String, String> userMap = new HashMap<String, String>();
private Map<String, String> sexMap = new HashMap<String, String>();
private Text oKey = new Text();
private Text oValue = new Text();
private String[] kv;
// 此方法會在map方法執行之前執行
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
BufferedReader in = null;
try {
// 從當前作業中獲取要快取的檔案
Path[] paths = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
String uidNameAddr = null;
String sidSex = null;
for (Path path : paths) {
if (path.toString().contains("user")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (uidNameAddr = in.readLine())) {
userMap.put(uidNameAddr.split("t", -1)[0],
uidNameAddr.split("t", -1)[1]);
}
} else if (path.toString().contains("sex")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (sidSex = in.readLine())) {
sexMap.put(sidSex.split("t", -1)[0], sidSex.split(
"t", -1)[1]);
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
kv = value.toString().split("t");
// map join: 在map階段過濾掉不需要的資料
if (userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])) {
oKey.set(userMap.get(kv[0]) + "t" + sexMap.get(kv[1]));
oValue.set("1");
context.write(oKey, oValue);
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private Text oValue = new Text();
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int sumCount = 0;
for (Text val : values) {
sumCount += Integer.parseInt(val.toString());
}
oValue.set(String.valueOf(sumCount));
context.write(key, oValue);
}
}
public int run(String[] args) throws Exception {
Job job = new Job(getConf(), "MultiTableJoin");
job.setJobName("MultiTableJoin");
job.setJarByClass(MultiTableJoin.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),
args).getRemainingArgs();
// 我們把第1、2個引數的地址作為要快取的檔案路徑
DistributedCache.addCacheFile(new Path(otherArgs[1]).toUri(), job
.getConfiguration());
DistributedCache.addCacheFile(new Path(otherArgs[2]).toUri(), job
.getConfiguration());
FileInputFormat.addInputPath(job, new Path(otherArgs[3]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[4]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MultiTableJoin(),
args);
System.exit(res);
}
}
執行命令:
hadoop jar MultiTableJoin.jar MultiTableJoin /test/decli/sex /test/decli/user /test/decli/login /test/decli/output
4、結果:
執行結果:
root@master 192.168.120.236 02:47:18 ~/test/table > hadoop fs -cat /test/decli/output/*|column -t cat: File does not exist: /test/decli/output/_logs 張三 男 4 李四 男 2 王五 女 2 趙六 女 2 root@master 192.168.120.236 02:47:26 ~/test/table >
TIPS:
更多關於 hadoop mapreduce 相關 join 介紹,請參考之前的博文:
MapReduce 中的兩表 join 幾種方案簡介
http://my.oschina.net/leejun2005/blog/95186
本例中用到了分散式快取,關於分散式快取的一些特性與原理,以及注意事項,
請參考:
HDFS 原理、架構與特性介紹