MapReduce在Reduce中實現LEFT JOIN
本文以訂單表與商品表為例，示範如何在 Reduce 端實現 left join。
一:準備資料
訂單資料表 t_order：

| id | date | pid | amount |
|---|---|---|---|
| 1001 | 20150710 | P0001 | 2 |
| 1002 | 20150710 | P0001 | 3 |
| 1002 | 20150710 | P0002 | 3 |
商品資訊表 t_product：

| id | pname | category_id | price |
|---|---|---|---|
| P0001 | 小米5 | 1000 | 2000 |
| P0002 | 錘子T1 | 1000 | 3000 |
要實現類似如下 SQL 的 left join 效果：
select a.id, a.date, b.pname, b.category_id, b.price from t_order a left join t_product b on a.pid = b.id
二:mapreduce實現
將訂單表的 pid 和商品表的 id 作為 map 階段輸出的 key（k2）。為了得到上面 SQL 的輸出效果，v2 使用我們自定義的 OrderJoinBean，它封裝了 SQL 要輸出的欄位，也就是兩張表的欄位。
在 map 中透過上下文物件 context 取得目前 map 所讀取的檔名，並以檔名區分兩張表，把各自的欄位封裝到 OrderJoinBean（即 v2）中。相同 key 的資料會被送到同一個 reduce task，且其 value 會被合併成一個集合。因此在 reduce 階段，相同 k2 對應的 v2 是一個 OrderJoinBean 的集合，其中既有訂單資料也有商品資料。遍歷這個集合，把訂單與商品組合成完整的 OrderJoinBean 後輸出。若某筆訂單找不到對應的商品，仍會輸出該訂單，只是商品欄位為空，這正是 left join 的語意。
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; /** * 讀取多個檔案 * 自定義map */ public class OrderJoinMapper extends Mapper<LongWritable,Text,Text,OrderJoinBean>{ private OrderJoinBean orderJoinBean = new OrderJoinBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //通過獲取檔名來區分兩個不同的檔案 String[] split = value.toString().split(","); FileSplit inputSplit = (FileSplit) context.getInputSplit();//獲取輸入分割槽 String fileName = inputSplit.getPath().getName(); System.out.println("當前指定的檔名稱是:"+fileName); if("orders.txt".equals(fileName)){ //訂單資料 orderJoinBean.setId(split[0]); orderJoinBean.setDate(split[1]); orderJoinBean.setPid(split[2]); orderJoinBean.setAmount(split[3]); context.write(new Text(split[2]),orderJoinBean); }else{ //商品資料 orderJoinBean.setName(split[1]); orderJoinBean.setCategoryId(split[2]); orderJoinBean.setPrice(split[3]); context.write(new Text(split[0]),orderJoinBean); } } }
三:reduce實現
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Reduce side of the LEFT JOIN between orders and products.
 *
 * <p>All records sharing one product id arrive in a single {@code reduce} call. Each order
 * record is written once per matching product row, with that product's name, category id
 * and price copied in. An order with no matching product is still written, with the product
 * fields left unset (LEFT JOIN semantics). Product rows without any orders produce no output.
 */
public class OrderJoinReduce extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<OrderJoinBean> values, Context context)
            throws IOException, InterruptedException {
        List<OrderJoinBean> orders = new ArrayList<>();
        List<OrderJoinBean> products = new ArrayList<>();
        // The value instance handed out by the iterator must not be stored directly: each
        // record is copied into a fresh bean before it is kept.
        for (OrderJoinBean value : values) {
            if (isOrderRecord(value)) {
                orders.add(copyOrderFields(value));
            } else {
                products.add(copyProductFields(value));
            }
        }

        for (OrderJoinBean order : orders) {
            if (products.isEmpty()) {
                // No matching product: emit the order alone (LEFT JOIN).
                context.write(order, NullWritable.get());
                continue;
            }
            for (OrderJoinBean product : products) {
                order.setName(product.getName());
                order.setCategoryId(product.getCategoryId());
                order.setPrice(product.getPrice());
                context.write(order, NullWritable.get());
            }
        }
    }

    /**
     * Returns whether {@code value} is an order record. The mapper sets an id only on
     * order records, so a missing id identifies a product record.
     */
    private static boolean isOrderRecord(OrderJoinBean value) {
        // String.valueOf turns a null id into "null", so a null id, the literal text "null"
        // and an empty id are all treated as "no id".
        String id = String.valueOf(value.getId());
        return !id.isEmpty() && !"null".equals(id);
    }

    /** Copies the order fields (id, date, pid, amount) of {@code value} into a new bean. */
    private static OrderJoinBean copyOrderFields(OrderJoinBean value) {
        OrderJoinBean copy = new OrderJoinBean();
        copy.setId(value.getId());
        copy.setDate(value.getDate());
        copy.setPid(value.getPid());
        copy.setAmount(value.getAmount());
        return copy;
    }

    /** Copies the product fields (name, category id, price) of {@code value} into a new bean. */
    private static OrderJoinBean copyProductFields(OrderJoinBean value) {
        OrderJoinBean copy = new OrderJoinBean();
        copy.setName(value.getName());
        copy.setCategoryId(value.getCategoryId());
        copy.setPrice(value.getPrice());
        return copy;
    }
}
四：注意 map 端取得檔案名稱的方式：
// Tell the two input files apart by the name of the file backing the current split.
// NOTE: the cast only succeeds when the job's input format produces FileSplit instances.
FileSplit inputSplit = (FileSplit) context.getInputSplit();// the input split being processed
String fileName = inputSplit.getPath().getName();
System.out.println("當前指定的檔名稱是:"+fileName);