
HDFS Book Reading Notes --- 5.2 File Read Operations and Input Streams (5.2.2) --- Part 1

5.2.2 Read Operations -- The DFSInputStream Implementation

HDFS currently implements reads at three levels: network read, short-circuit read, and zero-copy read. Their read efficiency increases in that order.

Network read:

Network read is the most basic form of HDFS read: the DFSClient and the Datanode establish a Socket connection and transfer data over it.

Short-circuit read:

When the DFSClient and the Datanode storing the target block are on the same physical node, the DFSClient can open the block replica file and read the data directly, without going through the Datanode process. This will be covered in detail later.

Zero-copy read:

When the DFSClient and the Datanode that has cached the target block are on the same physical node, the DFSClient can read that block via zero copy, which greatly improves efficiency. Moreover, even if the block is evicted from the Datanode's cache during the read, the operation can still fall back to a local short-circuit read.
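
For orientation, here is a minimal client-side sketch of driving this three-level read path through the public API. The file path and read size are made up for illustration; whether the call actually uses zero copy, a short-circuit read, or a network read is decided internally, as described below:

import java.nio.ByteBuffer;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class ZeroCopyReadExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path("/tmp/demo.txt");              // hypothetical path
    ElasticByteBufferPool pool = new ElasticByteBufferPool();

    try (FSDataInputStream in = fs.open(path)) {
      // Ask for up to 1 MB through the enhanced read API. On HDFS this goes
      // through HdfsDataInputStream, which tries zero copy first and falls
      // back to short-circuit or network read when zero copy is not possible.
      ByteBuffer buf = in.read(pool, 1024 * 1024,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      if (buf != null) {
        System.out.println("read " + buf.remaining() + " bytes");
        in.releaseBuffer(buf);   // buffers from this API must be released
      }
    }
  }
}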

 

HdfsDataInputStream.read() implements the three-level read described above. The code (the read() method is actually defined in its parent class, FSDataInputStream) is as follows:

@Override
  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
      EnumSet<ReadOption> opts) 
          throws IOException, UnsupportedOperationException {
    try {
      // First try the enhanced (zero-copy capable) read on the wrapped stream
      return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
          maxLength, opts);
    }
    catch (ClassCastException e) {
      // The wrapped stream has no enhanced byte-buffer access;
      // fall back to an ordinary read into a buffer from the pool
      ByteBuffer buffer = ByteBufferUtil.
          fallbackRead(this, bufferPool, maxLength);
      if (buffer != null) {
        extendedReadBuffers.put(buffer, bufferPool);
      }
      return buffer;
    }
  }

HdfsDataInputStream.read() first calls HasEnhancedByteBufferAccess.read() to attempt a zero-copy read. If zero-copy reading is not supported in the current situation, the read falls back, via the static method ByteBufferUtil.fallbackRead(), to a short-circuit or network read. The call flow of HdfsDataInputStream.read() is shown in the figure below:

Figure: HdfsDataInputStream.read() call flow

HdfsDataInputStream provides both HasEnhancedByteBufferAccess.read() and InputStream.read(), and both implementations work by calling the corresponding methods of the underlying wrapped DFSInputStream. HasEnhancedByteBufferAccess.read() defines the zero-copy read path, while InputStream.read() defines the short-circuit and network read paths. Below we look at DFSInputStream's implementations of InputStream.read() and HasEnhancedByteBufferAccess.read() in turn.

First, it is worth explaining why both HasEnhancedByteBufferAccess.read() and InputStream.read() end up calling the corresponding methods of the wrapped DFSInputStream.

Let's look at doCall(), the method used to create the FSDataInputStream:

public FSDataInputStream doCall(final Path p)
    throws IOException, UnresolvedLinkException {
  // open() returns a DFSInputStream object
  final DFSInputStream dfsis =
      dfs.open(getPathName(p), bufferSize, verifyChecksum);
  // Wrap the DFSInputStream in an HdfsDataInputStream
  // (createWrappedInputStream() is listed below)
  return dfs.createWrappedInputStream(dfsis);
}

The code of createWrappedInputStream() is as follows:

/**
   * Wraps the stream in a CryptoInputStream if the underlying file is
   * encrypted.
   */
  public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
      throws IOException {
    final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
    if (feInfo != null) {
      // File is encrypted, wrap the stream in a crypto stream.
      // Currently only one version, so no special logic based on the version #
      getCryptoProtocolVersion(feInfo);
      final CryptoCodec codec = getCryptoCodec(conf, feInfo);
      final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
      final CryptoInputStream cryptoIn =
          new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
              feInfo.getIV());
      return new HdfsDataInputStream(cryptoIn);
    } else {
      // No FileEncryptionInfo so no encryption.
      return new HdfsDataInputStream(dfsis);
    }
  }

As you can see, the DFSInputStream object is passed into the HdfsDataInputStream constructor and is ultimately assigned to the in member variable inherited from java.io.FilterInputStream:

protected volatile InputStream in;

When read() is called, the read() method of HdfsDataInputStream is invoked. Since HdfsDataInputStream does not override this read() method, the call resolves to the read() of its parent class FSDataInputStream, which first attempts a zero-copy read with the following code:

return ((HasEnhancedByteBufferAccess)in).read(bufferPool, maxLength, opts);

The actual type of in here is DFSInputStream, which implements the HasEnhancedByteBufferAccess interface, so the cast succeeds and this read() call goes to the method defined in DFSInputStream. If the wrapped stream does not offer enhanced byte-buffer access and the cast throws a ClassCastException, the following code is executed instead:

ByteBuffer buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);

The code of fallbackRead() is as follows:

/**
   * Perform a fallback read.
   */
  public static ByteBuffer fallbackRead(
      InputStream stream, ByteBufferPool bufferPool, int maxLength)
          throws IOException {
    if (bufferPool == null) {
      throw new UnsupportedOperationException("zero-copy reads " +
          "were not available, and you did not provide a fallback " +
          "ByteBufferPool.");
    }
    boolean useDirect = streamHasByteBufferRead(stream);
    ByteBuffer buffer = bufferPool.getBuffer(useDirect, maxLength);
    if (buffer == null) {
      throw new UnsupportedOperationException("zero-copy reads " +
          "were not available, and the ByteBufferPool did not provide " +
          "us with " + (useDirect ? "a direct" : "an indirect") +
          "buffer.");
    }
    Preconditions.checkState(buffer.capacity() > 0);
    Preconditions.checkState(buffer.isDirect() == useDirect);
    maxLength = Math.min(maxLength, buffer.capacity());
    boolean success = false;
    try {
      if (useDirect) {
        buffer.clear();
        buffer.limit(maxLength);
        ByteBufferReadable readable = (ByteBufferReadable)stream;
        int totalRead = 0;
        while (true) {
          if (totalRead >= maxLength) {
            success = true;
            break;
          }
          int nRead = readable.read(buffer);
          if (nRead < 0) {
            if (totalRead > 0) {
              success = true;
            }
            break;
          }
          totalRead += nRead;
        }
        buffer.flip();
      } else {
        buffer.clear();
        int nRead = stream.read(buffer.array(),
            buffer.arrayOffset(), maxLength);
        if (nRead >= 0) {
          buffer.limit(nRead);
          success = true;
        }
      }
    } finally {
      if (!success) {
        // If we got an error while reading, or if we are at EOF, we 
        // don't need the buffer any more.  We can give it back to the
        // bufferPool.
        bufferPool.putBuffer(buffer);
        buffer = null;
      }
    }
    return buffer;
  }

As the code above shows, fallbackRead() invokes two kinds of read() on the FSDataInputStream, and both ultimately call the read() method of the in member variable of FSDataInputStream. Since the actual type of in is DFSInputStream, every read eventually ends up inside DFSInputStream.
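
To make the delegation concrete, here is a small hedged sketch of the ordinary byte-array read path as seen by an application (the path name is hypothetical); every read() call below ends up in the DFSInputStream.read(byte[], int, int) overload analyzed next:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ByteArrayReadExample {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path("/tmp/demo.txt");   // hypothetical path

    byte[] buf = new byte[4096];
    try (FSDataInputStream in = fs.open(path)) {
      int n;
      // Each read(byte[], int, int) call is delegated by FSDataInputStream
      // (via the in field discussed above) to DFSInputStream.read(), i.e.
      // the short-circuit / network read path analyzed next.
      while ((n = in.read(buf, 0, buf.length)) != -1) {
        // process n bytes of buf here ...
      }
    }
  }
}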

InputStream.read()

The flow of the InputStream.read() method is shown in the following figure:

Figure: InputStream.read() call flow

The code of the read() method is as follows:

/**
   * Read the entire buffer.
   */
  @Override
  public synchronized int read(final byte buf[], int off, int len) throws IOException {
    // Use a byte array as the container for this read
    ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);

    return readWithStrategy(byteArrayReader, off, len);
  }

The read() method reads up to len bytes from the stream's current position and stores them into buf[] starting at offset off; buf, off, and len are the method's parameters. It first constructs a ByteArrayStrategy object, indicating that the current read uses a byte array as the container, and then calls readWithStrategy() to do the actual reading. The code of the ByteArrayStrategy class is as follows:

/**
   * Used to read bytes into a byte[]
   */
  private static class ByteArrayStrategy implements ReaderStrategy {
    final byte[] buf;

    public ByteArrayStrategy(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public int doRead(BlockReader blockReader, int off, int len,
            ReadStatistics readStatistics) throws ChecksumException, IOException {
        int nRead = blockReader.read(buf, off, len);
        updateReadStatistics(readStatistics, nRead, blockReader);
        return nRead;
    }
  }
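
For context, ReaderStrategy is a small private interface declared inside DFSInputStream. Sketched here from the doRead() signature used above (not copied verbatim from the source), it looks roughly like this:

/**
 * Abstracts the destination container of a read so that readWithStrategy()
 * and readBuffer() do not care whether data lands in a byte[] or a ByteBuffer.
 * (Approximate sketch, reconstructed from the calls shown in this section.)
 */
private interface ReaderStrategy {
  int doRead(BlockReader blockReader, int off, int len,
      ReadStatistics readStatistics) throws ChecksumException, IOException;
}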

Next, let's look at the implementation of readWithStrategy():

private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
    dfsClient.checkOpen();
    if (closed) {
      throw new IOException("Stream closed");
    }
    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap 
      = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
    failures = 0;
    if (pos < getFileLength()) {// the read position is within the file
      int retries = 2;// retry up to two times if an exception occurs
      while (retries > 0) {
        try {
          // currentNode can be left as null if previous read had a checksum
          // error on the same block. See HDFS-3067
          // pos has crossed the block boundary; data must be read from a new block
          if (pos > blockEnd || currentNode == null) {
            // call blockSeekTo() to get a Datanode that stores this block
            currentNode = blockSeekTo(pos);
          }
          // compute the length of this read
          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
          if (locatedBlocks.isLastBlockComplete()) {
            realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
          }

          // call readBuffer() to read the data
          int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
          
          if (result >= 0) {
            pos += result;// advance pos
          } else {
            // got a EOS from reader though we expect more data on it.
            throw new IOException("Unexpected EOS from the reader");
          }
          if (dfsClient.stats != null) {
            dfsClient.stats.incrementBytesRead(result);
          }
          return result;
        } catch (ChecksumException ce) {
          throw ce;          // on a checksum error, rethrow the exception
        } catch (IOException e) {
          if (retries == 1) {
            DFSClient.LOG.warn("DFS Read", e);
          }
          blockEnd = -1;
          if (currentNode != null) { addToDeadNodes(currentNode); }// blacklist the node that just failed
          if (--retries == 0) {// retries exhausted, rethrow the exception
            throw e;
          }
        } finally {
          // Check if need to report block replicas corruption either read
          // was successful or ChecksumException occured.
          // check whether corrupt block replicas need to be reported to the Namenode
          reportCheckSumFailure(corruptedBlockMap, 
              currentLocatedBlock.getLocations().length);
        }
      }
    }
    return -1;
  }

readWithStrategy() first calls blockSeekTo() to obtain a Datanode that stores the target block, and then calls readBuffer() to read the block from that Datanode. If an IO exception occurs during the read, the method retries and puts the failing Datanode on the blacklist.

As shown above, readWithStrategy() relies on blockSeekTo() and readBuffer(); these two methods are explained next.

(1) blockSeekTo()

The code of this method is as follows:

/**
   * Open a DataInputStream to a DataNode so that it can be read from.
   * We get block ID and the IDs of the destinations at startup, from the namenode.
   */
  private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
    if (target >= getFileLength()) {// if the read position is past the end of the HDFS file, throw an exception
      throw new IOException("Attempted to read past end of file");
    }

    // Will be getting a new BlockReader.
    if (blockReader != null) {// close the BlockReader for the previous block
      blockReader.close();
      blockReader = null;
    }

    //
    // Connect to best DataNode for desired Block, with potential offset
    //
    DatanodeInfo chosenNode = null;
    int refetchToken = 1; // only need to get a new access token once
    int refetchEncryptionKey = 1; // only need to get a new encryption key once
    
    boolean connectFailedOnce = false;

    while (true) {
      //
      // Compute desired block
      //
      // get the location information of the block containing target
      LocatedBlock targetBlock = getBlockAt(target, true);
      assert (target==pos) : "Wrong postion " + pos + " expect " + target;
      // compute target's offset within the new block
      long offsetIntoBlock = target - targetBlock.getStartOffset();

      // call chooseDataNode() to pick a Datanode from which to read this block
      DNAddrPair retval = chooseDataNode(targetBlock, null);
      chosenNode = retval.info;
      InetSocketAddress targetAddr = retval.addr;
      StorageType storageType = retval.storageType;

      try {
        ExtendedBlock blk = targetBlock.getBlock();
        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
        // obtain a blockReader object through BlockReaderFactory
        blockReader = new BlockReaderFactory(dfsClient.getConf()).
            setInetSocketAddress(targetAddr).
            setRemotePeerFactory(dfsClient).
            setDatanodeInfo(chosenNode).
            setStorageType(storageType).
            setFileName(src).
            setBlock(blk).
            setBlockToken(accessToken).
            setStartOffset(offsetIntoBlock).
            setVerifyChecksum(verifyChecksum).
            setClientName(dfsClient.clientName).
            setLength(blk.getNumBytes() - offsetIntoBlock).
            setCachingStrategy(cachingStrategy).
            setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
            setClientCacheContext(dfsClient.getClientContext()).
            setUserGroupInformation(dfsClient.ugi).
            setConfiguration(dfsClient.getConfiguration()).
            build();
        if(connectFailedOnce) {
          DFSClient.LOG.info("Successfully connected to " + targetAddr +
                             " for " + blk);
        }
        return chosenNode;
      } catch (IOException ex) {
        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
          // security-related exception
          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
              + "encryption key was invalid when connecting to " + targetAddr
              + " : " + ex);
          // The encryption key used is invalid.
          refetchEncryptionKey--;
          dfsClient.clearDataEncryptionKey();
        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
          // security-related
          refetchToken--;
          fetchBlockAt(target);
        } else {
          connectFailedOnce = true;
          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
            + ", add to deadNodes and continue. " + ex, ex);
          // Put chosen node into dead list, continue
          // constructing the BlockReader failed; put chosenNode on the blacklist
          addToDeadNodes(chosenNode);
        }
      }
    }
  }

An HDFS file is split into multiple blocks, and these blocks are scattered across the Datanodes of the cluster. When we read a file, i.e. read its blocks in order, finishing one block means an input stream for the next block must be set up; at that point blockSeekTo() is called to obtain a Datanode that stores the next block.

blockSeekTo() first calls getBlockAt() to get the information of the block containing the current cursor (kept in the DFSInputStream.pos field), then calls chooseDataNode() to pick a Datanode that stores this block. It then constructs the BlockReader object used to read the block from that node and saves it in the DFSInputStream.blockReader field. Note that the BlockReader is constructed through the BlockReaderFactory factory class; the implementation of the BlockReader class is covered later.

Next we analyze the getBlockAt() -> chooseDataNode() -> blockReader sequence, that is, the process of locating the block, choosing a Datanode that stores it, and obtaining a BlockReader object.

<1> getBlockAt()

This method obtains the location information of the block containing the file's pos cursor, i.e. the corresponding LocatedBlock object. A LocatedBlock holds the information of every Datanode that stores the block, sorted by distance from the client, and also records whether the block is currently cached, among other things. getBlockAt() calls ClientProtocol.getBlockLocations() to fetch the LocatedBlock from the Namenode and saves it in the DFSInputStream.locatedBlocks field.
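
As an aside, a coarse view of the same location information is available through the public FileSystem API; the small sketch below (hypothetical path) prints which hosts store each block of a file, which is essentially the data getBlockAt() retrieves from the Namenode through ClientProtocol.getBlockLocations():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path("/tmp/demo.txt");   // hypothetical path
    FileStatus status = fs.getFileStatus(path);
    // One BlockLocation per block: offset and length within the file,
    // plus the hosts that store replicas of that block.
    for (BlockLocation loc :
        fs.getFileBlockLocations(status, 0, status.getLen())) {
      System.out.println("offset=" + loc.getOffset()
          + " length=" + loc.getLength()
          + " hosts=" + String.join(",", loc.getHosts()));
    }
  }
}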

<2> chooseDataNode()

This method selects a suitable Datanode from which to read the block. Its logic is simple: since the LocatedBlock already contains the list of Datanodes sorted by distance from the client, it just walks that list and returns the first Datanode that is not on the blacklist (kept in the DFSInputStream.deadNodes field), as sketched below.
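
In rough pseudocode (a simplified sketch, not the actual source; the real chooseDataNode() also handles the case where every replica is blacklisted by refetching block locations and retrying after a delay):

// Simplified sketch: walk the Datanode list already sorted by the Namenode
// and return the first node that is not blacklisted in deadNodes.
private DatanodeInfo chooseFirstLiveNode(LocatedBlock block) throws IOException {
  for (DatanodeInfo node : block.getLocations()) {
    if (!deadNodes.containsKey(node)) {
      return node;
    }
  }
  throw new IOException("Could not obtain block " + block.getBlock()
      + ": no live Datanodes");
}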

<3> BlockReaderFactory.build()

This builds the BlockReader object that reads the block from the chosen Datanode, using the BlockReaderFactory factory class. The implementation of BlockReaderFactory.build() is covered later.

(2) readBuffer()

The code of this method is as follows:

/* This is a used by regular read() and handles ChecksumExceptions.
   * name readBuffer() is chosen to imply similarity to readBuffer() in
   * ChecksumFileSystem
   */ 
  private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
      throws IOException {
    IOException ioe;
    
    /* we retry current node only once. So this is set to true only here.
     * Intention is to handle one common case of an error that is not a
     * failure on datanode or client : when DataNode closes the connection
     * since client is idle. If there are other cases of "non-errors" then
     * then a datanode might be retried by setting this to true again.
     */
    boolean retryCurrentNode = true;

    while (true) {
      // retry as many times as seekToNewSource allows.
      try {
        // read the data
        return reader.doRead(blockReader, off, len, readStatistics);
      } catch ( ChecksumException ce ) {
        DFSClient.LOG.warn("Found Checksum error for "
            + getCurrentBlock() + " from " + currentNode
            + " at " + ce.getPos()); 
        // a checksum exception means the block replica on currentNode is corrupt
        ioe = ce;
        retryCurrentNode = false;
        // we want to remember which block replicas we have tried
        // record the corrupt block in corruptedBlockMap so it can be reported to the Namenode
        addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
            corruptedBlockMap);
      } catch ( IOException e ) {
        if (!retryCurrentNode) {
          DFSClient.LOG.warn("Exception while reading from "
              + getCurrentBlock() + " of " + src + " from "
              + currentNode, e);
        }
        ioe = e;
      }
      boolean sourceFound = false;
      if (retryCurrentNode) {
        /* possibly retry the same node so that transient errors don't
         * result in application level failures (e.g. Datanode could have
         * closed the connection because the client is idle for too long).
         */ 
        // retry the current node
        sourceFound = seekToBlockSource(pos);
      } else {
        // retrying the current Datanode failed; blacklist it and choose a new Datanode to read from
        addToDeadNodes(currentNode);
        sourceFound = seekToNewSource(pos);
      }
      if (!sourceFound) {
        throw ioe;
      }
      retryCurrentNode = false;
    }
  }

The actual reading in readBuffer() is delegated to the BlockReader object, with retries when exceptions occur. When a checksum exception is raised, the block replica on currentNode is corrupt; readBuffer() adds the corrupt block to corruptedBlockMap, and reportCheckSumFailure() later reports it to the Namenode. For an ordinary IO exception, the connection between the client and the Datanode may simply have been closed, so readBuffer() retries the current node via seekToBlockSource(). If that retry fails, it calls seekToNewSource() to choose a new Datanode and adds the current one to the blacklist. Both seekToBlockSource() and seekToNewSource() internally call the blockSeekTo() method described above.

Due to length constraints, the remaining content is continued in the next article.