結合案例講解MapReduce重要知識點 --------- 多表連線
阿新 • • 發佈:2018-12-20
第一張表的內容:
login:
uid sexid logindate
1 1 2017-04-17 08:16:20
2 2 2017-04-15 06:18:20
3 1 2017-04-16 05:16:24
4 2 2017-04-14 03:18:20
5 1 2017-04-13 02:16:25
6 2 2017-04-13 01:15:20
7 1 2017-04-12 08:16:34
8 2 2017-04-11 09:16:20
9 0 2017-04-10 05:16:50
第二張表的內容:
sex:
0 不知道
1 男
2 女
第三張表的內容:
user uname 1 小紅 2 小行 3 小通 4 小閃 5 小鎮 6 小振 7 小秀 8 小微 9 小懂 10 小明 11 小剛 12 小舉 13 小黑 14 小白 15 小鵬 16 小習
最終輸出效果:
loginuid sex uname logindate 1 男 小紅 2017-04-17 08:16:20 2 女 小行 2017-04-15 06:18:20 3 男 小通 2017-04-16 05:16:24 4 女 小閃 2017-04-14 03:18:20 5 男 小鎮 2017-04-13 02:16:25 6 女 小振 2017-04-13 01:15:20 7 男 小秀 2017-04-12 08:16:34 9 不知道 小懂 2017-04-10 05:16:50 8 女 小微 2017-04-11 09:16:20
思路:
map端join(Map-side Join):
核心思想:將小表文件快取到分散式快取中,然後再map端進行連線處理。
適用場景:有一個或者多個小表 和 一個或者多個大表檔案。
優點:map端使用記憶體快取小表資料,載入速度快;大大減少map端到reduce端的傳輸量;大大較少shuffle過程耗時。
缺點:解決的業務需要有小表。
semi join:半連線
解決map端的缺點,當多個大檔案同時存在,且一個大檔案中有效資料抽取出來是小檔案時,
則可以單獨抽取出來並快取到分散式快取中,然後再使用map端join來進行連線。
自定義一個writable類User
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; /** * user 資訊bean * @author lyd * */ public class User implements Writable{ public String uid; public String uname; public String gender; public String ldt; public User(){ } public User(String uid, String uname, String gender, String ldt) { this.uid = uid; this.uname = uname; this.gender = gender; this.ldt = ldt; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(uid); out.writeUTF(uname); out.writeUTF(gender); out.writeUTF(ldt); } @Override public void readFields(DataInput in) throws IOException { this.uid = in.readUTF(); this.uname = in.readUTF(); this.gender = in.readUTF(); this.ldt = in.readUTF(); } /** * @return the uid */ public String getUid() { return uid; } /** * @param uid the uid to set */ public void setUid(String uid) { this.uid = uid; } /** * @return the uname */ public String getUname() { return uname; } /** * @param uname the uname to set */ public void setUname(String uname) { this.uname = uname; } /** * @return the gender */ public String getGender() { return gender; } /** * @param gender the gender to set */ public void setGender(String gender) { this.gender = gender; } /** * @return the ldt */ public String getLdt() { return ldt; } /** * @param ldt the ldt to set */ public void setLdt(String ldt) { this.ldt = ldt; } /* (non-Javadoc) * @see java.lang.Object#toString() */ @Override public String toString() { return uid + "\t" + uname + "\t" + gender + "\t" + ldt; } }
MapReduce類MultipleTableJoin
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MultipleTableJoin extends ToolRunner implements Tool{
/**
* 自定義的myMapper
* @author lyd
*
*/
static class MyMapper extends Mapper<LongWritable, Text, User, NullWritable>{
Map<String,String> sexMap = new ConcurrentHashMap<String, String>();
Map<String,String> userMap = new ConcurrentHashMap<String, String>();
//讀取快取檔案
@Override
protected void setup(Context context)throws IOException, InterruptedException {
Path [] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
for (Path p : paths) {
String fileName = p.getName();
if(fileName.equals("sex")){//讀取 “性別表”
BufferedReader sb = new BufferedReader(new FileReader(new File(p.toString())));
String str = null;
while((str = sb.readLine()) != null){
String [] strs = str.split("\t");
sexMap.put(strs[0], strs[1]);
}
sb.close();
} else if(fileName.equals("user")){//讀取“使用者表”
BufferedReader sb = new BufferedReader(new FileReader(new File(p.toString())));
String str = null;
while((str = sb.readLine()) != null){
String [] strs = str.split("\t");
userMap.put(strs[0], strs[1]);
}
sb.close();
}
}
}
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String lines [] = line.split("\t");
String uid = lines[0];
String sexid = lines[1];
String logindate = lines[2];
//join連線操作
if(sexMap.containsKey(sexid) && userMap.containsKey(uid)){
String uname = userMap.get(uid);
String gender = sexMap.get(sexid);
//User user = new User(uid, uname, gender, logindate);
//context.write(new Text(uid+"\t"+uname+"\t"+gender+"\t"+logindate), NullWritable.get());
User user = new User(uid, uname, gender, logindate);
context.write(user, NullWritable.get());
}
}
@Override
protected void cleanup(Context context)throws IOException, InterruptedException {
}
}
/**
* 自定義MyReducer
* @author lyd
*
*/
/*static class MyReducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void setup(Context context)throws IOException, InterruptedException {
}
@Override
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
}
@Override
protected void cleanup(Context context)throws IOException, InterruptedException {
}
}*/
@Override
public void setConf(Configuration conf) {
conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
}
@Override
public Configuration getConf() {
return new Configuration();
}
/**
* 驅動方法
*/
@Override
public int run(String[] args) throws Exception {
//1、獲取conf物件
Configuration conf = getConf();
//2、建立job
Job job = Job.getInstance(conf, "model01");
//3、設定執行job的class
job.setJarByClass(MultipleTableJoin.class);
//4、設定map相關屬性
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(User.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
//設定快取檔案
job.addCacheFile(new URI(args[2]));
job.addCacheFile(new URI(args[3]));
// URI [] uris = {new URI(args[2]),new URI(args[3])};
// job.setCacheFiles(uris);
/* DistributedCache.addCacheFile(new URI(args[2]), conf);
DistributedCache.addCacheFile(new URI(args[3]), conf);*/
/*//5、設定reduce相關屬性
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);*/
//判斷輸出目錄是否存在,若存在則刪除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]), true);
}
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//6、提交執行job
int isok = job.waitForCompletion(true) ? 0 : 1;
return isok;
}
/**
* job的主入口
* @param args
*/
public static void main(String[] args) {
try {
//對輸入引數作解析
String [] argss = new GenericOptionsParser(new Configuration(), args).getRemainingArgs();
System.exit(ToolRunner.run(new MultipleTableJoin(), argss));
} catch (Exception e) {
e.printStackTrace();
}
}
}