1. 程式人生 > >Hadoop HDFS原始碼學習筆記(四)

Hadoop HDFS原始碼學習筆記(四)

一、首先根據HDFS的API寫一段程式,然後是用Eclipse進行debug 單步跟蹤,從而檢視原始碼執行流程:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class testRead1 {
    public static void main(String[] args) throws Exception{
        //String uri = args[0];
        String uri = "hdfs://localhost:9000/user/mmliu/test01.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0);
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}

先給出整體的序列圖以便後邊分析:

1、單步追蹤client端的程式碼:

可以看到一開始當get一個FileSystem類的例項的時候,最終會根據我們提供的uri和conf 建立了一個DistributedFileSystem的例項物件,並完成了初始化的過程,初始化函式主要是完成了RPCNamenode和namenode的建立,而這兩個變數都是ClientProtocol型別的,ClientProtocol本質上是一個介面,最終完成的是RPC操作。

  private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
      Configuration conf, UserGroupInformation ugi) 
    throws IOException {
    return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
        ClientProtocol.versionID, nameNodeAddr, ugi, conf,
        NetUtils.getSocketFactory(conf, ClientProtocol.class));
  }

Open階段,HDFS呼叫了DistributedFileSystem的open函式:
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    statistics.incrementReadOps(1);
    return new DFSClient.DFSDataInputStream(
          dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
  }
其中的dfs是一個DFSClient型別的成員函式,在open函式中建立了一個DFSInputStream類物件,並返回,在DFSInputStream類的建構函式中,其中有一個非常關鍵的函式被呼叫openInfo,一開始的時候我忽略了這個函式,結果導致到後來read過程中,不知道block資訊從哪裡來的,所以這裡非常關鍵,其實從DFSInputStream類的註釋中可以看到它的作用:
/****************************************************************
   * DFSInputStream provides bytes from a named file.  It handles 
   * negotiation of the namenode and various datanodes as necessary.
   ****************************************************************/
  public class DFSInputStream extends FSInputStream {

openInfo函式主要與namenode通訊,從namenode上預取file的block資訊,其中perfetchSize可以設定預取的block的塊數,預設為10.

private long prefetchSize = 10 * defaultBlockSize;

/**
     * Grab the open-file info from namenode
     */
    synchronized void openInfo() throws IOException {
      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
      if (newInfo == null) {
        throw new FileNotFoundException("File does not exist: " + src);
      }

      // I think this check is not correct. A file could have been appended to
      // between two calls to openInfo().
      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
          !newInfo.isUnderConstruction()) {
        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
        Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
          if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
            throw new IOException("Blocklist for " + src + " has changed!");
          }
        }
      }
      updateBlockInfo(newInfo);
      this.locatedBlocks = newInfo;
      this.currentNode = null;
    }

private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) throws IOException {
    try {
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                    FileNotFoundException.class);
    }
  }
LocatedBlocks主要包含檔案長度資訊long fileLength和一個連結串列的List<LocatedBlock> blocks,其中每個LocatedBlock包含如下資訊:
private Block b;
private long offset;  // offset of the first byte of the block in the file
private DatanodeInfo[] locs;
b 儲存block相關的資訊,offset 指向的是該block 在檔案中的偏移量,locs記錄了儲存該block的datanode的資訊

根據在get階段的追蹤記錄我們可以得知namenode.getBlockLocations函式呼叫是一個RPC呼叫,最終呼叫到namenode的getBlockLocations函式上,接下來我們就開始單步除錯namenode

2、NameNode端getBlockLocations函式分析 



NameNode類的getBlocksLocation函式實現為:

  /** {@inheritDoc} */
  public LocatedBlocks   getBlockLocations(String src, 
                                          long offset, 
                                          long length) throws IOException {
    myMetrics.incrNumGetBlockLocations();
    return namesystem.getBlockLocations(getClientMachine(), 
                                        src, offset, length);
  }

