1. 程式人生 > >MapReduce原始碼分析之InputSplit分析

MapReduce原始碼分析之InputSplit分析

前言

MapReduce的原始碼分析是基於Hadoop1.2.1基礎上進行的程式碼分析。

什麼是InputSplit

       InputSplit是指分片,在MapReduce當中作業中,作為map task最小輸入單位。分片是基於檔案基礎上出來的而來的概念,通俗的理解一個檔案可以切分為多少個片段,每個片段包括了<檔名,開始位置,長度,位於哪些主機>等資訊。在MapTask拿到這些分片後,會知道從哪開始讀取資料。

Job提交時如何獲取到InputSplit
       以org.apache.hadoop.mapred包中的FileInputFormat為例(因為該類作為其他檔案型別的基類),內部實現瞭如何獲取分片,通過分析程式碼,以便知曉檔案是如何被切片的。

 
 public InputSplit[] getSplits(JobConf job, int numSplits)
   throwsIOException {
    //獲取檔案列表的狀態,底層通過HDFS客戶端的//DistributedFileSystem.getFileStatus獲取到檔案的狀態(檔案長度,訪問時間,許可權,塊大小,副本數等資訊)
   FileStatus[] files = listStatus(job);
   
   // 儲存輸入的檔案的檔案個數
   job.setLong(NUM_INPUT_FILES, files.length);
    //計算所有檔案的總長度
   longtotalSize = 0;                           // compute total size
   for(FileStatus file: files) {               // check we have valid files
     if(file.isDir()) {
        throw new IOException("Not a file: "+ file.getPath());
     }
     totalSize += file.getLen();
   }
    
    // 計算出目標長度,通過總長度和使用者指定的map task的個數相除得到
   longgoalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
   // 獲取使用者配置檔案中指定的最小split的長度,預設為1,如果不希望按預設計算出的大//小進行分片,則可以指定最小切分的大小,當這個值大於計算出的分片大小,則會以此為準。
   longminSize = Math.max(job.getLong("mapred.min.split.size", 1),
                            minSplitSize);
 
   // 儲存後續生成的split
   ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
   NetworkTopology clusterMap = new NetworkTopology();
 
    //對每個檔案進行切片
   for(FileStatus file: files) {
     Path path = file.getPath();
     FileSystem fs = path.getFileSystem(job);
     longlength = file.getLen();
      // 獲取到整個檔案的所有block的位置資訊
     BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,length);
     // 檔案長度不為0,且能被切分(二進位制檔案總是不允許切分)
     if((length != 0) && isSplitable(fs, path)) {
        long blockSize = file.getBlockSize();
        //計算出當前檔案需要按多長作為當前該檔案切分的單位(一般為blockSize,當map task指定的多,則為goalSize,這需要按具體的引數)
        long splitSize = computeSplitSize(goalSize,minSize, blockSize);
 
        long bytesRemaining = length;
        //迴圈按分片大小取出一個個分片
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        //獲取分片所在的主機列表,這裡會涉及到如何計算本地化,這在後面會拿出來分析
          String[] splitHosts =getSplitHosts(blkLocations,
              length-bytesRemaining, splitSize,clusterMap);
          splits.add(new FileSplit(path,length-bytesRemaining, splitSize,
              splitHosts));
          bytesRemaining -= splitSize;
        }
       
        //對尾部不足一個分片大小的也生成一個分片
        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path,length-bytesRemaining, bytesRemaining,
                     blkLocations[blkLocations.length-1].getHosts()));
        }
     } elseif(length != 0) {
        // 不允許被切分的檔案,不會因為檔案大小而去計算需要佔用幾個分片
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(new FileSplit(path, 0, length, splitHosts));
     } else{
        //檔案長度為空的也會產生一個分片
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
     }
   }
   LOG.debug("Total # of splits: "+ splits.size());
   returnsplits.toArray(newFileSplit[splits.size()]);
 }

       通過上述分析,可以知道我們指定一個目錄作為job的輸入源時,使用者指定的MapTask的個數,以及檔案總長度,塊大小,以及使用者指定的最小分片長度會影響到最後可以產生多少個分片,也就是這個Job最後需要執行多少次MapTask。

       同時,還可以得知,一個分片是不會跨越兩個檔案的;一個空的檔案也會佔用到一個分片;不是每個分片都是等長的;以及一個分片可以跨一個大檔案中連續的多個block。

