The HDFS File Read Process

What does it take to read a file from HDFS? Let's look at a simple example:

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;


public class FileSystemCat {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub
		String uri = args[0];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		InputStream in = null;
		try {
			in = fs.open(new Path(uri));
			IOUtils.copyBytes(in, System.out, 4096, false);
		} finally {
			IOUtils.closeStream(in);
		}
	}
}

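This program is an HDFS client: given the path of a file, it reads the file and prints its contents. What happens along the way? At a high level it is very simple: open the file, read the file, close the file.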

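As is well known, HDFS stores files as blocks. To read a file from HDFS, the client must first find out which blocks make up the file, and that requires a call to the NameNode, because the NameNode holds the mapping from each file to its sequence of blocks and persists that mapping on its own storage. For efficiency, this information is loaded into memory when HDFS starts, which also explains, to some extent, why the NameNode's memory size limits the number of files the whole HDFS cluster can hold.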
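
To make the file-to-block-sequence mapping concrete, here is a minimal sketch of how a file's length translates into a sequence of block lengths. It uses only the JDK and is not Hadoop code: the class name BlockSplitSketch, the method blockLengths, and the 64 MB block size are assumptions chosen for illustration (the real block size is a cluster setting).

```java
import java.util.ArrayList;
import java.util.List;

class BlockSplitSketch {

    /** Returns the length of each block that a file of fileSize bytes would occupy. */
    static List<Long> blockLengths(long fileSize, long blockSize) {
        if (fileSize < 0 || blockSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and blockSize > 0");
        }
        List<Long> lengths = new ArrayList<>();
        long remaining = fileSize;
        while (remaining > 0) {
            // Every block is full except possibly the last one.
            long len = Math.min(blockSize, remaining);
            lengths.add(len);
            remaining -= len;
        }
        return lengths;
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024;  // assumed block size, for illustration only
        long fileSize = 200L * 1024 * 1024;  // a hypothetical 200 MB file
        System.out.println(blockLengths(fileSize, blockSize));
    }
}
```

With these assumed numbers the file maps to four blocks: three full 64 MB blocks and a final 8 MB block. The NameNode has to remember an entry for every such block of every file, which is why its memory bounds the size of the namespace.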

In fact, the most important part of this small client program is a single line of code:

in = fs.open(new Path(uri));

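The open method returns an InputStream. I think everyone is familiar with InputStream: it is a standard Java class that represents an input stream. Building this stream, however, takes some work, so let's see how DistributedFileSystem implements it by tracing the code step by step: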

  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

That call leads to:
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    statistics.incrementReadOps(1);
    return new DFSClient.DFSDataInputStream(
          dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
  }

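Clearly, dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics) returns an input stream, and the outer wrapper decorates it with some additional functionality. We focus on the main path here: what does dfs.open actually do? First we need to know the type of dfs, so let's look at its declaration: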
DFSClient dfs;
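
As an aside on the wrapping seen in open(Path, int): putting one stream inside another to add behavior is the decorator pattern that java.io uses throughout. The sketch below shows the idea with JDK classes only. CountingInputStream and DecoratorSketch are hypothetical names invented for this example; this is not how DFSDataInputStream is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical decorator: passes reads through and counts the bytes delivered. */
class CountingInputStream extends FilterInputStream {
    private long bytesRead = 0;

    CountingInputStream(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b >= 0) {
            bytesRead++;
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n > 0) {
            bytesRead += n;
        }
        return n;
    }

    long getBytesRead() {
        return bytesRead;
    }
}

class DecoratorSketch {

    /** Reads the raw stream to the end through the decorator and returns the byte count. */
    static long countBytes(InputStream raw) {
        try (CountingInputStream in = new CountingInputStream(raw)) {
            byte[] buf = new byte[4096];
            while (in.read(buf, 0, buf.length) != -1) {
                // Discard the data; only the count matters here.
            }
            return in.getBytesRead();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = "hello hdfs".getBytes(StandardCharsets.UTF_8);
        System.out.println(countBytes(new ByteArrayInputStream(data)));
    }
}
```

The caller still sees an ordinary InputStream; the extra behavior lives entirely in the wrapper, which is the same shape as the outer stream wrapped around dfs.open's result.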

So dfs is a DFSClient object. We continue into the implementation of the open method in the DFSClient class:
  DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                      FileSystem.Statistics stats
      ) throws IOException {
    checkOpen();
    //    Get block info from namenode
    return new DFSInputStream(src, buffersize, verifyChecksum);
  }

Here we find a key comment: Get block info from namenode. So let's see how HDFS obtains the block information for this file:
  private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) throws IOException {
    try {
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                    FileNotFoundException.class);
    }
  }

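This is the method in DFSClient that fetches block information. Its first parameter is an object that implements the client protocol, the second is the file path, and the remaining two are the file offset and length. Again there is only one key line: return namenode.getBlockLocations(src, start, length). This is a method of NameNode, so we go there next: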

  public LocatedBlocks   getBlockLocations(String src, 
                                          long offset, 
                                          long length) throws IOException {
    myMetrics.incrNumGetBlockLocations();
    return namesystem.getBlockLocations(getClientMachine(), 
                                        src, offset, length);
  }

Here namesystem is an instance of the FSNamesystem class. We move on to the method called above:
  /**
   * Get block locations within the specified range.
   * 
   * @see #getBlockLocations(String, long, long)
   */
  LocatedBlocks getBlockLocations(String clientMachine, String src,
      long offset, long length) throws IOException {
    LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
    if (blocks != null) {
      //sort the blocks
      DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
          clientMachine);
      for (LocatedBlock b : blocks.getLocatedBlocks()) {
        clusterMap.pseudoSortByDistance(client, b.getLocations());
      }
    }
    return blocks;
  }

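The first parameter of this method is the client machine, which is used mainly to compute the distance between each DataNode and the client. Every block of a file is stored redundantly on different DataNodes according to the configured replication factor (3 by default), and to maximize performance the client should read directly from the nearest DataNode. The other three parameters are self-explanatory: the file path, the offset, and the length. The method obtains the blocks that make up the file, orders each block's DataNode locations by distance from the client machine, and returns the result.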
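
The idea of ordering replica locations by distance from the client can be sketched as follows. This is a simplified illustration, not Hadoop's pseudoSortByDistance: it does a full stable sort with a three-level distance (same host, same rack, different rack), and the Node class, the distance values, and the host and rack names are all assumptions made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ReplicaOrderSketch {

    /** Hypothetical stand-in for a DataNode: a host name plus a rack path. */
    static final class Node {
        final String host;
        final String rack;

        Node(String host, String rack) {
            this.host = host;
            this.rack = rack;
        }

        @Override
        public String toString() {
            return rack + "/" + host;
        }
    }

    /** Assumed distance model: 0 = same host, 2 = same rack, 4 = different rack. */
    static int distance(Node a, Node b) {
        if (a.host.equals(b.host)) {
            return 0;
        }
        if (a.rack.equals(b.rack)) {
            return 2;
        }
        return 4;
    }

    /** Returns a copy of replicas ordered from nearest to farthest from the client. */
    static List<Node> sortByDistance(Node client, List<Node> replicas) {
        List<Node> sorted = new ArrayList<>(replicas);
        sorted.sort(Comparator.comparingInt((Node n) -> distance(client, n)));
        return sorted;
    }

    public static void main(String[] args) {
        Node client = new Node("host1", "/rack1");
        List<Node> replicas = new ArrayList<>();
        replicas.add(new Node("host7", "/rack2"));
        replicas.add(new Node("host2", "/rack1"));
        replicas.add(new Node("host1", "/rack1"));
        // Prints [/rack1/host1, /rack1/host2, /rack2/host7]
        System.out.println(sortByDistance(client, replicas));
    }
}
```

Because the nearest replica ends up first in each block's location list, a client that simply tries locations in order will prefer a local copy, then a rack-local copy, then a remote one.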

Even at this point we still have not seen how the block information is actually obtained, so we have to trace the code once more:

  /**
   * Get block locations within the specified range.
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  public LocatedBlocks getBlockLocations(String src, long offset, long length,
      boolean doAccessTime, boolean needBlockToken) throws IOException {
    if (isPermissionEnabled) {
      checkPathAccess(src, FsAction.READ);
    }

    if (offset < 0) {
      throw new IOException("Negative offset is not supported. File: " + src );
    }
    if (length < 0) {
      throw new IOException("Negative length is not supported. File: " + src );
    }
    final LocatedBlocks ret = getBlockLocationsInternal(src, 
        offset, length, Integer.MAX_VALUE, doAccessTime, needBlockToken);  
    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
      logAuditEvent(UserGroupInformation.getCurrentUser(),
                    Server.getRemoteIp(),
                    "open", src, null, null);
    }
    return ret;
  }

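This method first performs a permission check, then validates the offset and length arguments, then calls another method, getBlockLocationsInternal (a private synchronized method, not a static one), to obtain the block information, writes an audit log entry, and finally returns the block information. We keep tracing the code that fetches the blocks: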
  private synchronized LocatedBlocks getBlockLocationsInternal(String src,
                                                       long offset, 
                                                       long length,
                                                       int nrBlocksToReturn,
                                                       boolean doAccessTime, 
                                                       boolean needBlockToken)
                                                       throws IOException {
    INodeFile inode = dir.getFileINode(src);
    if(inode == null) {
      return null;
    }
    if (doAccessTime && isAccessTimeSupported()) {
      dir.setTimes(src, inode, -1, now(), false);
    }
    Block[] blocks = inode.getBlocks();
    if (blocks == null) {
      return null;
    }
    if (blocks.length == 0) {
      return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
    }
    List<LocatedBlock> results;
    results = new ArrayList<LocatedBlock>(blocks.length);

    int curBlk = 0;
    long curPos = 0, blkSize = 0;
    int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
    for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
      blkSize = blocks[curBlk].getNumBytes();
      assert blkSize > 0 : "Block of size 0";
      if (curPos + blkSize > offset) {
        break;
      }
      curPos += blkSize;
    }
    
    if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
      return null;
    
    long endOff = offset + length;
    
    do {
      // get block locations
      int numNodes = blocksMap.numNodes(blocks[curBlk]);
      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
      int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); 
      if (numCorruptNodes != numCorruptReplicas) {
        LOG.warn("Inconsistent number of corrupt replicas for " + 
            blocks[curBlk] + "blockMap has " + numCorruptNodes + 
            " but corrupt replicas map has " + numCorruptReplicas);
      }
      boolean blockCorrupt = (numCorruptNodes == numNodes);
      int numMachineSet = blockCorrupt ? numNodes : 
                            (numNodes - numCorruptNodes);
      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
      if (numMachineSet > 0) {
        numNodes = 0;
        for(Iterator<DatanodeDescriptor> it = 
            blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
          DatanodeDescriptor dn = it.next();
          boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
            machineSet[numNodes++] = dn;
        }
      }
      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
          blockCorrupt);
      if(isAccessTokenEnabled && needBlockToken) {
        b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), 
            EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
      }
      
      results.add(b); 
      curPos += blocks[curBlk].getNumBytes();
      curBlk++;
    } while (curPos < endOff 
          && curBlk < blocks.length 
          && results.size() < nrBlocksToReturn);
    
    return inode.createLocatedBlocks(results);
  }

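In this method, fetching the block information becomes concrete. The method first looks up the inode for the file path; if the inode is null, the file does not exist and the method returns null. It then updates the access time if requested and fetches all blocks of the inode (Block[] blocks = inode.getBlocks()). If the block array is null it returns null, and if the array is empty it returns an empty result. Otherwise it advances block by block until it reaches the block that contains the offset, keeping track of the block index. If the index ends up equal to the number of blocks, the offset is at or beyond the end of the file and the method returns null. Otherwise it collects the blocks that fall within the requested range into an ArrayList, uses that list to create a LocatedBlocks object, and returns it.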
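
The range selection at the heart of that method can be isolated into a small, self-contained sketch. It keeps only the two loops (skip to the block containing the offset, then collect blocks until the end of the range) and leaves out replicas, corrupt-replica handling, block tokens, and the nrBlocksToReturn cap. The class BlockRangeSketch and the method blocksInRange are hypothetical names, and blocks are represented simply by their sizes.

```java
import java.util.ArrayList;
import java.util.List;

class BlockRangeSketch {

    /**
     * Returns the indices of the blocks that cover [offset, offset + length),
     * an empty list for a file with no blocks, or null when the offset is at
     * or beyond the end of the file.
     */
    static List<Integer> blocksInRange(long[] blockSizes, long offset, long length) {
        if (offset < 0 || length < 0) {
            throw new IllegalArgumentException("offset and length must be non-negative");
        }
        List<Integer> result = new ArrayList<>();
        if (blockSizes.length == 0) {
            return result;
        }

        // Phase 1: skip every block that ends at or before the offset.
        int cur = 0;
        long pos = 0;
        while (cur < blockSizes.length && pos + blockSizes[cur] <= offset) {
            pos += blockSizes[cur];
            cur++;
        }
        if (cur == blockSizes.length) {
            return null; // offset >= end of file
        }

        // Phase 2: collect blocks until the end of the requested range.
        // As in the original do/while, the block containing the offset is
        // always included, even when length is 0.
        long end = offset + length;
        do {
            result.add(cur);
            pos += blockSizes[cur];
            cur++;
        } while (pos < end && cur < blockSizes.length);
        return result;
    }

    public static void main(String[] args) {
        long[] sizes = {64, 64, 64, 8}; // block sizes in arbitrary units
        System.out.println(blocksInRange(sizes, 70, 100)); // prints [1, 2]
    }
}
```

In the usage example, the range [70, 170) starts inside block 1 (which covers 64 to 128) and ends inside block 2 (which covers 128 to 192), so those two indices are returned.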