1. 程式人生 > >案例-使用MapReduce實現join操作

案例-使用MapReduce實現join操作

哈嘍~各位小夥伴們中秋快樂,好久沒更新新的文章啦,今天分享如何使用mapreduce進行join操作。

在離線計算中,我們常常不只是會對單一一個檔案進行操作,進行需要進行兩個或多個檔案關聯出更多資料,類似與sql中的join操作。
今天就跟大家分享一下如何在MapReduce中實現join操作

需求

現有兩張,一張是產品資訊表,一張是訂單表。訂單表中只表存了產品ID,如果想要查出訂單以及產品的相關資訊就必須使用關聯。

實現

根據MapReduce特性,大家都知道在reduce端,相同key的key,value對會被放到同一個reduce方法中(不設定partition的話)。
利用這個特點我們可以輕鬆實現join操作,請看下面示例。

產品表

ID brand model
p0001 蘋果 iphone11 pro max
p0002 華為 p30
p0003 小米 mate10

訂單表

id name address produceID num
00001 kris 深圳市福田區 p0001 1
00002 pony 深圳市南山區 p0001 2
00003 jack 深圳市阪田區 p0001 3

假如資料量巨大,兩表的資料是以檔案的形式儲存在HDFS中,需要用mapreduce程式來實現一下SQL查詢運算:

select a.id,a.name,a.address,a.num from t_orders a join t_products on a.productID=b.ID

MapReduce實現思路

通過將關聯的條件(prodcueID)作為map輸出的key,將兩表滿足join條件的資料並攜帶資料所來源的檔案資訊,發往同一個
reduce task,在reduce中進行資料的串聯

實現方式一-reduce端join

定義一個Bean

public class RJoinInfo implements Writable{
    private String customerName="";
    private String customerAddr="";
    private String orderID="";
    private int orderNum;
    private String productID="";
    private String productBrand="";
    private String productModel="";
//    0是產品,1是訂單
    private int flag;
    
    setter/getter

編寫Mapper

public class RJoinMapper extends Mapper<LongWritable,Text,Text,RJoinInfo> {
    private static Logger logger = LogManager.getLogger(RJoinMapper.class);
    private RJoinInfo rJoinInfo = new RJoinInfo();
    private Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        輸入方式支援很多中包括資料庫等等。這裡用的是檔案,因此可以直接強轉為檔案切片
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
//        獲取檔名稱
        String name = fileSplit.getPath().getName();
        logger.info("splitPathName:"+name);

        String line = value.toString();
        String[] split = line.split("\t");


        String productID = "";

            if(name.contains("product")){
                productID = split[0];
                String setProductBrand = split[1];
                String productModel = split[2];

                rJoinInfo.setProductID(productID);
                rJoinInfo.setProductBrand(setProductBrand);
                rJoinInfo.setProductModel(productModel);
                rJoinInfo.setFlag(0);
            }else if(name.contains("orders")){
                String orderID = split[0];
                String customerName = split[1];
                String cutsomerAddr = split[2];
                productID = split[3];
                String orderNum = split[4];

                rJoinInfo.setProductID(productID);
                rJoinInfo.setCustomerName(customerName);
                rJoinInfo.setCustomerAddr(cutsomerAddr);
                rJoinInfo.setOrderID(orderID);
                rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
                rJoinInfo.setFlag(1);
            }

        k.set(productID);
        context.write(k,rJoinInfo);
    }
}

程式碼解釋,這裡根據split的檔名,判斷是products還是orders,
然後根據是product還是orders獲取不同的資料,最用都以productID為Key傳送給Reduce端

編寫Reducer

public class RJoinReducer extends Reducer<Text,RJoinInfo,RJoinInfo,NullWritable> {
    private static Logger logger = LogManager.getLogger(RJoinReducer.class);
    @Override
    protected void reduce(Text key, Iterable<RJoinInfo> values, Context context) throws IOException, InterruptedException {
        List<RJoinInfo> orders = new ArrayList<>();

        String productID = key.toString();
        logger.info("productID:"+productID);
        RJoinInfo rJoinInfo = new RJoinInfo();

        for (RJoinInfo value : values) {
            int flag = value.getFlag();
            if (flag == 0) {
//                產品
                try {
                    BeanUtils.copyProperties(rJoinInfo,value);
                } catch (IllegalAccessException e) {
                    logger.error(e.getMessage());
                } catch (InvocationTargetException e) {
                    logger.error(e.getMessage());
                }
            }else {
//                訂單
                RJoinInfo orderInfo = new RJoinInfo();
                try {
                    BeanUtils.copyProperties(orderInfo,value);
                } catch (IllegalAccessException e) {
                    logger.error(e.getMessage());
                } catch (InvocationTargetException e) {
                    logger.error(e.getMessage());
                }
                orders.add(orderInfo);
            }
        }

        for (RJoinInfo order : orders) {
            rJoinInfo.setOrderNum(order.getOrderNum());
            rJoinInfo.setOrderID(order.getOrderID());
            rJoinInfo.setCustomerName(order.getCustomerName());
            rJoinInfo.setCustomerAddr(order.getCustomerAddr());

//          只輸出key即可,value可以使用nullwritable
            context.write(rJoinInfo,NullWritable.get());
        }
    }
}

程式碼解釋:根據productID會分為不同的組發到reduce端,reduce端拿到後一組資料後,其中有一個產品物件和多個訂單物件。
遍歷每一個物件,根據flag區分產品和訂單。儲存產品物件,獲取每個訂單物件到一個集合中。當我們對每個物件都分好
類後,遍歷訂單集合將訂單和產品資訊集合,然後輸出。

