1. 程式人生 > hadoop輸入分片計算(Map Task個數的確定)

hadoop輸入分片計算(Map Task個數的確定)

1 public List<InputSplit> getSplits(JobContext job

2 ) throws IOException {

3 //getFormatMinSplitSize():始終返回1

4 //getMinSplitSize(job):獲取” mapred.min.split.size”的值,預設為1

5 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

6

7 //getMaxSplitSize(job):獲取"mapred.max.split.size"的值,

8 //預設配置檔案中並沒有這一項,所以其預設值為” Long.MAX_VALUE”,即2^63 – 1

9 long maxSize = getMaxSplitSize(job);

10

11 // generate splits

12 List<InputSplit> splits = new ArrayList<InputSplit>();

13 List<FileStatus>files = listStatus(job);

14 for (FileStatus file: files) {

15 Path path = file.getPath();

16 FileSystem fs = path.getFileSystem(job.getConfiguration());

17 long length = file.getLen();

18 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);

19 if ((length != 0) && isSplitable(job, path)) {

20 long blockSize = file.getBlockSize();

21 //計算split大小

22 long splitSize = computeSplitSize(blockSize, minSize, maxSize);

23

24 //計算split個數

25 long bytesRemaining = length; //bytesRemaining表示剩餘位元組數

26 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP=1.1

27 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

28 splits.add(new FileSplit(path, length-bytesRemaining, splitSize,

29 blkLocations[blkIndex].getHosts()));

30 bytesRemaining -= splitSize;

31 }

32

33 if (bytesRemaining != 0) {

34 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,

35 blkLocations[blkLocations.length-1].getHosts()));

36 }

37 } else if (length != 0) {

38 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));

39 } else {

40 //Create empty hosts array for zero length files

41 splits.add(new FileSplit(path, 0, length, new String[0]));

42 }

43 }

44

45 // Save the number of input files in the job-conf

46 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

47

48 LOG.debug("Total # of splits: " + splits.size());

49 return splits;

50 }