其中namesystem在為一個FSNameSystem型別的成員變數,儲存的是NameNode的name space樹,其中一個重要的成員變數為FSDirectory dir。FSDirectory主要包括FSImage fsImage,用於讀寫硬碟上的fsimage檔案,FSImage類有成員變數FSEditLog editLog,用於讀寫硬碟上的edit檔案,這兩個檔案的關係和作用已經在之前的文章中提到過了,並且也解釋了INode, INodeFile, 以及INodeDeirectory等之間的關係。
  /**
   * Get block locations within the specified range.
   * 
   * @see #getBlockLocations(String, long, long)
   */
  LocatedBlocks getBlockLocations(String clientMachine, String src,
      long offset, long length) throws IOException {
    LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
    if (blocks != null) {
      //sort the blocks
      DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
          clientMachine);  // 根據client的主機IP等找到一個在該主機上執行的DataNode(如果存在的話)
      for (LocatedBlock b : blocks.getLocatedBlocks()) {
        clusterMap.pseudoSortByDistance(client, b.getLocations()); // 根據網路拓撲結構,將每個block的datanode按照距離client最近的標準進行排序
      }
    }
    return blocks;
  }

 /**
   * Get block locations within the specified range.
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  public LocatedBlocks getBlockLocations(String src, long offset, long length,
      boolean doAccessTime, boolean needBlockToken) throws IOException {
    if (isPermissionEnabled) {
      checkPathAccess(src, FsAction.READ);
    }

    if (offset < 0) {
      throw new IOException("Negative offset is not supported. File: " + src );
    }
    if (length < 0) {
      throw new IOException("Negative length is not supported. File: " + src );
    }
    final LocatedBlocks ret = getBlockLocationsInternal(src, 
        offset, length, Integer.MAX_VALUE, doAccessTime, needBlockToken);  // 實際工作者 
    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
      logAuditEvent(UserGroupInformation.getCurrentUser(),
                    Server.getRemoteIp(),
                    "open", src, null, null);
    }
    return ret;
  }

可以看到最終是呼叫的getBlockLocationsInternal函式完成的LocatedBlocks的構建過程:
private synchronized LocatedBlocks getBlockLocationsInternal(String src,
                                                       long offset, 
                                                       long length,
                                                       int nrBlocksToReturn,
                                                       boolean doAccessTime, 
                                                       boolean needBlockToken)
                                                       throws IOException {
    INodeFile inode = dir.getFileINode(src); // 通過路徑名從檔案系統樹中找到要開啟的檔案的INodeFile,其中儲存了該檔案的INode資訊 
    if(inode == null) {
      return null;
    }
    if (doAccessTime && isAccessTimeSupported()) {
      dir.setTimes(src, inode, -1, now(), false);
    }
    Block[] blocks = inode.getBlocks();
    if (blocks == null) {
      return null;
    }
    if (blocks.length == 0) {
      return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
    }
    List<LocatedBlock> results;
    results = new ArrayList<LocatedBlock>(blocks.length);

    int curBlk = 0;
    long curPos = 0, blkSize = 0;
    int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
    for (curBlk = 0; curBlk < nrBlocks; curBlk++) {   // 這個迴圈的目的是找到offset所在的block的位置 
      blkSize = blocks[curBlk].getNumBytes();        // curBlk就指向了offset所在的block
      assert blkSize > 0 : "Block of size 0";  
      if (curPos + blkSize > offset) {
        break;
      }
      curPos += blkSize;
    }
    
    if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
      return null;
    
    long endOff = offset + length;   // 找到使用者請求的length長度的data在檔案中的結束位置 
    
    do {   // 該迴圈將涉及到從offset開始到endOff結束這段長度的data的block資訊收集起來,然後組合成LocatedBlock
      // get block locations
      int numNodes = blocksMap.numNodes(blocks[curBlk]);
      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
      int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); 
      if (numCorruptNodes != numCorruptReplicas) {
        LOG.warn("Inconsistent number of corrupt replicas for " + 
            blocks[curBlk] + "blockMap has " + numCorruptNodes + 
            " but corrupt replicas map has " + numCorruptReplicas);
      }
      DatanodeDescriptor[] machineSet = null;
      boolean blockCorrupt = false;
      if (inode.isUnderConstruction() && curBlk == blocks.length - 1
          && blocksMap.numNodes(blocks[curBlk]) == 0) {
        // get unfinished block locations
        INodeFileUnderConstruction cons = (INodeFileUnderConstruction)inode;
        machineSet = cons.getTargets();
        blockCorrupt = false;
      } else {
        blockCorrupt = (numCorruptNodes == numNodes);
        int numMachineSet = blockCorrupt ? numNodes : 
                            (numNodes - numCorruptNodes);
        machineSet = new DatanodeDescriptor[numMachineSet]; // 找到block對應的machineSet資訊並將為損壞的放入集合中
        if (numMachineSet > 0) {
          numNodes = 0;
          for(Iterator<DatanodeDescriptor> it = 
              blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
            DatanodeDescriptor dn = it.next();
            boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
            if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
              machineSet[numNodes++] = dn;
          }
        }
      }
      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
          blockCorrupt);
      if(isAccessTokenEnabled && needBlockToken) {
        b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), 
            EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
      }
      
      results.add(b);  // 將machineSet以及block構造的LocatedBlock放入到連結串列中
      curPos += blocks[curBlk].getNumBytes();
      curBlk++;
    } while (curPos < endOff 
          && curBlk < blocks.length 
          && results.size() < nrBlocksToReturn);
    
    return inode.createLocatedBlocks(results); // 使用LocatedBlock連結串列構造一個LocatedBlocks物件然後返回
  }


通過RPC呼叫,在NameNode得到的LocatedBlocks物件,作為成員變數構造DFSInputStream物件,最後包裝為FSDataInputStream返回給使用者