MapReduce模型中常用join方法小結
這篇文章是在閱讀《A Comparison of Join Algorithms for Log Processing in MapReduce》後對該文的一個小結。該文章詳細介紹幾種在mapreduce模型下的join演算法,並且對他們的效能作出系統的評測。目前像pig,hive等上層應用在進行join計算時也是利用該文中提出的演算法或者演算法變種。
需求:
假設有兩個資料來源L,R,目前需要對這兩個資料來源中的記錄根據某一共同屬性k做equi-join操作,即將所有的L.k==R.k的記錄連線起來。
1、 Repartition Join(再分配連線)
該演算法是目前最為通用的一種join演算法。該演算法的核心思想是在shuffle階段將k值相同的記錄分配到同一個reduce任務中,並且在reduce中作出連線操作。該演算法已經在org.apache.hadoop.contrib.utils.join中實現。
標準版
首先,在map階段,每個map對一個L的分片或者R的分片進行處理,並且將k的值作為中間記錄的鍵,將記錄的其他部分以及記錄的來源(即是來自於L還是來自於R)做為中間記錄的值。這樣,所有k的值相同的記錄都將被分配到同一個reduce函式中處理。在reduce函式中,我們只需要將所有的記錄取出,並且根據值來判斷該記錄是來自於L還是R,然後將屬於L的記錄和屬於R的記錄進行連線操作即可。
這種方法有一個缺點,需要在reduce函式中對記錄進行快取。當L和R中具有相同k的值的記錄非常多的時候,這將成為一個嚴重的負擔。一種改進的演算法減少了這種負擔。
改進版:
在map階段,每個map還是對一個L的分片或者R的分片進行處理。不過,中間記錄的鍵由標準版的僅僅將k的值作為鍵的值改為由k的值加上資料來源(即是來自於L還是來自於R)做為鍵的值,餘下的部分做為記錄的值。在此,構造鍵的時候使用者需要保證在reduce端進行排序的時候,R的記錄一定都能夠排在L的記錄前面。然後,使用者需要自定義partition函式,確保k值相同的記錄能夠被分到同一個reduce當中。同時,使用者還需要自定義group函式,使group函式只對中間記錄鍵的k部分的值做處理,以確保具有相同k的值的記錄能夠被送到同一次的reduce呼叫當中。
這樣,由於R的記錄都排在L的記錄前面,reduce僅僅需要對R的記錄作出快取即可。
預處理:
如果L和R在進行join操作之前就已經完成排序,並根據連線的鍵進行了劃分,那麼在進行連線的時候只需要直接把兩者對應的資料讀入記憶體,做連線操作即可。在具體實現時,一方的資料可以從map的輸入獲取,另一方的資料必須手動從HDFS獲取。
2、 Broadcast Join(廣播連線)
前提假設:
L的容量遠大於R的。
演算法:將R放到所有的機器上,然後在Map過程中,將L的分片與R進行連線
3、 Semi-Join (半連線)
半聯結實際上是對廣播連線做預處理,因為在很多情況下,不是R中所有的記錄都需要和L做連線,因此,可以把R中不需要的記錄去掉。具體過程不再詳述