MapReduce之MapJoin案例
阿新 • • 發佈:2020-08-17
@[toc]
## **使用場景**
Map Join 適用於一張表十分小、一張表很大的場景。
## **優點**
思考:在Reduce 端處理過多的表,非常容易產生資料傾斜。怎麼辦?
在Map端快取多張表,提前處理業務邏輯,這樣增加Map 端業務,減少Reduce 端資料的壓力,儘可能的減少資料傾斜。
## **具體辦法**:採用`DistributedCache`
(1)在Mapper的setup階段,將檔案讀取到快取集合中。
(2)在驅動函式中載入快取。
```java
/快取普通檔案到Task執行節點。
job.addCacheFile(new URI("file://e:/cache/pd.txt");
```
## 案例
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200817131815367.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0RURlRf,size_16,color_FFFFFF,t_70#pic_center)
每個MapTask在map()中完成Join
==注意:==
- 只需要將要Join的資料order.txt作為切片,讓MapTask讀取
- pd.txt不以切片形式讀入,而直接在MapTask中使用HDFS下載此檔案,下載後,使用輸入流手動讀取其中的資料
- 在map()之前通常是將大檔案以切片形式讀取,小檔案手動讀取!
order.txt---->切片(orderId,pid,amount)----JoinMapper.map()
pd.txt----->切片(pid,pname)----JoinMapper.map()
## 需求分析
`MapJoin`適用於關聯表中有小表的情形
## 程式碼實現
JoinBean.java
```java
public class JoinBean {
private String orderId;
private String pid;
private String pname;
private String amount;
@Override
public String toString() {
return orderId + "\t" + pname + "\t" + amount ;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getAmount() {
return amount;
}
public void setAmount(String amount) {
this.amount = amount;
}
}
```
MapJoinMapper.java
```java
/*
* 1. 在Hadoop中,hadoop為MR提供了分散式快取
* ①用來快取一些Job執行期間的需要的檔案(普通檔案,jar,歸檔檔案(har))
* ②通過在Job的Configuration中,使用uri代替要快取的檔案
* ③分散式快取會假設當前的檔案已經上傳到了HDFS,並且在叢集的任意一臺機器都可以訪問到這個URI所代表的檔案
* ④分散式快取會在每個節點的task執行之前,提前將檔案傳送到節點
* ⑤分散式快取的高效是由於每個Job只會複製一次檔案,且可以自動在從節點對歸檔檔案解歸檔
*
*
*
*
*/
public class MapJoinMapper extends Mapper{
private JoinBean out_key=new JoinBean();
private Map pdDatas=new HashMap();
//在map之前手動讀取pd.txt中的內容
@Override
protected void setup(Mapper.Context context)
throws IOException, InterruptedException {
//從分散式快取中讀取資料
URI[] files = context.getCacheFiles();
for (URI uri : files) {
BufferedReader reader = new BufferedReader(new FileReader(new File(uri)));
String line="";
//迴圈讀取pd.txt中的每一行
while(StringUtils.isNotBlank(line=reader.readLine())) {
String[] words = line.split("\t");
pdDatas.put(words[0], words[1]);
}
reader.close();
}
}
//對切片中order.txt的資料進行join,輸出
@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split("\t");
out_key.setOrderId(words[0]);
out_key.setPname(pdDatas.get(words[1]));
out_key.setAmount(words[2]);
context.write(out_key, NullWritable.get());
}
}
```
MapJoinDriver.java
```java
public class MapJoinDriver {
public static void main(String[] args) throws Exception {
Path inputPath=new Path("e:/mrinput/mapjoin");
Path outputPath=new Path("e:/mroutput/mapjoin");
//作為整個Job的配置
Configuration conf = new Configuration();
//保證輸出目錄不存在
FileSystem fs=FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// ①建立Job
Job job = Job.getInstance(conf);
job.setJarByClass(MapJoinDriver.class);
// 為Job建立一個名字
job.setJobName("wordcount");
// ②設定Job
// 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別
job.setMapperClass(MapJoinMapper.class);
// 設定輸入目錄和輸出目錄
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// 設定分散式快取
job.addCacheFile(new URI("file:///e:/pd.txt"));
//取消reduce階段
job.setNumReduceTasks(0);
// ③執行Job
job.waitForCompletion(true);
}
}
```