1. 程式人生 > >MapReduce之Map Join

MapReduce之Map Join

一 介紹

之所以存在Reduce Join,是因為在map階段不能獲取所有需要的join欄位,即:同一個key對應的欄位可能位於不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的資料傳輸。

Map Join是針對以下場景進行的優化:兩個待連線表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每個map task記憶體中存在一份(比如存放到hash table中),然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查詢是否有相同的key的記錄,如果有,則連線後輸出即可。

為了支援檔案的共享,Hadoop用到了分散式快取的概念,在MapReduce中稱為DistributedCache(目前已被標註為棄用,分散式快取的API可在Job類本身呼叫),它可以方便Map Task之間或Reduce Task之間共享一些資訊,同時也可以將第三方Jar包新增到其Classpass路徑中。Hadoop會將快取資料分發到叢集中所有準備啟動的節點上,複製到mapreduce.temp.dir中的配置目錄。

使用該類的方法如下:

job.addArchiveToClassPath(archive); //快取jar包到task執行節點的classpath中
ob.addCacheArchive(uri); //快取壓縮包到task執行節點的工作目錄
job.addFileToClassPath(file); //快取普通檔案到task執行節點的classpath中
job.addCacheFile(url); //將產品表文件快取到task工作節點的工作目錄中去

傳參格式:hdfs://namenode:9000/home/XXX/file,即Jar包、壓縮包、普通檔案所在hdfs路徑。

同時DistributedCache(分散式快取)可用來解決join演算法實現中的資料傾斜問題,例如兩張表:訂單表和產品表。

訂單表:

訂單號 時間 商品id 購買數量 
1001,20170710,P0001,1 
1002,20170710,P0001,3 
1003,20170710,P0002,3 
1004,20170710,P0002,4

產品表: 

商品id 商品名稱 
P0001,xiaomi
P0002,huawei

需求就是根據外來鍵商品id來將兩張表資訊合併,拼接成 :

1001 ,20170710,P0001,1 xiaomi
1002,20170710,P0001,3 xiaomi
1003,20170710,P0002,3,huawei
1004,20170710,P0002,4,huawei

考慮問題:在mapreduce程式中,如果某些產品非常暢銷,肯定會產生很多訂單,但是剛好這些訂單資訊都傳到了一個reduce中(分割槽預設就是使用hashcode%reducetask數量,所以這種情況是正常的)。那麼這個reducetask壓力就很大了,而其他的reducetask處理的資訊就很小,有的甚至就處理幾條資料,這就出現了資料傾斜問題。

解決方案:一般來說訂單表的資料遠遠多於產品表資料,畢竟產品的種類就那些,所以我們可以把產品資訊都交給Map Task就行了邏輯都讓Map Task來處理,也就是說不使用Reduce了,而讓每個Map Task持有個product.data(儲存產品資訊的檔案)即可。那麼maptask怎麼獲得這個檔案呢?剛好hadoop提供了DistributedCache,我們將檔案交給這個分散式快取,它會將我們的檔案放到Map Task的工作目錄中,那麼Map 端可以直接從工作目錄中去拿。

二 程式碼部分

  1 package mapreduce.DistributedCache;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.FileInputStream;
  5 import java.io.IOException;
  6 import java.io.InputStream;
  7 import java.io.InputStreamReader;
  8 import java.net.URI;
  9 import java.util.HashMap;
 10 import java.util.Map;
 11 import org.apache.hadoop.conf.Configuration;
 12 import org.apache.hadoop.conf.Configured;
 13 import org.apache.hadoop.fs.Path;
 14 import org.apache.hadoop.io.LongWritable;
 15 import org.apache.hadoop.io.NullWritable;
 16 import org.apache.hadoop.io.Text;
 17 import org.apache.hadoop.mapreduce.Job;
 18 import org.apache.hadoop.mapreduce.Mapper;
 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 21 import org.apache.hadoop.util.Tool;
 22 import org.apache.hadoop.util.ToolRunner;
 23 
 24 public class MapJoin extends Configured implements Tool{
 25     static class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text>{
 26         //用來快取小檔案(商品檔案中的資料)
 27         Map<String, String> produceMap = new HashMap<String,String>();
 28         Text k = new Text();
 29         /*
 30          * 原始碼中能看到在迴圈執行map()之前會執行一次setUp方法,可以用來做初始化
 31          */
 32         @Override
 33         protected void setup(Context context)
 34                 throws IOException, InterruptedException {
 35 
 36             //將商品檔案中的資料寫到快取中  
 37             FileInputStream fileInput = new FileInputStream("product.data");
 38             //read data
 39             InputStreamReader readFile = new InputStreamReader(fileInput );
 40             BufferedReader br = new BufferedReader(readFile);
 41             String line = null;
 42             while((line=br.readLine())!=null){
 43                 //一行資料格式為P0001,xiaomi(商品id,商品名稱)
 44                 String[] fields = line.split(",");
 45                 produceMap.put(fields[0], fields[1]);
 46             }
 47         }
 48         @Override
 49         protected void map(LongWritable key, Text value, Context context)
 50                 throws IOException, InterruptedException {
 51             //一行訂單資料    格式為 1001,20170710,P0001,1(訂單id,建立時間,商品id,購買商品數量)
 52             String line = value.toString();
 53             String[] fields = line.split(",");
 54             //根據訂單資料中商品id在快取中找出來對應商品資訊(商品名稱),進行串接
 55             String productName = produceMap.get(fields[2]);
 56             k.set(line+","+productName);
 57             context.write(NullWritable.get(), k );
 58         }
 59     }
 60     
 61     public int run(String[] args) throws Exception {
 62 
 63     // step 1:get configuration
 64     Configuration conf = this.getConf();
 65         //set job
 66     Job job = Job.getInstance(conf);
 67         job.setJarByClass(MapJoin.class);
 68 
 69         job.setMapperClass(MapJoinMapper.class);
 70         job.setMapOutputKeyClass(Text.class);
 71         job.setMapOutputValueClass(NullWritable.class);
 72 
 73         //設定最終輸出型別
 74         job.setOutputKeyClass(Text.class);
 75         job.setOutputValueClass(NullWritable.class);
 76 
 77         //將產品表文件快取到task工作節點的工作目錄中去
 78         //快取普通檔案到task執行節點的工作目錄(hadoop幫我們完成)
 79         job.addCacheFile(new URI("hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/product.data"));
 80 
 81         //不需要reduce,那麼也就沒有了shuffle過程
 82         job.setNumReduceTasks(0);
 83 
 84         FileInputFormat.setInputPaths(job, new Path(args[0]));
 85         FileOutputFormat.setOutputPath(job, new Path(args[1]));
 86 
 87         boolean isSuccess = job.waitForCompletion(true);
 88         
 89         return isSuccess ? 0 : 1;
 90     }
 91     
 92     public static void main(String[] args) throws Exception {
 93         args = new String[]{
 94                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/orderid.data",
 95                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/output4"
 96         };
 97         
 98         Configuration conf = new Configuration();
 99         
100         // run mapreduce
101         int status = ToolRunner.run(conf, new MapJoin(), args);
102         
103         // exit program
104         System.exit(status);
105     }
106 }

執行程式碼後檢視輸出結果

[[email protected] hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -text /user/beifeng01/mapreduce/output4/p*
1001,20170710,P0001,1,xiaomi 
1002,20170710,P0001,3,xiaomi 
1003,20170710,P0002,3,huawei
1004,20170710,P0002,4,huawei