注意:我們這裡效率雖然不是最高的,主要是想說明join的思路。

編寫Driver

public class RJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
//        conf.set("mapreduce.framework.name","yarn");
//        conf.set("yarn.resourcemanager.hostname","server1");
//        conf.set("fs.defaultFS","hdfs://server1:9000");
        conf.set("mapreduce.framework.name","local");
        conf.set("fs.defaultFS","file:///");

        Job job = Job.getInstance(conf);

//       如果是本地執行,可以不用設定jar包的路徑,因為不用拷貝jar到其他地方
        job.setJarByClass(RJoinDriver.class);
//        job.setJar("/Users/kris/IdeaProjects/bigdatahdfs/target/rjoin.jar");

        job.setMapperClass(RJoinMapper.class);
        job.setReducerClass(RJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(RJoinInfo.class);
        job.setOutputKeyClass(RJoinInfo.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
        FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output"));

        boolean waitForCompletion = job.waitForCompletion(true);
        System.out.println(waitForCompletion);
    }
}

==上面實現的這種方式有個缺點,就是join操作是在reduce階段完成的,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生資料傾斜==

實現方式二-map端join

這種方式適用於關聯表中有小表的情形:
可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表資料進行join操作並輸出結果,
可以大大提高join操作的併發度,加快處理速度。

編寫Mapper

在Mapper端我們一次性載入資料或者用Distributedbache將檔案拷貝到每一個執行的maptask的節點上載入

這裡我們使用第二種,在mapper類中定義好小表進行join
static class RjoinMapper extends Mapper<LongWritable,Text,RJoinInfo,NullWritable>{

        private static Map<String, RJoinInfo> productMap = new HashMap<>();

//      在迴圈呼叫map方法之前會先呼叫setup方法。因此我們可以在setup方法中,先對檔案進行處理
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

            //通過這幾句程式碼可以獲取到cache file的本地絕對路徑,測試驗證用
            URI[] cacheFiles = context.getCacheFiles();
            System.out.println(Arrays.toString(new URI[]{cacheFiles[0]}));

//          直接指定名字,預設在工作資料夾的目錄下查詢 1⃣
            try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("products.txt")))){

                String line;
                while ((line = bufferedReader.readLine())!=null){
                    String[] split = line.split("\t");
                    String productID = split[0];
                    String setProductBrand = split[1];
                    String productModel = split[2];

                    RJoinInfo rJoinInfo = new RJoinInfo();
                    rJoinInfo.setProductID(productID);
                    rJoinInfo.setProductBrand(setProductBrand);
                    rJoinInfo.setProductModel(productModel);
                    rJoinInfo.setFlag(0);
                    productMap.put(productID, rJoinInfo);
                }
            }

            super.setup(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit)context.getInputSplit();

            String name = fileSplit.getPath().getName();

            if (name.contains("orders")) {
                String line = value.toString();

                String[] split = line.split("\t");
                String orderID = split[0];
                String customerName = split[1];
                String cutsomerAddr = split[2];
                String productID = split[3];
                String orderNum = split[4];

                RJoinInfo rJoinInfo = productMap.get(productID);
                rJoinInfo.setProductID(productID);
                rJoinInfo.setCustomerName(customerName);
                rJoinInfo.setCustomerAddr(cutsomerAddr);
                rJoinInfo.setOrderID(orderID);
                rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
                rJoinInfo.setFlag(1);

                context.write(rJoinInfo, NullWritable.get());
            }
        }
    }

程式碼解釋:這裡我們又重寫了一個setup()方法,這個方法會在執行map()方法前先執行,因此我們可以在這個方法中事先載入好資料。
在上述程式碼中,我們直接指定名字就拿到了product.txt檔案,這個究竟這個檔案是怎麼複製在maptask的節點上的呢,還要看下面的driver

編寫Driver

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name","local");
        conf.set("fs.defaultFS","file:///");

        Job job = Job.getInstance(conf);
        job.setJarByClass(RJoinDemoInMapDriver.class);

        job.setMapperClass(RjoinMapper.class);
        job.setOutputKeyClass(RJoinInfo.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
        FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output2"));

//        指定需要快取一個檔案到所有的maptask執行節點工作目錄
//        job.addFileToClassPath(); 將普通檔案快取到task執行節點的classpath下
//        job.addArchiveToClassPath();快取jar包到task執行節點的classpath下
//        job.addCacheArchive();快取壓縮包檔案到task執行節點的工作目錄
//        job.addCacheFile();將普通檔案 1⃣
        job.addCacheFile(new URI("/Users/kris/Downloads/rjoin/products.txt"));

//      設定reduce的數量為0
        job.setNumReduceTasks(0);


        boolean waitForCompletion = job.waitForCompletion(true);
        System.out.println(waitForCompletion);

    }

程式碼解釋:上述Driver中,我們通過job.addCacheFile()指定了一個URI本地地址,執行時mapreduce就會將這個檔案拷貝到maptask的執行工作目錄中。

好啦~本期分享程式碼量偏多,主要是想分享如何使用mapreduce進行join操作的思路。下一篇我會再講一下 計算共同好友的思路以及程式碼~

        公眾號搜尋:喜訊XiCent    獲取更多福利資源~~~~

本文由部落格一文多發平臺 OpenWrite 釋出!