1. 程式人生 > >MapReduce之mapper以及reducer的個數決定性因素

MapReduce之mapper以及reducer的個數決定性因素

這裡寫圖片描述

這個圖大概可以描述mapreduce計算模型的執行過程,下面我們就圍繞這個圖聊幾個問題,其中有工作中非常有用的問題:

1. mapper的個數 
結論:mapper的個數是由輸入資料的大小決定的,一般不需要我們去設定,如果你想控制mapper的個數,那麼需要先了解hadoop是怎麼控制mapper的個數。 
如圖所示,每個Mapper Tasker對應一個split(切片),要處理的file被FileInputFormat分成了幾個切片就會有幾個mapper;FileInputFormat怎麼獲取的切面呢,直接上原始碼:

public List<InputSplit> getSplits(JobContext job)  {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    long blockSize = file.getBlockSize();
    long splitSize=computeSplitSize(blockSize,minSize, maxSize);

    }
  protected long getFormatMinSplitSize() {
    return 1;
  }
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }
  public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";
 public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
  }
 public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";

 protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

通過原始碼得出切片數splitSize由三個元素決定:

  1. blockSize block大小,hadoop1預設64M,hadoop2預設128M
  2. minSize 最小值,預設是 1,我可以通過FileInputFormat.setMinInputSplitSize(job, size)方法來修改最小值;
  3. maxSize 最大值,預設是MAX_VALUE = 0x7fffffffffffffffL,可以通過FileInputFormat.setMaxInputSplitSize(job, size)修改最大值 
    最後做計算:Math.max(minSize, Math.min(maxSize, blockSize)) 
    預設情況下:Math.max(1, Math.min(0x7fffffffffffffffL, 128))顯然結果為128,也就是說預設情況下有幾個block就有幾個切片,這也是為了提高mapreduce的執行效率。

2.reducer個數 
結論:reducer個數是由partition個數決定。 
mapper產生的中間資料經過shuffer過程,根據我們的業務把資料分成若干partition,每個partition的資料由對應的一個reducer來處理。mapreduce決定partition的是:Partitioner類中的intgetPartition(KEY key, VALUE value, int numPartitions)方法,我們來看下預設的分割槽方法:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

由程式碼看出預設的分割槽有兩個元素決定:

  • key的hash值
  • numReduceTasks,需要我們通過job.setNumReduceTasks(reduceNum) 方法來設定的reducer個數 
    最終得出partition個數就是我們設定的個數,比如我們設定job.setNumReduceTasks(3) hashcode除以10的餘數就是0、1、2三個值,預設之所以用key的hash值是為了把資料均勻的分佈到reducer防止資料傾斜。 
    當然了我們可以根據我們自己的業務來繼承Partition類重寫getPartition方法來決定partition數。