Hadoop中split數量計演算法則(原始碼跟蹤)
從前面的文章(MapReduce執行原理【原始碼跟蹤】)我們知道計算切片的部分在JobSubmitter類中,然後我們看此類的Structure(在idea中View->Tool Windows ->Structure)檢視類結構我們很輕易的就能找到有關split的方法
我們可以在writeSplits方法中打一個斷點,隨便執行一個計數程式Debug跟蹤檢視。
這裡給出一下計數程式
WCmapper
1 package com.qin.MapReduce;
2
3 import org.apache.hadoop.io.IntWritable;
4 import org.apache.hadoop.io.LongWritable;
5 import org.apache.hadoop.io.Text;
6 import org.apache.hadoop.mapreduce.Mapper;
7
8 import java.io.IOException;
9
10 public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
11
12 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
13 Text outText = new Text();
14 IntWritable valueOut = new IntWritable();
15 String[] split = value.toString().split(" ");
16 for (String str: split ){
17 outText.set(str);
18 valueOut.set(1);
19 context.write(outText,valueOut);
20 }
21 }
22
23 }
WCreducer
package com.qin.MapReduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WCreducer extends Reducer<Text, IntWritable, Text, IntWritable>{
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values){
count = value.get() + count;
}
context.write(key, new IntWritable(count));
}
}
WCapp
package com.qin.MapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WCapp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
//Job的各種屬性
job.setJobName("WCapp"); //設定作業名稱
job.setJarByClass(WCapp.class); //設定搜尋類
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCreducer.class);
job.setNumReduceTasks(1);
//新增輸入路徑
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true); //是否打印出詳細資訊
}
}
Debug執行以後,Step over到 maps = this.writeNewSplits(job, jobSubmitDir);的地方時
我們Step Into進去看看
多次Step over到 List splits = input.getSplits(job);我們再次Step into
這裡我們看見minSize是取兩個引數中最大值。我們通過滑鼠放在某個引數上Alt+滑鼠左鍵看屬性的詳細定義,知道this.getFormatMinSplitSize()的值為1,
getMinSplitSize(job)是得到配置檔案mapred-default.xml中的mapreduce.input.fileinputformat.split.minsize的值
預設配置圖如下
所以minSize的值為1
然後我們在step over 然後step into到getMaxSplitSize(job)中
我們很容易就知道maxSize是long型的最大值
maxSize=9223372036854775807L
繼續向下看
這裡我們知道blockSize1在得到塊大小。
blockSize1 = 33554432
進入到this.computeSplitSize()中看它是如何計算得到splitSize的
blockSize1 maxSize blockSize我們都得到了
一分析computeSplitSize方法,我們知道得到的是三個值的中間值。
總結:預設情況下,切片大小跟塊大小是一樣大
切片大小跟塊大小一樣的好處:
如果我們定義splitSize是1M,那麼一塊128M,切成128個split,分發到網路上128個結點同時執行(可以一個結點執行多個切片,但是叢集併發情況下,負載均衡,系統會自動分發給其它結點),浪費時間與資源。
如果我的splitSize和塊大小相同,直接就在本結點上運行了(nodemanage的本地優先策略)。