(三)hadoop中FileInputFormat類的getSplits獲取InputSplit的過程
FileInputFormat繼承了抽象類InputFormat,來看一下InputFormat的原始碼:
public abstract class InputFormat<K, V> {
public abstract
List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;
}
InputFormat 主要用於描述輸入資料的格式, 它提供以下兩個功能。
(1)資料切分 : 按照某個策略將輸入資料切分成若干個 InputSplit, 以便確定 Map Task 個數。對應的就是getSplits方法。
(2)為 Mapper 提供輸入資料: 給定某個 InputSplit, 能將其解析成一個個 key/value 對。對應的就是createRecordReader方法。
getSplits 方法主要完成資料切分的功能, 它會嘗試著將輸入資料切分成 InputSplit,並放入集合List中返回。
InputSplit有以下特點:
(1)邏輯分片 : 它只是在邏輯上對輸入資料進行分片, 並不會在磁碟上將其切分成分片進行儲存。 InputSplit 只記錄了分片的元資料資訊, 比如起始位置、 長度以及所在的節點列表等。
(2)可序列化: 在 Hadoop 中, 物件序列化主要有兩個作用: 程序間通訊和永久儲存。 此處, InputSplit 支援序列化操作主要是為了程序間通訊。 作業被提交到 JobTracker 之前, Client 會呼叫作業 InputFormat 中的 getSplits 函式, 並將得到的 InputSplit 序列
化到檔案中。 這樣, 當作業提交到 JobTracker 端對作業初始化時, 可直接讀取該檔案, 解析出所有 InputSplit, 並建立對應的 Map Task。而createRecordReader則根據InputSplit ,將其解析成一個個 key/value 對。
現在再來看FileInputFormat中對這兩個方法的具體實現。
先來看FileInputFormat的定義:
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {}
FileInputFormat也是一個抽象類,繼承了InputFormat這個抽象類。
再來看看FileInputFormat關於getSplits的實現,原始碼如下:
public List<InputSplit> getSplits(JobContext job
) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus>files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
我們來一塊一塊的分析,
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
不難理解,這裡是獲取InputSplit的size的最小值和最大值,最小值minSize是通過取getFormatMinSplitSize()和getMinSplitSize(job))中的較大的值
getFormatMinSplitSize方法的原始碼如下:
protected long getFormatMinSplitSize() {
return 1;
}
直接返回1,單位是B.
而getMinSplitSize方法的原始碼如下:
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong("mapred.min.split.size", 1L);
}
返回的是從配置檔案中讀取“mapred.min.split.size”屬性的value值,“mapred.min.split.size”是需要使用者自己新增配置的,配置在mapred-site.xml檔案中。
這樣一來,minSize的值取決於使用者配置的mapred.min.split.size和1B中的較大值。
maxSize的大小是由getMaxSplitSize方法確定的,原始碼如下:
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong("mapred.max.split.size",Long.MAX_VALUE);
}
若“mapred.max.split.size”屬性值讀取不到,則返回Long.MAX_VALUE,否則返回“mapred.max.split.size”屬性的值。
getSplits方法中有一句程式碼值得關注:
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
這裡是確定了InputSplit的大小,computeSplitSize方法原始碼如下:
protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
上面說過,minSize的值取決於使用者配置的mapred.min.split.size和1B中的較大值。 maxSize的值取決於使用者配置的mapred.max.split.size和Long.MAX_VALUE中的較大值。blockSize則是HDFS的預設塊大小。
獲取到splitSize 後,檔案將被切分成大小為splitSize的InputSplit,最後剩下不足splitSize的資料塊單獨成為一個InputSplit。
那接下來毫無疑問就是按照splitSize 來切分檔案了(邏輯上的切分)。
再看getSplits方法的程式碼塊:
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize >SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
這裡就是切分的核心程式碼了,bytesRemaining 表示的是切分後,剩餘的待切分的檔案大小,初始值就是檔案大小【length】,splitSize就是InputSplit的大小,SPLIT_SLOP是一個常量值,定義如下:
private static final double SPLIT_SLOP = 1.1;//10% slop
意思就是當剩餘檔案大小bytesRemaining與splitSize的比值還大於1.1的時候,就繼續切分,否則,剩下的直接作為一個InputSplit。
敲黑板,劃重點:並不一定非得bytesRemaining小於splitSize才停止劃分哦,只要bytesRemaining/splitSize<=1.1就會停止劃分,將剩下的作為一個InputSplit
我們還可以看到,
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts()));
這裡四個引數表示的意思是該InputSplit所在的(路徑,起始位置,大小,所在的 host(節點) 列表)
以上就知道getSplits獲取InputSplit的過程。