1. 程式人生 > >考究Hadoop中split的計算方法

考究Hadoop中split的計算方法

Hadoop中block塊大小和split切片大小會影響到MapReduce程式在執行過程中的效率、map的個數。在本文中,以經典入門案例WordCount為例,通過debug的方式跟蹤原始碼,來分析hadoop中split的計算方法。

前期準備

wc.txt的準備
單詞統計,以空格為分割符對單詞進行切割。因此wc.txt中的單詞都以空格進行分割
mr核心部分介紹
map
提取每一行,String[] arr = line.split(” “);

for(String word : arr){
    context.write(new Text(word),new IntWritable(1
)); }

reduce
對每一個key的個數進行迭代,計算總數。

修改HDFS的檔案塊大小

hdfs-default.xml
首先檢視預設的配置資訊
檔案塊的大小

<property>
  <name>dfs.blocksize</name>
  <value>134217728</value>
  <description>
      The default block size for new files, in bytes.
      You can use the following suffix (case
insensitive): k(kilo), m(mega), g(giga), t(tera), p(peta), e(exa) to specify the size (such as 128k, 512m, 1g, etc.), Or provide complete size in bytes (such as 134217728 for 128 MB). </description> </property>

最小檔案塊的大小

<property>
  <name>dfs.namenode.fs-limits.min
-block-size</name> <value>1048576</value> <description>Minimum block size in bytes, enforced by the Namenode at create time. This prevents the accidental creation of files with tiny block sizes (and thus many blocks), which can degrade performance.</description> </property>

hdfs-site.xml
瞭解預設的配置資訊之後,對其進行修改

<property>
  <name>dfs.blocksize</name>
  <value>512</value>   //塊大小需要能夠整除校驗和   需要符合校驗和的倍數要求(切記!!必須為512的倍數)
</property>
<property>
  <name>dfs.namenode.fs-limits.min-block-size</name>
  <value>10</value>
</property>

通過命令列檢視屬性是否修改成功

$>hdfs getconf -confKey dfs.blocksize
$>hdfs getconf -confKey dfs.namenode.fs-limits.min-block-size

關鍵部分原始碼解析

對所涉及到的關鍵部分,進行解析
org.apache.hadoop.mapreduce.JobSubmitter類

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    // 得到配置資訊
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    // 判斷是否是新的Mapper
    if (jConf.getUseNewMapper()) {
      // 計算新的切片資訊
      // 作業       job:              com.sun.jdi.InvocationException occurred invoking method.
      // 作業提交目錄 jobSubmitDir:       /tmp/hadoop-yarn/staging/zhaotao/.staging/job_1496761683234_0001
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
  ...
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    // 得到配置資訊
    Configuration conf = job.getConfiguration();
    // 得到輸入格式
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    // 得到splits切片的集合,具體的值為:
    //  [hdfs://s100/data/wc.txt:0+150, 
        hdfs://s100/data/wc.txt:150+150, 
        hdfs://s100/data/wc.txt:300+150, 
        hdfs://s100/data/wc.txt:450+150, 
        hdfs://s100/data/wc.txt:600+158]
        總共切了5個
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

org.apache.hadoop.mapreduce.lib.input.FileInputFormat類

  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    // 通過getFormatMinSplitSize()方法  取得最小切片的大小為一個固定值(可通過配置檔案配置得到)為0
    // 通過getMinSplitSize()方法 得到最小切片值為1
    // minSize的值為1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // maxSize的值為:9223372036854775807
    long maxSize = getMaxSplitSize(job);

    // generate splits
    // 建立一個集合
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //  得到hdfs的資料夾列表
    List<FileStatus> files = listStatus(job);
    // 得到每個檔案的狀態,進行for迴圈
    for (FileStatus file: files) {
      // 取到檔案的路徑
      Path path = file.getPath();
      // 取到檔案的長度
      long length = file.getLen();
      if (length != 0) {
        // 檔案塊位置的陣列
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          // 通過getBlockLocations()方法得到檔案塊的位置
          // blkLocations的值:[0,512,s103,s101,s104, 512,246,s103,s104,s102]  為兩塊檔案塊
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // 判斷文字檔案是否可切割
        if (isSplitable(job, path)) {
          // 得到塊大小
          // blockSize的值為512
          // 切片大小與塊大小、最小切片值、最大切片值有關
          long blockSize = file.getBlockSize();
          // 計算切片大小
          // blockSize=512;minSize=1;maxSize=9223372036854775807
          // splitSize的值為512
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    ...
    // 最終返回的值為一個切片集合
    // 具體的值為:
        [hdfs://s100/data/wc.txt:0+150, 
        hdfs://s100/data/wc.txt:150+150, 
        hdfs://s100/data/wc.txt:300+150, 
        hdfs://s100/data/wc.txt:450+150, 
        hdfs://s100/data/wc.txt:600+158]
        總共切了5return splits;
  ...
  // 得到切片大小的一個下限值,返回的是一個最小切片的位元組數
  protected long getFormatMinSplitSize() {
    return 1;
  }
  ...
  public static long getMinSplitSize(JobContext job) {
    // SPLIT_MINSIZE = mapreduce.input.fileinputformat.split.minsize(該配置項的預設值為0)
    // job.getConfiguration().getLong(SPLIT_MINSIZE, 1L) = 0
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }
  ...
  public static long getMaxSplitSize(JobContext context) {
    // SPLIT_MAXSIZE = mapreduce.input.fileinputformat.split.maxsize
    // Long.MAX_VALUE的意思為取長整型的最大值9223372036854775807
    // context.getConfiguration().getLong(SPLIT_MAXSIZE,Long.MAX_VALUE)的值為9223372036854775807
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }
  ...
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    // 先在maxSize與blockSize之間選擇小的,再使用該值與minSize之間取得大的值
    // 就是取了三個值之間的中間值
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

split計算方法

通過對原始碼的跟蹤,將split的計算方法總結如下:
minSize = 1(預設) (屬性名字:mapreduce.input.fileinputformat.split.minsize)
maxSize = Long.MAX_VALUE(預設) (屬性名字:mapreduce.input.fileinputformat.split.maxsize)
blockSize = 512 (屬性名字:dfs.blocksize=128mb預設值)
計算方法:max(minSize,min(blockSize,maxSize))
根據上述資料,所以該過程中的切片大小為512