主機列表是什麼,如何選擇

       InputSplit作為一個分片,所包含的的資訊中有主機列表這一資訊,這不是說這個分片就在這個主機列表上,這是錯誤的理解。主機列表是指做task的時候,JobTracker會把Task傳送到主機列表所在的節點上,由該節點來執行task。

       在上面我們已經得出過結論“一個分片可以有多個block”,那麼這種這情況下,主機列表就不會覆蓋所有block所對應的主機資訊,而是根據一種演算法來:通過將機架和資料節點引入進來,形成網路拓撲;機架對應的資訊中會儲存這個機架有這個分片的多少資料量,資料節點對應的節點資訊中會儲存這個節點有這個分片的多少資料量。根據機架和資料節點這兩個資訊來排序,會選擇出機架列表裡包含的了最多資料量的機架,在該機架內選擇包含了最多的資料量的資料節點。如果第一個機架的主機列表數量不夠,則再從第二個機架內選擇資料節點。通過這種形式來選擇出最合理的主機列表資訊。

       另外對應的,如果一個分片只包含一個block,那麼就沒有上述這麼複雜的情況,只要將這個塊對應的資訊(BlockLocation)中的主機列表資訊返回即可。

       下面我們來實際分析程式碼,會通過註釋來解釋關鍵的步驟。

