1. 程式人生 > >Hadoop中split數量計演算法則(原始碼跟蹤)

Hadoop中split數量計演算法則(原始碼跟蹤)

 

  從前面的文章(MapReduce執行原理【原始碼跟蹤】)我們知道計算切片的部分在JobSubmitter類中,然後我們看此類的Structure(在idea中View->Tool Windows ->Structure)檢視類結構我們很輕易的就能找到有關split的方法

我們可以在writeSplits方法中打一個斷點,隨便執行一個計數程式Debug跟蹤檢視。

這裡給出一下計數程式

WCmapper

 1 package com.qin.MapReduce;
 2 
 3 import org.apache.hadoop.io.IntWritable;
4 import org.apache.hadoop.io.LongWritable; 5 import org.apache.hadoop.io.Text; 6 import org.apache.hadoop.mapreduce.Mapper; 7 8 import java.io.IOException; 9 10 public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 11 12 protected void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException { 13 Text outText = new Text(); 14 IntWritable valueOut = new IntWritable(); 15 String[] split = value.toString().split(" "); 16 for (String str: split ){ 17 outText.set(str); 18 valueOut.set(1); 19 context.write(outText,valueOut);
20 } 21 } 22 23 }

WCreducer

package com.qin.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class WCreducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values){
            count = value.get() + count;
        }

        context.write(key, new IntWritable(count));
    }
}

WCapp

package com.qin.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WCapp {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");

        Job job = Job.getInstance(conf);
        //Job的各種屬性
        job.setJobName("WCapp");        //設定作業名稱
        job.setJarByClass(WCapp.class); //設定搜尋類
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCreducer.class);

        job.setNumReduceTasks(1);

        //新增輸入路徑
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);   //是否打印出詳細資訊

    }
}

Debug執行以後,Step over到 maps = this.writeNewSplits(job, jobSubmitDir);的地方時

 

我們Step Into進去看看

多次Step over到 List splits = input.getSplits(job);我們再次Step into

這裡我們看見minSize是取兩個引數中最大值。我們通過滑鼠放在某個引數上Alt+滑鼠左鍵看屬性的詳細定義,知道this.getFormatMinSplitSize()的值為1,

getMinSplitSize(job)是得到配置檔案mapred-default.xml中的mapreduce.input.fileinputformat.split.minsize的值

預設配置圖如下

所以minSize的值為1

 然後我們在step over 然後step into到getMaxSplitSize(job)中

我們很容易就知道maxSize是long型的最大值

maxSize=9223372036854775807L

繼續向下看

這裡我們知道blockSize1在得到塊大小。

blockSize1 = 33554432

進入到this.computeSplitSize()中看它是如何計算得到splitSize的

blockSize1 maxSize blockSize我們都得到了

一分析computeSplitSize方法,我們知道得到的是三個值的中間值

總結:預設情況下,切片大小跟塊大小是一樣大

 切片大小跟塊大小一樣的好處:

  如果我們定義splitSize是1M,那麼一塊128M,切成128個split,分發到網路上128個結點同時執行(可以一個結點執行多個切片,但是叢集併發情況下,負載均衡,系統會自動分發給其它結點),浪費時間與資源。

  如果我的splitSize和塊大小相同,直接就在本結點上運行了(nodemanage的本地優先策略)。