Hadoop舊mapreduce的map任務切分原理
阿新 • • 發佈:2019-02-03
前言
最近在工作過程中接觸一些Hive資料倉庫中的表,這些表實際是從關係型資料庫通過Sqoop抽到Hive的。在開發過程中對map任務的劃分進行效能調優,發現mapreduce中關於FileInputFormat的引數調整都不起作用,最後發現這些老任務都是用舊版的mapreduce開發的,於是順便研究下舊版mapreduce的任務劃分策略。有關新版mapreduce的任務劃分策略,大家可以參考我之前的博文《Hadoop2.6.0的FileInputFormat的任務切分原理分析(即如何控制FileInputFormat的map任務數量)》。
原始碼分析
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { Stopwatch sw = new Stopwatch().start(); FileStatus[] files = listStatus(job); // Save the number of input files for metrics/loadgen job.setLong(NUM_INPUT_FILES, files.length); long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // generate splits ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { FileSystem fs = path.getFileSystem(job); BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(fs, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } else { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis()); } return splits.toArray(new FileSplit[splits.size()]); } protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }
這裡對以上程式碼的劃分策略進行整理:
- 遍歷當前作業的所有輸入檔案,然後將累積這些檔案的位元組數並儲存到變數totalSize中;
- 如果使用者指定了mapreduce.job.maps引數,那麼這個引數會被儲存在入參numSplits中;
- 使用者想要通過numSplits控制map任務的數量,那麼需求對totalSize進行平分,以便確定每個map任務劃分的輸入大小。這個計算很簡單,即使用totalSize除以numSplits,最後得到的目標劃分大小儲存在變數goalSize中;
- 常量SPLIT_MINSIZE實際是由引數mapreduce.input.fileinputformat.split.minsize來控制的,如果沒有配置則預設是1。minSplitSize預設是1,切舊版FileIntputFormat沒有設定此變數的地方。最後取SPLIT_MINSIZE和minSplitSize的最大值,並儲存在變數minSize中;
- 遍歷當前作業的每個輸入檔案,計算每個輸入檔案,將被劃分的任務數量,最後將每個檔案劃分的任務數量合併起來就是整個作業劃分的任務數量。
- 判斷檔案的大小,只有檔案位元組數大於0才是有意義的;
- 判斷檔案是否是可以切分的,只有能夠切分的檔案才會繼續進行任務數量劃分;
- 呼叫檔案的getBlockSize方法,獲取檔案的塊大小並存儲在變數blockSize中;
- 呼叫computeSplitSize方法計算最後劃分給每個任務的輸入大小,並儲存在splitSize中。計算公式為:splitSize = max(minSize, min(goalSize, blockSize));
- 將檔案按照splitSize的大小進行劃分,不足splitSize大小的也算作一個任務劃分數。