1. 程式人生 > >MapReduce在Reduce中實現LEFT JOIN

MapReduce在Reduce中實現LEFT JOIN

本文以訂單和商品演示如何實現left join。

一:準備資料

訂單資料表t_order:

id

date

pid

amount

1001

20150710

P0001

2

1002

20150710

P0001

3

1002

20150710

P0002

3

 

商品資訊表t_product

id

pname

category_id

price

P0001

小米5

1000

2000

P0002

錘子T1

1000

3000

實現類似如下sql的left join效果:

select a.id,a.date,b.pname,b.category_id,b.price from t_order a left join t_product b on a.pid = b.id

二:mapreduce實現

將訂單表的pid和商品表的id作為map階段的k2。為了實現上面sql的輸出效果,v2是我們自定義的OrderJoinBean,它封裝了sql輸出所需的欄位,也就是兩張表的欄位。

在map階段,通過上下文物件context獲取當前讀取的檔名,並以檔名作為判斷條件,將不同表的資訊封裝到OrderJoinBean(即v2)中。在reduce階段,相同key的資料會進入同一個reduceTask,其value會被合併,因此相同k2對應的v2是一個OrderJoinBean的集合,其中分別儲存了訂單bean和商品bean;遍歷v2,將訂單與商品資訊組合成完整的OrderJoinBean後輸出。


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Mapper for a reduce-side join of order records and product records.
 *
 * <p>Every input line is split on commas and packed into an {@link OrderJoinBean}. Lines read
 * from {@code orders.txt} are treated as orders and keyed by their product id (third field);
 * lines from any other file are treated as products and keyed by their own id (first field).
 * Records sharing a key therefore reach the same reduce call.
 */
public class OrderJoinMapper extends Mapper<LongWritable,Text,Text,OrderJoinBean>{
    /** Name of the input file that holds order records; every other file is treated as product data. */
    private static final String ORDER_FILE_NAME = "orders.txt";
    /** Number of comma-separated fields that must be present on a line. */
    private static final int MIN_FIELD_COUNT = 4;

    // Reused for every record, as the original value bean was: context.write() is relied on to
    // serialize the key and value before the next map() call overwrites them.
    private final OrderJoinBean orderJoinBean = new OrderJoinBean();
    private final Text joinKey = new Text();

    /** True when this task's input split belongs to the order file. */
    private boolean orderInput;

    /**
     * Resolves the file behind this task's input split once, instead of once per record.
     *
     * @param context task context supplying the input split
     * @throws IOException          propagated from the framework
     * @throws InterruptedException propagated from the framework
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The split is fixed for the lifetime of the task, so the file name is loop-invariant.
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String fileName = inputSplit.getPath().getName();
        System.out.println("當前指定的檔名稱是:"+fileName);
        orderInput = ORDER_FILE_NAME.equals(fileName);
    }

    /**
     * Emits one (join key, bean) pair per well-formed input line.
     *
     * @param key     offset key supplied by the input format; not used
     * @param value   one comma-separated input line
     * @param context task context used to emit the pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        if (split.length < MIN_FIELD_COUNT) {
            // Blank or truncated line: count and skip it rather than failing the whole task
            // with an ArrayIndexOutOfBoundsException.
            context.getCounter("OrderJoin", "MALFORMED_LINES").increment(1);
            return;
        }
        if (orderInput) {
            // Order record: id, date, pid, amount
            orderJoinBean.setId(split[0]);
            orderJoinBean.setDate(split[1]);
            orderJoinBean.setPid(split[2]);
            orderJoinBean.setAmount(split[3]);
            joinKey.set(split[2]);
        } else {
            // Product record: id, name, category id, price. The bean's id is left unset on
            // purpose; the reducer uses a missing id to recognise product records.
            orderJoinBean.setName(split[1]);
            orderJoinBean.setCategoryId(split[2]);
            orderJoinBean.setPrice(split[3]);
            joinKey.set(split[0]);
        }
        context.write(joinKey, orderJoinBean);
    }
}

三:reduce實現


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Reducer that completes the left join of orders with products.
 *
 * <p>All beans sharing one join key arrive in a single {@code reduce} call. They are separated
 * into order records and product records, and every order is written once per matching
 * product with the product fields filled in. An order with no matching product is still
 * written once, with its product fields left unset, which gives LEFT JOIN semantics.
 */
public class OrderJoinReduce extends Reducer<Text,OrderJoinBean,OrderJoinBean,NullWritable> {

    /**
     * Joins the order and product beans that share {@code key}.
     *
     * @param key     join key (the product id)
     * @param values  order and product beans emitted by the mapper for this key
     * @param context task context used to write the joined beans
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<OrderJoinBean> values, Context context) throws IOException, InterruptedException {
        List<OrderJoinBean> orders = new ArrayList<>();
        List<OrderJoinBean> products = new ArrayList<>();

        for (OrderJoinBean value : values) {
            // The iterated value must not be stored directly (the framework reuses the
            // instance while iterating), so the needed fields are copied into a fresh bean.
            OrderJoinBean copy = new OrderJoinBean();
            // The mapper sets an id only on order records; product records carry no id,
            // which may surface as null or as the literal string "null".
            boolean isOrder = value.getId() != null && !value.getId().equals("null");
            if (isOrder) {
                copy.setId(value.getId());
                copy.setDate(value.getDate());
                copy.setPid(value.getPid());
                copy.setAmount(value.getAmount());
                orders.add(copy);
            } else {
                copy.setName(value.getName());
                copy.setCategoryId(value.getCategoryId());
                copy.setPrice(value.getPrice());
                products.add(copy);
            }
        }

        for (OrderJoinBean order : orders) {
            if (products.isEmpty()) {
                // No matching product: keep the order row anyway (left join).
                context.write(order, NullWritable.get());
                continue;
            }
            for (OrderJoinBean product : products) {
                // The order bean is overwritten and written per product; each write is
                // emitted before the next product's fields replace the current ones.
                order.setName(product.getName());
                order.setCategoryId(product.getCategoryId());
                order.setPrice(product.getPrice());
                context.write(order, NullWritable.get());
            }
        }
    }
}

四:注意map端獲取檔名稱的方式:

// Tell the two input files apart by the name of the file being read

FileSplit inputSplit = (FileSplit) context.getInputSplit();// get the input split of the current map task

String fileName = inputSplit.getPath().getName();

System.out.println("當前指定的檔名稱是:"+fileName);