深入理解 Reduce-side Join
在《MapReduce Design Patterns》一書中,作者給出了Reduce-side Join的實現方法,大致步驟如下:
- 使用MultipleInputs指定不同的來源表和相應的Mapper類;
- Mapper輸出的Key為Join的欄位內容,Value為打了來源表標籤的記錄;
- Reducer在接收到同一個Key的記錄後,執行以下兩步:
- 遍歷Values,根據標籤將來源表的記錄分別放到兩個List中;
- 遍歷兩個List,輸出Join結果。
具體實現可以參考這段程式碼。但是這種實現方法有一個問題:如果同一個Key的記錄數過多,存放在List中就會佔用很多記憶體,嚴重的會造成記憶體溢位(Out of Memory, OOM)。這種方法在一對一的情況下沒有問題,而一對多、多對多的情況就會有隱患。那麼,Hive在做Reduce-side Join時是如何避免OOM的呢?兩個關鍵點:
- Reducer在遍歷Values時,會將前面的表快取在記憶體中,對於最後一張表則邊掃描邊輸出;
- 如果前面幾張表記憶體中放不下,就寫入磁碟。
按照我們的實現,Mapper輸出的Key是product_id
,Values是打了標籤的產品表(Product)和訂單表(Order)的記錄。從資料量來看,應該快取產品表,掃描訂單表。這就要求兩表記錄到達Reducer時是有序的,產品表在前,邊掃描邊放入記憶體;訂單表在後,邊掃描邊結合產品表的記錄進行輸出。要讓Hadoop在Shuffle&Sort階段先按product_id
排序、再按表的標籤排序,就需要用到二次排序。
二次排序的概念很簡單,將Mapper輸出的Key由單一的product_id
product_id+tag
的複合Key就可以了,但需通過以下幾步實現:
自定義Key型別
原來product_id
是Text型別,我們的複合Key則要包含product_id
和tag
兩個資料,並實現WritableComparable
介面:
public class TaggedKey implements WritableComparable<TaggedKey> {
private Text joinKey = new Text();
private IntWritable tag = new IntWritable();
@Override
public int compareTo(TaggedKey taggedKey) {
int compareValue = joinKey.compareTo(taggedKey.getJoinKey());
if (compareValue == 0) {
compareValue = tag.compareTo(taggedKey.getTag());
}
return compareValue;
}
// 此處省略部分程式碼
}
可以看到,在比較兩個TaggedKey時,會先比較joinKey(即product_id
),再比較tag
。
自定義分割槽方法
預設情況下,Hadoop會對Key進行雜湊,以保證相同的Key會分配到同一個Reducer中。由於我們改變了Key的結構,因此需要重新編 寫分割槽函式:
public class TaggedJoiningPartitioner extends Partitioner<TaggedKey, Text> {
@Override
public int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {
return taggedKey.getJoinKey().hashCode() % numPartitions;
}
}
自定義分組方法
同理,呼叫reduce函式需要傳入同一個Key的所有記錄,這就需要重新定義分組函式:
public class TaggedJoiningGroupingComparator extends WritableComparator {
public TaggedJoiningGroupingComparator() {
super(TaggedKey.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
TaggedKey taggedKey1 = (TaggedKey) a;
TaggedKey taggedKey2 = (TaggedKey) b;
return taggedKey1.getJoinKey().compareTo(taggedKey2.getJoinKey());
}
}
配置Job
job.setMapOutputKeyClass(TaggedKey.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(TaggedJoiningPartitioner.class);
job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);
MapReduce過程
最後,我們在Mapper階段使用TaggedKey,在Reducer階段按照tag進行不同的操作就可以了:
@Override
protected void reduce(TaggedKey key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<String> products = new ArrayList<String>();
for (Text value : values) {
switch (key.getTag().get()) {
case 1: // Product
products.add(value.toString());
break;
case 2: // Order
String[] order = value.toString().split(",");
for (String productString : products) {
String[] product = productString.split(",");
List<String> output = new ArrayList<String>();
output.add(order[0]);
// ...
context.write(NullWritable.get(), new Text(StringUtils.join(output, ",")));
}
break;
default:
assert false;
}
}
}
遍歷values時,開始都是tag=1的記錄,之後都是tag=2的記錄。以上程式碼可以在這裡檢視。
對於第二個問題,超過快取大小的記錄(預設25000條)就會存入臨時檔案,由Hive的RowContainer類實現,具體可以看這個連結。
需要注意的是,Hive預設是按SQL中表的書寫順序來決定排序的,因此應該將大表放在最後。如果要人工改變順序,可以使用STREAMTABLE配置:
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
但不要將這點和Map-side Join混淆,在配置了hive.auto.convert.join=true
後,是不需要注意表的順序的,Hive會自動將小表快取在Mapper的記憶體中。