mapreduce之mapper、reducer個數
阿新 • • 發佈:2019-02-18
這個圖大概可以描述mapreduce計算模型的執行過程,下面我們就圍繞這個圖聊幾個問題,其中有工作中非常有用的問題:
1. mapper的個數
結論:mapper的個數是由輸入資料的大小決定的,一般不需要我們去設定,如果你想控制mapper的個數,那麼需要先了解hadoop是怎麼控制mapper的個數。
如圖所示,每個Mapper Tasker對應一個split(切片),要處理的file被FileInputFormat分成了幾個切片就會有幾個mapper;FileInputFormat怎麼獲取的切面呢,直接上原始碼:
public List<InputSplit> getSplits (JobContext job) {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long blockSize = file.getBlockSize();
long splitSize=computeSplitSize(blockSize,minSize, maxSize);
}
protected long getFormatMinSplitSize() {
return 1;
}
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
public static final String SPLIT_MINSIZE =
"mapreduce.input.fileinputformat.split.minsize";
public static long getMaxSplitSize (JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
}
public static final String SPLIT_MAXSIZE =
"mapreduce.input.fileinputformat.split.maxsize";
protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
通過原始碼得出切片數splitSize由三個元素決定:
- blockSize block大小,hadoop1預設64M,hadoop2預設128M
- minSize 最小值,預設是 1,我可以通過FileInputFormat.setMinInputSplitSize(job, size)方法來修改最小值;
- maxSize 最大值,預設是MAX_VALUE = 0x7fffffffffffffffL,可以通過FileInputFormat.setMaxInputSplitSize(job, size)修改最大值
最後做計算:Math.max(minSize, Math.min(maxSize, blockSize))
預設情況下:Math.max(1, Math.min(0x7fffffffffffffffL, 128))顯然結果為128,也就是說預設情況下有幾個block就有幾個切片,這也是為了提高mapreduce的執行效率。
2.reducer個數
結論:reducer個數是由partition個數決定。
mapper產生的中間資料經過shuffer過程,根據我們的業務把資料分成若干partition,每個partition的資料由對應的一個reducer來處理。mapreduce決定partition的是:Partitioner類中的intgetPartition(KEY key, VALUE value, int numPartitions)
方法,我們來看下預設的分割槽方法:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
由程式碼看出預設的分割槽有兩個元素決定:
- key的hash值
- numReduceTasks,需要我們通過
job.setNumReduceTasks(reduceNum)
方法來設定的reducer個數
最終得出partition個數就是我們設定的個數,比如我們設定job.setNumReduceTasks(3)
hashcode除以10的餘數就是0、1、2三個值,預設之所以用key的hash值是為了把資料均勻的分佈到reducer防止資料傾斜。
當然了我們可以根據我們自己的業務來繼承Partition類重寫getPartition方法來決定partition數。