MapReduce之Map Join
一 介紹
之所以存在Reduce Join,是因為在map階段不能獲取所有需要的join欄位,即:同一個key對應的欄位可能位於不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的資料傳輸。
Map Join是針對以下場景進行的優化:兩個待連線表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每個map task記憶體中存在一份(比如存放到hash table中),然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查詢是否有相同的key的記錄,如果有,則連線後輸出即可。
為了支援檔案的共享,Hadoop用到了分散式快取的概念,在MapReduce中稱為DistributedCache(目前已被標註為棄用,分散式快取的API可在Job類本身呼叫),它可以方便Map Task之間或Reduce Task之間共享一些資訊,同時也可以將第三方Jar包新增到其Classpass路徑中。Hadoop會將快取資料分發到叢集中所有準備啟動的節點上,複製到mapreduce.temp.dir中的配置目錄。
使用該類的方法如下:
job.addArchiveToClassPath(archive); //快取jar包到task執行節點的classpath中 ob.addCacheArchive(uri); //快取壓縮包到task執行節點的工作目錄 job.addFileToClassPath(file); //快取普通檔案到task執行節點的classpath中 job.addCacheFile(url); //將產品表文件快取到task工作節點的工作目錄中去
傳參格式:hdfs://namenode:9000/home/XXX/file,即Jar包、壓縮包、普通檔案所在hdfs路徑。
同時DistributedCache(分散式快取)可用來解決join演算法實現中的資料傾斜問題,例如兩張表:訂單表和產品表。
訂單表:
訂單號 時間 商品id 購買數量 1001,20170710,P0001,1 1002,20170710,P0001,3 1003,20170710,P0002,3 1004,20170710,P0002,4
產品表:
商品id 商品名稱 P0001,xiaomi P0002,huawei
需求就是根據外來鍵商品id來將兩張表資訊合併,拼接成 :
1001 ,20170710,P0001,1 xiaomi 1002,20170710,P0001,3 xiaomi 1003,20170710,P0002,3,huawei 1004,20170710,P0002,4,huawei
考慮問題:在mapreduce程式中,如果某些產品非常暢銷,肯定會產生很多訂單,但是剛好這些訂單資訊都傳到了一個reduce中(分割槽預設就是使用hashcode%reducetask數量,所以這種情況是正常的)。那麼這個reducetask壓力就很大了,而其他的reducetask處理的資訊就很小,有的甚至就處理幾條資料,這就出現了資料傾斜問題。
解決方案:一般來說訂單表的資料遠遠多於產品表資料,畢竟產品的種類就那些,所以我們可以把產品資訊都交給Map Task就行了邏輯都讓Map Task來處理,也就是說不使用Reduce了,而讓每個Map Task持有個product.data(儲存產品資訊的檔案)即可。那麼maptask怎麼獲得這個檔案呢?剛好hadoop提供了DistributedCache,我們將檔案交給這個分散式快取,它會將我們的檔案放到Map Task的工作目錄中,那麼Map 端可以直接從工作目錄中去拿。
二 程式碼部分
1 package mapreduce.DistributedCache; 2 3 import java.io.BufferedReader; 4 import java.io.FileInputStream; 5 import java.io.IOException; 6 import java.io.InputStream; 7 import java.io.InputStreamReader; 8 import java.net.URI; 9 import java.util.HashMap; 10 import java.util.Map; 11 import org.apache.hadoop.conf.Configuration; 12 import org.apache.hadoop.conf.Configured; 13 import org.apache.hadoop.fs.Path; 14 import org.apache.hadoop.io.LongWritable; 15 import org.apache.hadoop.io.NullWritable; 16 import org.apache.hadoop.io.Text; 17 import org.apache.hadoop.mapreduce.Job; 18 import org.apache.hadoop.mapreduce.Mapper; 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 21 import org.apache.hadoop.util.Tool; 22 import org.apache.hadoop.util.ToolRunner; 23 24 public class MapJoin extends Configured implements Tool{ 25 static class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text>{ 26 //用來快取小檔案(商品檔案中的資料) 27 Map<String, String> produceMap = new HashMap<String,String>(); 28 Text k = new Text(); 29 /* 30 * 原始碼中能看到在迴圈執行map()之前會執行一次setUp方法,可以用來做初始化 31 */ 32 @Override 33 protected void setup(Context context) 34 throws IOException, InterruptedException { 35 36 //將商品檔案中的資料寫到快取中 37 FileInputStream fileInput = new FileInputStream("product.data"); 38 //read data 39 InputStreamReader readFile = new InputStreamReader(fileInput ); 40 BufferedReader br = new BufferedReader(readFile); 41 String line = null; 42 while((line=br.readLine())!=null){ 43 //一行資料格式為P0001,xiaomi(商品id,商品名稱) 44 String[] fields = line.split(","); 45 produceMap.put(fields[0], fields[1]); 46 } 47 } 48 @Override 49 protected void map(LongWritable key, Text value, Context context) 50 throws IOException, InterruptedException { 51 //一行訂單資料 格式為 1001,20170710,P0001,1(訂單id,建立時間,商品id,購買商品數量) 52 String line = value.toString(); 53 String[] fields = line.split(","); 54 //根據訂單資料中商品id在快取中找出來對應商品資訊(商品名稱),進行串接 55 String productName = produceMap.get(fields[2]); 56 k.set(line+","+productName); 57 context.write(NullWritable.get(), k ); 58 } 59 } 60 61 public int run(String[] args) throws Exception { 62 63 // step 1:get configuration 64 Configuration conf = this.getConf(); 65 //set job 66 Job job = Job.getInstance(conf); 67 job.setJarByClass(MapJoin.class); 68 69 job.setMapperClass(MapJoinMapper.class); 70 job.setMapOutputKeyClass(Text.class); 71 job.setMapOutputValueClass(NullWritable.class); 72 73 //設定最終輸出型別 74 job.setOutputKeyClass(Text.class); 75 job.setOutputValueClass(NullWritable.class); 76 77 //將產品表文件快取到task工作節點的工作目錄中去 78 //快取普通檔案到task執行節點的工作目錄(hadoop幫我們完成) 79 job.addCacheFile(new URI("hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/product.data")); 80 81 //不需要reduce,那麼也就沒有了shuffle過程 82 job.setNumReduceTasks(0); 83 84 FileInputFormat.setInputPaths(job, new Path(args[0])); 85 FileOutputFormat.setOutputPath(job, new Path(args[1])); 86 87 boolean isSuccess = job.waitForCompletion(true); 88 89 return isSuccess ? 0 : 1; 90 } 91 92 public static void main(String[] args) throws Exception { 93 args = new String[]{ 94 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/orderid.data", 95 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/output4" 96 }; 97 98 Configuration conf = new Configuration(); 99 100 // run mapreduce 101 int status = ToolRunner.run(conf, new MapJoin(), args); 102 103 // exit program 104 System.exit(status); 105 } 106 }
執行程式碼後檢視輸出結果
[[email protected] hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -text /user/beifeng01/mapreduce/output4/p* 1001,20170710,P0001,1,xiaomi 1002,20170710,P0001,3,xiaomi 1003,20170710,P0002,3,huawei 1004,20170710,P0002,4,huawei