1. 程式人生 > >深入理解 Reduce-side Join

深入理解 Reduce-side Join

在《MapReduce Design Patterns》一書中,作者給出了Reduce-side Join的實現方法,大致步驟如下:

  1. 使用MultipleInputs指定不同的來源表和相應的Mapper類;
  2. Mapper輸出的Key為Join的欄位內容,Value為打了來源表標籤的記錄;
  3. Reducer在接收到同一個Key的記錄後,執行以下兩步:
    1. 遍歷Values,根據標籤將來源表的記錄分別放到兩個List中;
    2. 遍歷兩個List,輸出Join結果。

具體實現可以參考這段程式碼。但是這種實現方法有一個問題:如果同一個Key的記錄數過多,存放在List中就會佔用很多記憶體,嚴重的會造成記憶體溢位(Out of Memory, OOM)。這種方法在一對一的情況下沒有問題,而一對多、多對多的情況就會有隱患。那麼,Hive在做Reduce-side Join時是如何避免OOM的呢?兩個關鍵點:

  1. Reducer在遍歷Values時,會將前面的表快取在記憶體中,對於最後一張表則邊掃描邊輸出;
  2. 如果前面幾張表記憶體中放不下,就寫入磁碟。

按照我們的實現,Mapper輸出的Key是product_id,Values是打了標籤的產品表(Product)和訂單表(Order)的記錄。從資料量來看,應該快取產品表,掃描訂單表。這就要求兩表記錄到達Reducer時是有序的,產品表在前,邊掃描邊放入記憶體;訂單表在後,邊掃描邊結合產品表的記錄進行輸出。要讓Hadoop在Shuffle&Sort階段先按product_id排序、再按表的標籤排序,就需要用到二次排序。

二次排序的概念很簡單,將Mapper輸出的Key由單一的product_id

修改為product_id+tag的複合Key就可以了,但需通過以下幾步實現:

自定義Key型別

原來product_id是Text型別,我們的複合Key則要包含product_idtag兩個資料,並實現WritableComparable介面:

public class TaggedKey implements WritableComparable<TaggedKey> {

    private Text joinKey = new Text();
    private IntWritable tag = new IntWritable();

    @Override
public int compareTo(TaggedKey taggedKey) { int compareValue = joinKey.compareTo(taggedKey.getJoinKey()); if (compareValue == 0) { compareValue = tag.compareTo(taggedKey.getTag()); } return compareValue; } // 此處省略部分程式碼 }

可以看到,在比較兩個TaggedKey時,會先比較joinKey(即product_id),再比較tag

自定義分割槽方法

預設情況下,Hadoop會對Key進行雜湊,以保證相同的Key會分配到同一個Reducer中。由於我們改變了Key的結構,因此需要重新編 寫分割槽函式:

public class TaggedJoiningPartitioner extends Partitioner<TaggedKey, Text> {

    @Override
    public int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {
        return taggedKey.getJoinKey().hashCode() % numPartitions;
    }

}

自定義分組方法

同理,呼叫reduce函式需要傳入同一個Key的所有記錄,這就需要重新定義分組函式:

public class TaggedJoiningGroupingComparator extends WritableComparator {

    public TaggedJoiningGroupingComparator() {
        super(TaggedKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TaggedKey taggedKey1 = (TaggedKey) a;
        TaggedKey taggedKey2 = (TaggedKey) b;
        return taggedKey1.getJoinKey().compareTo(taggedKey2.getJoinKey());
    }

}

配置Job

job.setMapOutputKeyClass(TaggedKey.class);
job.setMapOutputValueClass(Text.class);

job.setPartitionerClass(TaggedJoiningPartitioner.class);
job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);

MapReduce過程

最後,我們在Mapper階段使用TaggedKey,在Reducer階段按照tag進行不同的操作就可以了:

@Override
protected void reduce(TaggedKey key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

    List<String> products = new ArrayList<String>();

    for (Text value : values) {
        switch (key.getTag().get()) {
        case 1: // Product
            products.add(value.toString());
            break;

        case 2: // Order
            String[] order = value.toString().split(",");
            for (String productString : products) {
                String[] product = productString.split(",");
                List<String> output = new ArrayList<String>();
                output.add(order[0]);
                // ...
                context.write(NullWritable.get(), new Text(StringUtils.join(output, ",")));
            }
            break;

        default:
            assert false;
        }
    }
}

遍歷values時,開始都是tag=1的記錄,之後都是tag=2的記錄。以上程式碼可以在這裡檢視。

對於第二個問題,超過快取大小的記錄(預設25000條)就會存入臨時檔案,由Hive的RowContainer類實現,具體可以看這個連結

需要注意的是,Hive預設是按SQL中表的書寫順序來決定排序的,因此應該將大表放在最後。如果要人工改變順序,可以使用STREAMTABLE配置:

SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

但不要將這點和Map-side Join混淆,在配置了hive.auto.convert.join=true後,是不需要注意表的順序的,Hive會自動將小表快取在Mapper的記憶體中。

參考資料