Hadoop 輸入分片計算(Map Task 個數的確定)
1 public List<InputSplit> getSplits(JobContext job
2 ) throws IOException {
3 //getFormatMinSplitSize():始終返回1
4 //getMinSplitSize(job):獲取” mapred.min.split.size”的值,預設為1
5 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
6
7 //getMaxSplitSize(job):獲取"mapred.max.split.size"的值,
8 //預設配置檔案中並沒有這一項,所以其預設值為” Long.MAX_VALUE”,即2^63 – 1
9 long maxSize = getMaxSplitSize(job);
10
11 // generate splits
12 List<InputSplit> splits = new ArrayList<InputSplit>();
13 List<FileStatus>files = listStatus(job);
14 for (FileStatus file: files) {
15 Path path = file.getPath();
16 FileSystem fs = path.getFileSystem(job.getConfiguration());
17 long length = file.getLen();
18 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
19 if ((length != 0) && isSplitable(job, path)) {
20 long blockSize = file.getBlockSize();
21 //計算split大小
22 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
23
24 //計算split個數
25 long bytesRemaining = length; //bytesRemaining表示剩餘位元組數
26 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP=1.1
27 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
28 splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
29 blkLocations[blkIndex].getHosts()));
30 bytesRemaining -= splitSize;
31 }
32
33 if (bytesRemaining != 0) {
34 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
35 blkLocations[blkLocations.length-1].getHosts()));
36 }
37 } else if (length != 0) {
38 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
39 } else {
40 //Create empty hosts array for zero length files
41 splits.add(new FileSplit(path, 0, length, new String[0]));
42 }
43 }
44
45 // Save the number of input files in the job-conf
46 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
47
48 LOG.debug("Total # of splits: " + splits.size());
49 return splits;
50 }