Hadoop MapReduce二次排序演算法與實現之演算法解析
阿新 • • 發佈:2018-12-14
MapReduce二次排序的原理
1.在Mapper階段,會通過inputFormat的getSplits來把資料集分割成split
public abstract class InputFormat<K, V> {
public InputFormat() {}
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
2.inputFormat會提供RecordReader來讀取每一條Record,讀取之後會傳給map來接收和處理
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
3.子啊Mapper階段最後通過partition對Mapper的計算記過進行分割槽,可以通過job的setPartitionerClass來自定義Partitioner
public abstract class Partitioner<KEY, VALUE> { public Partitioner() {} public abstract int getPartition(KEY var1, VALUE var2, int var3); } public void setPartitionerClass(Class<? extends Partitioner> cls) throws IllegalStateException { this.ensureState(Job.JobState.DEFINE); this.conf.setClass("mapreduce.job.partitioner.class", cls, Partitioner.class); }
4.在每個分割槽內可以呼叫Job的setSortComparatorClass來對分割槽內基於Key比較函式進行排序
public void setSortComparatorClass(Class<? extends RawComparator> cls) throws IllegalStateException { this.ensureState(Job.JobState.DEFINE); this.conf.setOutputKeyComparatorClass(cls); } public interface RawComparator<T> extends Comparator<T> { int compare(byte[] var1, int var2, int var3, byte[] var4, int var5, int var6); }
如果沒有設定則會呼叫key 的compareTo方法 5. 在Reducer端會接收到所有Mapper端中屬於自己的資料,其實我們可以通過Job的setComparatorClass來對當前Reducer收到的所有的資料基於key比較函式進行排序,由於Reducer端每一個key對應的是valueList,因此需要Job的的setGroupingComparator來設定分組函式的類;
public void setCombinerKeyGroupingComparatorClass(Class<? extends RawComparator> cls) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
this.conf.setCombinerKeyGroupingComparator(cls);
}
6.最後呼叫Reducer對自己收到的資料進行最後的處理
總結: Mapper端: 設定partitioner控制partition的過程; 設定Comparator控制每個分割槽內的排序; Reducer端: 設定Comparator控制reduce收到的所有資料的排序; 設定GroupingComparator對資料進行Group操作;