protected String[] getSplitHosts(BlockLocation[] blkLocations,
     longoffset, longsplitSize, NetworkTopology clusterMap)
 throwsIOException {
   
    // 通過指定的偏移來確定在偏移是落在了第幾個Block上
   intstartIndex = getBlockIndex(blkLocations, offset);
   
    // 計算出當前這個Block從偏移開始到塊結束還有多少資料量
   longbytesInThisBlock = blkLocations[startIndex].getOffset() +
                         blkLocations[startIndex].getLength() - offset;
 
    // 如果這個塊的剩餘的資料量是大於一個分片的長度的,
    // 則直接返回這個block所對應的主機列表。也就是一個分片不足一個block的情況
   //If this is the only block, just return
   if(bytesInThisBlock >= splitSize) {
     returnblkLocations[startIndex].getHosts();
   }
   
    // 否則,說明了這個分片還會包含其他的block,因此需要算出除當前塊外的分片長度
   longbytesInFirstBlock = bytesInThisBlock;
   intindex = startIndex + 1;
   splitSize -= bytesInThisBlock;
   
    // 計算出在最後一個塊做這個分片佔了多少長度的資料量。
   while(splitSize > 0) {
     bytesInThisBlock =
        Math.min(splitSize,blkLocations[index++].getLength());
     splitSize -= bytesInThisBlock;
   }
 
   longbytesInLastBlock = bytesInThisBlock;
   intendIndex = index - 1;
   
    //這是兩個核心的結果,用於記錄網路拓撲資訊
    //Node用來表示節點(如資料節點,機架)
    //NodeInfo用來表示節點的資訊,包含(葉子節點列表,blockId列表,資料長度)
    //hostsMap會記錄資料節點(簡稱節點,即Datanode)到對應的節點資訊的關係
    //在hostsMap記錄的value中會記錄資料節點包含了這個分片中的多少個塊索引
    //以及包含的這些block有多少資料是在這個分片中的。
    //racksMap會記錄機架到這個機架資訊,在racksMap中會記錄包括上述的資料節點
    //所包含的的資訊之外,還記錄了有哪些資料節點屬於這個機架
   Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
   Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
   String [] allTopos = new String[0];
 
   // Build the hierarchy and aggregate thecontribution of
   // bytes at each level. SeeTestGetSplitHosts.java  
    // 遍歷這個分片所包含的的block,將block的拓撲資訊和資料長度資訊記錄到
    // hostsMap和racksMap中
   for(index = startIndex; index <= endIndex; index++) {
   
      // 確認block有多少資料是屬於當前這個分片的
     // Establish the bytes in this block
     if(index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
     }
     elseif(index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
     }
     else{
        bytesInThisBlock =blkLocations[index].getLength();
     }
       
      // 獲取block的拓撲資訊,取得拓撲的路徑
      // 如["/rack1/node1","/rack1/node2","/rack2/node3"]
     allTopos = blkLocations[index].getTopologyPaths();
 
     // If no topology information is available,just
     // prefix a fakeRack
     if(allTopos.length== 0) {
        allTopos = fakeRacks(blkLocations,index);
     }
 
     // NOTE: This code currently works only forone level of
     // hierarchy (rack/host). However, it isrelatively easy
     // to extend this to support aggregation atdifferent
     // levels
     
      // 遍歷每個拓撲,將資訊構建到hostsMap和racksMap
     for(String topo: allTopos) {
 
        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;
 
        node = clusterMap.getNode(topo);
 
        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }
       
        nodeInfo = hostsMap.get(node);
       
        // 資料節點資訊不存在,則在主機和機架資訊中都加入新的記錄
        //否則則更新下資料
        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node,nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo =racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
           racksMap.put(parentNode,parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        }
        else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo =racksMap.get(parentNode);
        }
       
        // 更新這個資料節點包含了哪些塊索引和包含了分片中多少的資料量
        nodeInfo.addValue(index,bytesInThisBlock);
        //更新機架包含了哪些塊索引和包含了分片中多少的資料量
        parentNodeInfo.addValue(index,bytesInThisBlock);
 
     } // for all topos
   
   } // for all indices
    // 真正開始按選擇主機
   returnidentifyHosts(allTopos.length, racksMap);
 }
 
 // 會選擇出副本數的主機列表,即有副本數是3,則會返回3個主機的資訊
 // 選擇的演算法,是前面所說的先根據機架包含的資料量排序,再根據節點包含的資料量
 // 進行排序,然後依次從高到底選出副本數個主機資訊返回
 privateString[] identifyHosts(int replicationFactor,
                                Map<Node,NodeInfo> racksMap) {
   
   String [] retVal = new String[replicationFactor];
  
   List <NodeInfo> rackList = new LinkedList<NodeInfo>();
 
   rackList.addAll(racksMap.values());
   
    //對所有機架按包含的資料量多少進行排序
   // Sort the racks based on theircontribution to this split
   sortInDescendingOrder(rackList);
   
   booleandone = false;
   intindex = 0;
   
    //依次遍歷這些機架,在機架內會按節點包含的資料量的多少進行排序
   // Get the host list for all our aggregateditems, sort
   // them and return the top entries
   for(NodeInfo ni: rackList) {
 
     Set<NodeInfo> hostSet= ni.getLeaves();
 
     List<NodeInfo>hostList = new LinkedList<NodeInfo>();
     hostList.addAll(hostSet);
   
     // Sort the hosts in this rack based ontheir contribution
     sortInDescendingOrder(hostList);
     
      // 從按資料量的多少從高到底選擇主機
     for(NodeInfo host: hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
     }
     
     if(done == true){
        break;
     }
   }
   returnretVal;
 }

         通過上述選擇主機的演算法,我們可以知道,當一個分片包含的多個block的時候,總會從其他節點讀取資料,也就是做不到所有的計算都是本地化。為了發揮計算本地化效能,應該儘量使InputSplit大小與塊大小相當。

         在舊版的介面中,InputSplit的大小會受maptask個數,和split引數的影響,需要具體情況具體調整。在新版的介面中,這個比較容易控制,因為不受maptask的影響,InputSplit大小計算公式如下:         splitSize=max("mapred.min.split.size",min("mapred.max.split.size",blockSize))

兩個引數都取預設配置的時候,分片大小就是blockSize