1. 程式人生 > >mapreduce之mapper、reducer個數

mapreduce之mapper、reducer個數

這裡寫圖片描述

這個圖大概可以描述mapreduce計算模型的執行過程,下面我們就圍繞這個圖聊幾個問題,其中有工作中非常有用的問題:

1. mapper的個數
結論:mapper的個數是由輸入資料的大小決定的,一般不需要我們去設定,如果你想控制mapper的個數,那麼需要先了解hadoop是怎麼控制mapper的個數。
如圖所示,每個Mapper Tasker對應一個split(切片),要處理的file被FileInputFormat分成了幾個切片就會有幾個mapper;FileInputFormat怎麼獲取的切面呢,直接上原始碼:

public List<InputSplit> getSplits
(JobContext job) { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long blockSize = file.getBlockSize(); long splitSize=computeSplitSize(blockSize,minSize, maxSize); } protected
long getFormatMinSplitSize() { return 1; } public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); } public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"; public static long getMaxSplitSize
(JobContext context) { return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE); } public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"; protected long computeSplitSize(long blockSize, long minSize,long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); }

通過原始碼得出切片數splitSize由三個元素決定:

  1. blockSize block大小,hadoop1預設64M,hadoop2預設128M
  2. minSize 最小值,預設是 1,我可以通過FileInputFormat.setMinInputSplitSize(job, size)方法來修改最小值;
  3. maxSize 最大值,預設是MAX_VALUE = 0x7fffffffffffffffL,可以通過FileInputFormat.setMaxInputSplitSize(job, size)修改最大值
    最後做計算:Math.max(minSize, Math.min(maxSize, blockSize))
    預設情況下:Math.max(1, Math.min(0x7fffffffffffffffL, 128))顯然結果為128,也就是說預設情況下有幾個block就有幾個切片,這也是為了提高mapreduce的執行效率。

2.reducer個數
結論:reducer個數是由partition個數決定。
mapper產生的中間資料經過shuffer過程,根據我們的業務把資料分成若干partition,每個partition的資料由對應的一個reducer來處理。mapreduce決定partition的是:Partitioner類中的intgetPartition(KEY key, VALUE value, int numPartitions)方法,我們來看下預設的分割槽方法:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

由程式碼看出預設的分割槽有兩個元素決定:

  • key的hash值
  • numReduceTasks,需要我們通過job.setNumReduceTasks(reduceNum) 方法來設定的reducer個數
    最終得出partition個數就是我們設定的個數,比如我們設定job.setNumReduceTasks(3) hashcode除以10的餘數就是0、1、2三個值,預設之所以用key的hash值是為了把資料均勻的分佈到reducer防止資料傾斜。
    當然了我們可以根據我們自己的業務來繼承Partition類重寫getPartition方法來決定partition數。