shuffle的關鍵階段sort(Map端和Reduce端)原始碼分析
原始碼中有這樣一段程式碼
1. Map端排序獲取的比較器
public RawComparator getOutputKeyComparator() { // 獲取mapreduce.job.output.key.comparator.class,必須是RawComparator型別,如果沒設定,是null Class<? extends RawComparator> theClass = getClass( JobContext.KEY_COMPARATOR, null, RawComparator.class); // 如果使用者自定義了這個引數,那麼例項化使用者自定義的比較器 if (theClass != null) return ReflectionUtils.newInstance(theClass, this); // 預設情況,使用者是沒用自定義這個引數 // 判斷Map輸出的key,是否是WritableComparable的子類 // 如果是,呼叫當前類的內部的Comparator! return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); }
總結: 如何對感興趣的資料進行排序?
① 資料必須作為key
② 排序是框架自動排序,我們提供基於key的比較器,也就是Comparator,必須是RawComparator型別
a) 自定義類,實現RawComparator,重寫compare()
指定mapreduce.job.output.key.comparator.class為自定義的比較器型別
b)key實現WritableComparable(推薦)
③ 實質都是呼叫相關的comparaTo()方法,進行比較
2. Reduce端進行分組的比較器
RawComparator comparator = job.getOutputValueGroupingComparator();
// 獲取mapreduce.job.output.group.comparator.class,必須是RawComparator型別
// 如果沒用設定,直接獲取MapTask排序使用的比較器
// 也是比較key
public RawComparator getOutputValueGroupingComparator() {
Class<? extends RawComparator> theClass = getClass(
JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
if (theClass == null) {
return getOutputKeyComparator();
}
// 如果設定了,就使用設定的比較器
return ReflectionUtils.newInstance(theClass, this);
}