MapReduce之mapper以及reducer的個數決定性因素
阿新 • • 發佈:2018-11-12
這個圖大概可以描述mapreduce計算模型的執行過程,下面我們就圍繞這個圖聊幾個問題,其中有工作中非常有用的問題:
1. mapper的個數
結論:mapper的個數是由輸入資料的大小決定的,一般不需要我們去設定,如果你想控制mapper的個數,那麼需要先了解hadoop是怎麼控制mapper的個數。
如圖所示,每個Mapper Tasker對應一個split(切片),要處理的file被FileInputFormat分成了幾個切片就會有幾個mapper;FileInputFormat怎麼獲取的切面呢,直接上原始碼:
public List<InputSplit> getSplits(JobContext job) { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long blockSize = file.getBlockSize(); long splitSize=computeSplitSize(blockSize,minSize, maxSize); } protected long getFormatMinSplitSize() { return 1; } public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); } public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"; public static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE); } public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"; protected long computeSplitSize(long blockSize, long minSize,long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); }
通過原始碼得出切片數splitSize由三個元素決定:
- blockSize block大小,hadoop1預設64M,hadoop2預設128M
- minSize 最小值,預設是 1,我可以通過FileInputFormat.setMinInputSplitSize(job, size)方法來修改最小值;
- maxSize 最大值,預設是MAX_VALUE = 0x7fffffffffffffffL,可以通過FileInputFormat.setMaxInputSplitSize(job, size)修改最大值
最後做計算:Math.max(minSize, Math.min(maxSize, blockSize))
預設情況下:Math.max(1, Math.min(0x7fffffffffffffffL, 128))顯然結果為128,也就是說預設情況下有幾個block就有幾個切片,這也是為了提高mapreduce的執行效率。
2.reducer個數
結論:reducer個數是由partition個數決定。
mapper產生的中間資料經過shuffer過程,根據我們的業務把資料分成若干partition,每個partition的資料由對應的一個reducer來處理。mapreduce決定partition的是:Partitioner類中的intgetPartition(KEY key, VALUE value, int numPartitions)
方法,我們來看下預設的分割槽方法:
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
由程式碼看出預設的分割槽有兩個元素決定:
- key的hash值
- numReduceTasks,需要我們通過
job.setNumReduceTasks(reduceNum)
方法來設定的reducer個數
最終得出partition個數就是我們設定的個數,比如我們設定job.setNumReduceTasks(3)
hashcode除以10的餘數就是0、1、2三個值,預設之所以用key的hash值是為了把資料均勻的分佈到reducer防止資料傾斜。
當然了我們可以根據我們自己的業務來繼承Partition類重寫getPartition方法來決定partition數。