HDFS Book Reading Notes: 5.2 File Read Operations and Input Streams (5.2.2), Part 2

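2. BlockReader

DFSInputStream.read() reads the data block through a BlockReader object, which is passed to the doRead() call (doRead() belongs to the input stream's ReaderStrategy and invokes the BlockReader's read method).

BlockReader is an interface that abstracts reading a data block from a given Datanode. It has three main implementations: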

(1) BlockReaderLocal:

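A BlockReader that performs local short-circuit reads (see the section on short-circuit reads). When the client and the Datanode are on the same physical machine, the client reads the data directly from the local disk, bypassing the Datanode process and improving read performance.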

(2) BlockReaderLocalLegacy:

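The legacy version of BlockReaderLocal. When the client and the Datanode are on the same machine (determined by IP address), the client reads the data directly from disk. The legacy implementation requires the client to have permission on the Datanode's data directories, which may introduce security problems (see HDFS-2246).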

(3)  RemoteBlockReader2:

Reads data blocks from a Datanode over TCP.

 

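Methods of the BlockReader interface

<1> read(), readFully(), readAll(): read data into a byte[] array.

<2> skip(): skips a number of bytes in the data block.

<3> available(): the number of bytes the current input stream can read without performing a new network I/O.

<4> isLocal(): whether this is a local read, that is, whether the client and the data block are on the same machine.

<5> isShortCircuit(): whether this is a short-circuit read. Note that a short-circuit read is always a local read.

<6> getClientMmap(): obtains a memory-mapped region for the current read (see zero-copy reads).

Let us first look at how BlockReaderFactory constructs a BlockReader object.

Within DFSInputStream.read(), readWithStrategy() calls blockSeekTo(), which creates the BlockReader. The code is as follows: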

blockReader = new BlockReaderFactory(dfsClient.getConf()).
            setInetSocketAddress(targetAddr).
            setRemotePeerFactory(dfsClient).
            setDatanodeInfo(chosenNode).
            setStorageType(storageType).
            setFileName(src).
            setBlock(blk).
            setBlockToken(accessToken).
            setStartOffset(offsetIntoBlock).
            setVerifyChecksum(verifyChecksum).
            setClientName(dfsClient.clientName).
            setLength(blk.getNumBytes() - offsetIntoBlock).
            setCachingStrategy(cachingStrategy).
            setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
            setClientCacheContext(dfsClient.getClientContext()).
            setUserGroupInformation(dfsClient.ugi).
            setConfiguration(dfsClient.getConfiguration()).
            build();

The code of the build() method is as follows:

public BlockReader build() throws IOException {
    BlockReader reader = null;

    Preconditions.checkNotNull(configuration);
    if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) {
      if (clientContext.getUseLegacyBlockReaderLocal()) {
        reader = getLegacyBlockReaderLocal();
        if (reader != null) {
          if (LOG.isTraceEnabled()) {
            LOG.trace(this + ": returning new legacy block reader local.");
          }
          return reader;
        }
      } else {
        reader = getBlockReaderLocal();
        if (reader != null) {
          if (LOG.isTraceEnabled()) {
            LOG.trace(this + ": returning new block reader local.");
          }
          return reader;
        }
      }
    }
    if (conf.domainSocketDataTraffic) {
      reader = getRemoteBlockReaderFromDomain();
      if (reader != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": returning new remote block reader using " +
              "UNIX domain socket on " + pathInfo.getPath());
        }
        return reader;
      }
    }
    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
        "TCP reads were disabled for testing, but we failed to " +
        "do a non-TCP read.");
    return getRemoteBlockReaderFromTcp();
  }

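build() first tries to create a local short-circuit reader; short-circuit reads avoid the overhead of socket communication. If the short-circuit reader cannot be created, it tries a domain socket reader, which transfers data locally over a UNIX domain socket (enabled by dfs.client.domain.socket.data.traffic, default false). If neither can be created, it creates a remote reader that reads the data over TCP.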
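
The fallback order described above can be sketched in isolation. This is only an illustration of the pattern, not Hadoop code: the class name ReaderFallbackSketch, the use of plain strings as "readers", and the Supplier parameters are all assumptions made for the example.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of the fallback order in build(); readers are plain strings here. */
final class ReaderFallbackSketch {
  /**
   * Tries the local short-circuit reader, then the domain-socket reader,
   * and finally falls back to TCP. A supplier returns null when its
   * reader cannot be created, mirroring the null checks in build().
   */
  static String build(boolean shortCircuitAllowed,
                      boolean domainSocketDataTraffic,
                      Supplier<String> localReader,
                      Supplier<String> domainReader,
                      Supplier<String> tcpReader) {
    if (shortCircuitAllowed) {
      String reader = localReader.get();
      if (reader != null) {
        return reader;
      }
    }
    if (domainSocketDataTraffic) {
      String reader = domainReader.get();
      if (reader != null) {
        return reader;
      }
    }
    // Last resort: the TCP reader is always attempted.
    return tcpReader.get();
  }

  public static void main(String[] args) {
    // Short-circuit is allowed but fails; domain-socket traffic is off, so TCP is used.
    System.out.println(build(true, false, () -> null, () -> "domain", () -> "tcp"));
  }
}
```

Note that a failed short-circuit attempt does not abort the read; it only moves on to the next, slower transport.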

 

<1> The getLegacyBlockReaderLocal() method

    (1) This method first checks whether the client and the Datanode are on the same machine; if not, it returns null.

    (2) It then calls BlockReaderLocalLegacy.newBlockReader(conf, userGroupInformation, configuration, fileName, block, token, datanode, startOffset, length, storageType) to create a BlockReaderLocalLegacy object. The code of newBlockReader() is as follows:

          

/**
   * The only way this object can be instantiated.
   */
  static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
      UserGroupInformation userGroupInformation,
      Configuration configuration, String file, ExtendedBlock blk,
      Token<BlockTokenIdentifier> token, DatanodeInfo node, 
      long startOffset, long length, StorageType storageType)
      throws IOException {
    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
        .getIpcPort());
    // check the cache first
    // If the block file path and checksum (meta) file path are already in the local cache, use the cached entry
    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
    if (pathinfo == null) {
      if (userGroupInformation == null) {
        userGroupInformation = UserGroupInformation.getCurrentUser();
      }
      // Otherwise fetch the block file path and checksum (meta) file path from the Datanode over RPC via a proxy;
      // on success the result is stored in the local cache
      pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
          configuration, conf.socketTimeout, token,
          conf.connectToDnViaHostname, storageType);
    }

    // check to see if the file exists. It may so happen that the
    // HDFS file has been deleted and this block-lookup is occurring
    // on behalf of a new HDFS file. This time, the block file could
    // be residing in a different portion of the fs.data.dir directory.
    // In this case, we remove this entry from the cache. The next
    // call to this method will re-populate the cache.
    FileInputStream dataIn = null;
    FileInputStream checksumIn = null;
    BlockReaderLocalLegacy localBlockReader = null;
    boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
        storageType.isTransient();
    try {
      // get a local file system
      // With the local file path known, open the local block file directly
      File blkfile = new File(pathinfo.getBlockPath());
      dataIn = new FileInputStream(blkfile);

      if (LOG.isDebugEnabled()) {
        LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size "
            + blkfile.length() + " startOffset " + startOffset + " length "
            + length + " short circuit checksum " + !skipChecksumCheck);
      }

      if (!skipChecksumCheck) {
        // get the metadata file
        // Open the local checksum (meta) file
        File metafile = new File(pathinfo.getMetaPath());
        checksumIn = new FileInputStream(metafile);

        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
            new DataInputStream(checksumIn), blk);
        long firstChunkOffset = startOffset
            - (startOffset % checksum.getBytesPerChecksum());
        localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
            startOffset, length, pathinfo, checksum, true, dataIn,
            firstChunkOffset, checksumIn);
      } else {
        localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
            startOffset, length, pathinfo, dataIn);
      }
    } catch (IOException e) {
      // remove from cache
      localDatanodeInfo.removeBlockLocalPathInfo(blk);
      DFSClient.LOG.warn("BlockReaderLocalLegacy: Removing " + blk
          + " from cache because local file " + pathinfo.getBlockPath()
          + " could not be opened.");
      throw e;
    } finally {
      if (localBlockReader == null) {
        if (dataIn != null) {
          dataIn.close();
        }
        if (checksumIn != null) {
          checksumIn.close();
        }
      }
    }
    return localBlockReader;
  }
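
The firstChunkOffset computation in newBlockReader() rounds the requested start offset down to a checksum chunk boundary, so that verification can begin on a whole chunk. A minimal worked sketch of that arithmetic follows; the class name ChunkAlignSketch and the 512-byte chunk size are assumptions chosen for the example.

```java
/** Hypothetical sketch of the chunk-alignment arithmetic used for firstChunkOffset. */
final class ChunkAlignSketch {
  /** Rounds startOffset down to the nearest multiple of bytesPerChecksum. */
  static long firstChunkOffset(long startOffset, int bytesPerChecksum) {
    return startOffset - (startOffset % bytesPerChecksum);
  }

  /** Bytes between the chunk boundary and the offset the caller actually asked for. */
  static long bytesToSkip(long startOffset, int bytesPerChecksum) {
    return startOffset - firstChunkOffset(startOffset, bytesPerChecksum);
  }

  public static void main(String[] args) {
    // With 512-byte chunks, offset 1300 falls in the chunk that starts at 1024.
    System.out.println(firstChunkOffset(1300, 512)); // 1024
    System.out.println(bytesToSkip(1300, 512));      // 276
  }
}
```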

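<2> The getBlockReaderLocal() method

This method tries to create a local short-circuit reader. It first obtains the ShortCircuitCache from clientContext; ShortCircuitCache is the class that caches ShortCircuitReplicaInfo objects on the DFSClient side. It then calls fetchOrCreate() to obtain from the ShortCircuitCache the ShortCircuitReplicaInfo for the block being read.

ShortCircuitCache is covered together with the short-circuit read material in its own section. The ShortCircuitReplica held in the ShortCircuitCache stores the file descriptors used for short-circuit reads, the Slot object in the memory shared between the client and the Datanode that records the state of the current replica, and mmapData, the block's memory-mapped file.

After obtaining the ShortCircuitReplica for the block, getBlockReaderLocal() uses the file descriptors stored in it to construct input streams for the block file and the checksum file, and then constructs the BlockReaderLocal object.

Because getBlockReaderLocal() involves quite a lot, I will cover it in a separate article on the detailed creation flow of BlockReaderLocal objects.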

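<3> The getRemoteBlockReaderFromDomain() and getRemoteBlockReaderFromTcp() methods

These two methods use a domain socket and a TCP socket, respectively, as the underlying I/O stream and construct a RemoteBlockReader2 object to read the data block.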

 

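The RemoteBlockReader2 class

RemoteBlockReader2 implements the logic of reading a data block from a Datanode over a socket connection (either a domain socket or a TCP socket). Let us look at the read() method of this class: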

@Override
  public int read(ByteBuffer buf) throws IOException {
    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       // Read the next packet and store its data portion in curDataSlice
       readNextPacket();
    }
    if (curDataSlice.remaining() == 0) {
      // we're at EOF now
      return -1;
    }

    // Copy the data in curDataSlice into buf
    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
    ByteBuffer writeSlice = curDataSlice.duplicate();
    writeSlice.limit(writeSlice.position() + nRead);
    buf.put(writeSlice);
    curDataSlice.position(writeSlice.position());

    return nRead;
  }
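
The last few lines of read() use a common java.nio idiom: duplicate the source buffer, narrow the duplicate's limit to the number of bytes that fit, bulk-copy it, and then advance the original. The following self-contained sketch isolates that idiom; the class and method names are assumptions for illustration.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the duplicate/limit/put copy idiom. */
final class SliceCopySketch {
  /** Copies as many bytes as fit from src into dst and returns the count. */
  static int copyAvailable(ByteBuffer src, ByteBuffer dst) {
    int n = Math.min(src.remaining(), dst.remaining());
    // The duplicate shares content with src but has its own position and limit,
    // so narrowing its limit does not disturb src.
    ByteBuffer view = src.duplicate();
    view.limit(view.position() + n);
    dst.put(view);
    // Advance src past the bytes that were just copied.
    src.position(view.position());
    return n;
  }

  public static void main(String[] args) {
    ByteBuffer packet = ByteBuffer.wrap(new byte[10]);
    ByteBuffer user = ByteBuffer.allocate(4);
    System.out.println(copyAvailable(packet, user)); // 4
    System.out.println(packet.remaining());          // 6
  }
}
```

Without the duplicate, dst.put(src) would throw BufferOverflowException whenever src holds more bytes than dst can take.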

The code of readNextPacket() is as follows:

private void readNextPacket() throws IOException {
    //Read packet headers.
    // Use packetReceiver to read a new packet from the I/O stream
    packetReceiver.receiveNextPacket(in);

    // Store the packet header in curHeader and the packet data in curDataSlice
    PacketHeader curHeader = packetReceiver.getHeader();
    curDataSlice = packetReceiver.getDataSlice();
    assert curDataSlice.capacity() == curHeader.getDataLen();
    
    // Check the lengths in the header (after the optional trace logging)
    if (LOG.isTraceEnabled()) {
      LOG.trace("DFSClient readNextPacket got header " + curHeader);
    }

    // Sanity check the lengths
    if (!curHeader.sanityCheck(lastSeqNo)) {
         throw new IOException("BlockReader: error in packet header " +
                               curHeader);
    }
    
    // Verify that the data matches its checksums
    if (curHeader.getDataLen() > 0) {
      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
      int checksumsLen = chunks * checksumSize;

      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
        "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + 
          " checksumsLen=" + checksumsLen;
      
      lastSeqNo = curHeader.getSeqno();
      if (verifyChecksum && curDataSlice.remaining() > 0) {
        // N.B.: the checksum error offset reported here is actually
        // relative to the start of the block, not the start of the file.
        // This is slightly misleading, but preserves the behavior from
        // the older BlockReader.
        checksum.verifyChunkedSums(curDataSlice,
            packetReceiver.getChecksumSlice(),
            filename, curHeader.getOffsetInBlock());
      }
      bytesNeededToFinish -= curHeader.getDataLen();
    }    
    
    // First packet will include some data prior to the first byte
    // the user requested. Skip it.
    if (curHeader.getOffsetInBlock() < startOffset) {
      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
      curDataSlice.position(newPos);
    }

    // If we've now satisfied the whole client read, read one last packet
    // header, which should be empty
    // The last packet of a block is an empty marker packet, so it is read here once the client's read is satisfied
    if (bytesNeededToFinish <= 0) {
      readTrailingEmptyPacket();
      if (verifyChecksum) {
        sendReadResult(Status.CHECKSUM_OK);
      } else {
        sendReadResult(Status.SUCCESS);
      }
    }
  }
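
The checksum bookkeeping in readNextPacket() comes down to a ceiling division: each chunk of up to bytesPerChecksum data bytes has one checksum of checksumSize bytes. A small worked sketch follows, assuming 512-byte chunks and 4-byte checksums purely as example values; the class name is hypothetical.

```java
/** Hypothetical sketch of the chunk and checksum length arithmetic for one packet. */
final class PacketChecksumSketch {
  /** Number of checksum chunks needed to cover dataLen bytes (dataLen must be > 0). */
  static int chunksIn(int dataLen, int bytesPerChecksum) {
    return 1 + (dataLen - 1) / bytesPerChecksum;
  }

  /** Total bytes of checksum data that accompany dataLen bytes of packet data. */
  static int checksumsLen(int dataLen, int bytesPerChecksum, int checksumSize) {
    return chunksIn(dataLen, bytesPerChecksum) * checksumSize;
  }

  public static void main(String[] args) {
    // 513 bytes spill into a second 512-byte chunk.
    System.out.println(chunksIn(513, 512));          // 2
    // A 64 KB payload is 128 chunks, hence 512 bytes of 4-byte checksums.
    System.out.println(checksumsLen(65536, 512, 4)); // 512
  }
}
```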

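The BlockReaderLocal class

This class implements local short-circuit reads: when the client and the Datanode are on the same machine, the client can bypass the Datanode process and read the data directly from the local disk.

When the client requests data from the Datanode, the Datanode opens the block file and its metadata file and passes the two file descriptors to the client over a domain socket. The client builds input streams from the descriptors and then reads the block file on disk directly through them. Because the data no longer has to be relayed by the Datanode process, read performance improves (see HDFS-347). The file descriptors are read-only, so the client cannot modify the files it receives; and because the client itself has no access to the directory containing the block files, it cannot reach other files in the data directory either, which improves data security.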

[Figure: BlockReaderLocal flow diagram]

The read() method of BlockReaderLocal is as follows:

@Override
  public synchronized int read(ByteBuffer buf) throws IOException {
    // Can checksum verification be skipped?
    boolean canSkipChecksum = createNoChecksumContext();
    try {
      String traceString = null;
      if (LOG.isTraceEnabled()) {
        traceString = new StringBuilder().
            append("read(").
            append("buf.remaining=").append(buf.remaining()).
            append(", block=").append(block).
            append(", filename=").append(filename).
            append(", canSkipChecksum=").append(canSkipChecksum).
            append(")").toString();
        LOG.info(traceString + ": starting");
      }
      int nRead;
      try {
        // Checksums can be skipped and no readahead is needed
        if (canSkipChecksum && zeroReadaheadRequested) {
          nRead = readWithoutBounceBuffer(buf);
        } else {
          // Checksums are required, or readahead is enabled
          nRead = readWithBounceBuffer(buf, canSkipChecksum);
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.info(traceString + ": I/O error", e);
        }
        throw e;
      }
      if (LOG.isTraceEnabled()) {
        LOG.info(traceString + ": returning " + nRead);
      }
      return nRead;
    } finally {
      if (canSkipChecksum) releaseNoChecksumContext();
    }
  }

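The code of this read() method can be divided into three parts:

Part 1:

Call createNoChecksumContext() to determine whether a no-checksum context can be created.

Part 2:

If checksums can be skipped and no readahead was requested, call readWithoutBounceBuffer() to read the data.

Part 3:

Otherwise (checksums cannot be skipped, or readahead is enabled), call readWithBounceBuffer() to read the data.

 

The three parts are described in turn below.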

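The createNoChecksumContext() method

If the verifyChecksum field is false, that is, the current configuration does not require verification in the first place, the method returns true immediately: the no-checksum context is created successfully. If verification is required, it tries to add a no-checksum anchor to the replica's Slot in the memory shared between the Datanode and the client (anchors are explained later). Note that an anchor can be added if and only if the Datanode has already cached the replica: when a Datanode caches a block replica, it verifies the block's checksums and then caches the block in memory with mmap and mlock. In other words, a block cached on the Datanode has already been verified and is known to be correct, so it does not need to be verified again.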
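
The decision described above can be modeled with a toy Slot. This is a sketch under stated assumptions, not the Hadoop implementation: the Slot class here is a hypothetical stand-in that only tracks whether the Datanode has cached the replica and how many anchors are held.

```java
/** Hypothetical model of the no-checksum-context decision. */
final class NoChecksumContextSketch {
  /** Toy stand-in for the shared-memory slot of one replica. */
  static final class Slot {
    private final boolean cachedByDatanode;
    private int anchors = 0;

    Slot(boolean cachedByDatanode) {
      this.cachedByDatanode = cachedByDatanode;
    }

    /** An anchor can only be added while the Datanode has the replica cached. */
    boolean addAnchor() {
      if (!cachedByDatanode) {
        return false;
      }
      anchors++;
      return true;
    }

    void removeAnchor() {
      anchors--;
    }

    int anchorCount() {
      return anchors;
    }
  }

  /** Returns true if the read may skip checksum verification. */
  static boolean createNoChecksumContext(boolean verifyChecksum, Slot slot) {
    if (!verifyChecksum) {
      return true; // verification is disabled anyway
    }
    return slot != null && slot.addAnchor();
  }

  public static void main(String[] args) {
    System.out.println(createNoChecksumContext(true, new Slot(false))); // false
    System.out.println(createNoChecksumContext(true, new Slot(true)));  // true
  }
}
```

Every successful anchor has to be released again, which is why read() calls releaseNoChecksumContext() in its finally block.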

 

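The readWithoutBounceBuffer() method

This method is fairly simple. It does not use the extra data and checksum buffers to read ahead data and checksums; it reads directly from the data stream into the caller's buffer. The code is as follows: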

private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
      throws IOException {
    freeDataBufIfExists();
    freeChecksumBufIfExists();
    int total = 0;
    // Read directly from the input stream into buf
    while (buf.hasRemaining()) {
      int nRead = dataIn.read(buf, dataPos);
      if (nRead <= 0) break;
      dataPos += nRead;
      total += nRead;
    }
    return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
  }
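
The loop in readWithoutBounceBuffer() relies on the positional form of FileChannel.read(), which reads at an explicit file offset and leaves the channel's own position untouched. The sketch below reproduces that loop against a temporary file; the class name, the demo() helper and the temporary file are assumptions made so the example can run on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch of a positional read loop over a FileChannel. */
final class PositionalReadSketch {
  /** Reads from pos until buf is full or EOF; returns -1 only if nothing was read at EOF. */
  static int readAt(FileChannel channel, long pos, ByteBuffer buf) throws IOException {
    int total = 0;
    while (buf.hasRemaining()) {
      int n = channel.read(buf, pos); // positional read: channel position is unchanged
      if (n <= 0) {
        break;
      }
      pos += n;
      total += n;
    }
    return (total == 0 && pos >= channel.size()) ? -1 : total;
  }

  /** Writes content to a temporary file and reads it back from pos into a direct buffer. */
  static int demo(byte[] content, long pos, int bufSize) {
    try {
      Path tmp = Files.createTempFile("blk", ".dat");
      tmp.toFile().deleteOnExit();
      Files.write(tmp, content);
      try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
        return readAt(ch, pos, ByteBuffer.allocateDirect(bufSize));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo(new byte[100], 90, 64));  // only 10 bytes remain after offset 90
    System.out.println(demo(new byte[100], 100, 64)); // -1 at EOF
  }
}
```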

 

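The readWithBounceBuffer() method

This method uses two buffers allocated on the BlockReaderLocal object:

dataBuf: the data buffer

checksumBuf: the checksum buffer

The size of dataBuf is maxReadaheadLength, which is always a whole multiple of the checksum chunk size (a chunk is the amount of data covered by one checksum value). This makes verification convenient, because data can be read in units of chunks. Both dataBuf and checksumBuf are direct byte buffers, that is, buffers in off-heap memory.

Both are allocated by calling java.nio.ByteBuffer.allocateDirect(). This is a technique worth remembering: a relatively large buffer can be placed off-heap through java.nio, saving precious heap space.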
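
A minimal sketch of allocating an off-heap buffer whose capacity is a whole number of checksum chunks is shown below. The rounding rule (round down, but keep at least one chunk) is an assumption made for this example and is not taken from BlockReaderLocal; the class and method names are hypothetical as well.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: a direct buffer sized to a whole number of checksum chunks. */
final class DirectBufferSketch {
  /** Rounds requested down to a multiple of bytesPerChecksum, keeping at least one chunk. */
  static int alignedCapacity(int requested, int bytesPerChecksum) {
    int chunks = Math.max(1, requested / bytesPerChecksum);
    return chunks * bytesPerChecksum;
  }

  /** Allocates the buffer off-heap so it does not count against the Java heap. */
  static ByteBuffer allocateBounceBuffer(int requested, int bytesPerChecksum) {
    return ByteBuffer.allocateDirect(alignedCapacity(requested, bytesPerChecksum));
  }

  public static void main(String[] args) {
    ByteBuffer buf = allocateBounceBuffer(1000, 512);
    System.out.println(buf.isDirect());  // true
    System.out.println(buf.capacity());  // 512
  }
}
```

Direct buffers are comparatively expensive to allocate and are reclaimed lazily, so they are usually pooled and reused rather than created for every read.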

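BlockReaderLocal provides several methods that operate on these buffers:

<1> fillBuffer(ByteBuffer buf, boolean canSkipChecksum):

Reads data from the input stream into the given buf and, unless checksums can be skipped, reads the checksums into checksumBuf and verifies the data.

<2> fillDataBuf(boolean canSkipChecksum): calls fillBuffer() to read data into dataBuf and checksums into checksumBuf. Note that the data in dataBuf is always a whole multiple of the chunk size (the amount of data covered by one checksum value).

<3> drainDataBuf(ByteBuffer buf): moves the data in dataBuf into buf and returns the number of bytes transferred.

 

readWithBounceBuffer() first drains any data cached in dataBuf into buf, which guarantees that the read position is then on a chunk boundary. If buf is a direct buffer whose remaining space is at least the size of dataBuf and the stream position is on a chunk boundary, it calls fillBuffer() to read data straight into buf, bypassing dataBuf. Otherwise it calls fillDataBuf() to read data into dataBuf and then drainDataBuf() to move the data from dataBuf into buf.

The code of readWithBounceBuffer() is as follows: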

/**
   * Read using the bounce buffer.
   *
   * A 'direct' read actually has three phases. The first drains any
   * remaining bytes from the slow read buffer. After this the read is
   * guaranteed to be on a checksum chunk boundary. If there are still bytes
   * to read, the fast direct path is used for as many remaining bytes as
   * possible, up to a multiple of the checksum chunk size. Finally, any
   * 'odd' bytes remaining at the end of the read cause another slow read to
   * be issued, which involves an extra copy.
   *
   * Every 'slow' read tries to fill the slow read buffer in one go for
   * efficiency's sake. As described above, all non-checksum-chunk-aligned
   * reads will be served from the slower read path.
   *
   * @param buf              The buffer to read into. 
   * @param canSkipChecksum  True if we can skip checksums.
   */
  private synchronized int readWithBounceBuffer(ByteBuffer buf,
        boolean canSkipChecksum) throws IOException {
    int total = 0;
    // Call drainDataBuf() to move the data already in dataBuf into buf
    int bb = drainDataBuf(buf); // drain bounce buffer if possible
    if (bb >= 0) {
      total += bb;
      if (buf.remaining() == 0) return total;
    }
    boolean eof = true, done = false;
    do {
      // If buf is a direct buffer with enough room and the read position is on a chunk boundary, read from the stream straight into buf
      if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
            && ((dataPos % bytesPerChecksum) == 0)) {
        // Fast lane: try to read directly into user-supplied buffer, bypassing
        // bounce buffer.
        int oldLimit = buf.limit();
        int nRead;
        try {
          buf.limit(buf.position() + maxReadaheadLength);
          nRead = fillBuffer(buf, canSkipChecksum);
        } finally {
          buf.limit(oldLimit);
        }
        if (nRead < maxReadaheadLength) {
          done = true;
        }
        if (nRead > 0) {
          eof = false;
        }
        total += nRead;
      } else {
        // Slow lane: refill bounce buffer.
        // Otherwise read the data into the dataBuf bounce buffer
        if (fillDataBuf(canSkipChecksum)) {
          done = true;
        }
        // Then move the data from dataBuf into buf
        bb = drainDataBuf(buf); // drain bounce buffer if possible
        if (bb >= 0) {
          eof = false;
          total += bb;
        }
      }
    } while ((!done) && (buf.remaining() > 0));
    return (eof && total == 0) ? -1 : total;
  }
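
The drain / fast lane / slow lane structure of readWithBounceBuffer() can be modeled without any I/O. The sketch below is a deliberately simplified, hypothetical model: an in-memory byte array stands in for the block file, there is no checksum verification, reads always start at offset 0, and the isDirect() condition is dropped.

```java
import java.nio.ByteBuffer;

/** Simplified, hypothetical model of the drain / fast lane / slow lane loop. */
final class BounceBufferSketch {
  private final ByteBuffer source;  // stands in for the block file stream
  private final ByteBuffer bounce;  // stands in for dataBuf
  private final int chunk;          // stands in for bytesPerChecksum
  int fastReads = 0;
  int slowReads = 0;

  BounceBufferSketch(byte[] data, int chunk, int bounceChunks) {
    this.source = ByteBuffer.wrap(data);
    this.chunk = chunk;
    this.bounce = ByteBuffer.allocate(chunk * bounceChunks);
    this.bounce.limit(0); // the bounce buffer starts out empty
  }

  /** Moves up to max bytes from src to dst and returns the count. */
  private static int transfer(ByteBuffer src, ByteBuffer dst, int max) {
    int n = Math.min(Math.min(src.remaining(), dst.remaining()), max);
    ByteBuffer view = src.duplicate();
    view.limit(view.position() + n);
    dst.put(view);
    src.position(view.position());
    return n;
  }

  int read(ByteBuffer dst) {
    int wanted = dst.remaining();
    int total = transfer(bounce, dst, Integer.MAX_VALUE); // drain leftovers first
    while (dst.hasRemaining() && source.hasRemaining()) {
      if (dst.remaining() >= bounce.capacity() && source.position() % chunk == 0) {
        // Fast lane: copy straight into the caller's buffer.
        total += transfer(source, dst, bounce.capacity());
        fastReads++;
      } else {
        // Slow lane: refill the bounce buffer, then drain it.
        bounce.clear();
        transfer(source, bounce, bounce.capacity());
        bounce.flip();
        slowReads++;
        total += transfer(bounce, dst, Integer.MAX_VALUE);
      }
    }
    return (total == 0 && wanted > 0) ? -1 : total;
  }

  /** Helper for demos: bytes 0, 1, 2, ... n-1. */
  static byte[] sequentialBytes(int n) {
    byte[] data = new byte[n];
    for (int i = 0; i < n; i++) {
      data[i] = (byte) i;
    }
    return data;
  }

  public static void main(String[] args) {
    BounceBufferSketch reader = new BounceBufferSketch(sequentialBytes(100), 4, 2);
    System.out.println(reader.read(ByteBuffer.allocate(3)));  // small read goes through the bounce buffer
    System.out.println(reader.read(ByteBuffer.allocate(21))); // leftovers first, then the fast lane
  }
}
```

Small reads pay for one extra copy through the bounce buffer, while large reads that start on a chunk boundary avoid it.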

3. HasEnhancedByteBufferAccess.read()

DFSInputStream implements the read() method of the HasEnhancedByteBufferAccess interface, which provides zero-copy reading of data blocks. The code is as follows:

@Override
  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
      int maxLength, EnumSet<ReadOption> opts) 
          throws IOException, UnsupportedOperationException {
    if (maxLength == 0) {
      return EMPTY_BUFFER;
    } else if (maxLength < 0) {
      throw new IllegalArgumentException("can't read a negative " +
          "number of bytes.");
    }
    if ((blockReader == null) || (blockEnd == -1)) {
      if (pos >= getFileLength()) {
        return null;
      }
      /*
       * If we don't have a blockReader, or the one we have has no more bytes
       * left to read, we call seekToBlockSource to get a new blockReader and
       * recalculate blockEnd.  Note that we assume we're not at EOF here
       * (we check this above).
       */
      if ((!seekToBlockSource(pos)) || (blockReader == null)) {
        throw new IOException("failed to allocate new BlockReader " +
            "at position " + pos);
      }
    }
    ByteBuffer buffer = null;
    // First try a zero-copy read
    if (dfsClient.getConf().shortCircuitMmapEnabled) {
      buffer = tryReadZeroCopy(maxLength, opts);
    }
    if (buffer != null) {
      return buffer;
    }
    // If the zero-copy read did not succeed, fall back to an ordinary read
    buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
    if (buffer != null) {
      extendedReadBuffers.put(buffer, bufferPool);
    }
    return buffer;
  }

<1> tryReadZeroCopy()

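In traditional file I/O, reads and writes go through the read() or write() system calls provided by the operating system. The calling process (in Java, the java process) switches from user mode to kernel mode, the kernel reads the requested file data into the kernel I/O buffer, and the data is finally copied from the kernel I/O buffer into the process's private address space. That completes one I/O operation.

tryReadZeroCopy() reads through a memory-mapped file. The biggest difference from standard I/O is that the data is not copied from the kernel buffer into a separate buffer in the process; instead, a region of the process's address space is mapped onto the file itself, so the file can be read and written as if it were memory. This reduces the number of copies per I/O and speeds up file access.

Java provides three memory-mapping modes: read-only (READ_ONLY), read-write (READ_WRITE), and private (PRIVATE). In read-only mode, an attempt to write throws ReadOnlyBufferException. In read-write mode, changes made through the mapping are propagated to the file on disk, and other processes that map the same file can typically see them as well (the Java API does not strictly guarantee when). Private mode uses the operating system's copy-on-write: as long as nobody writes, processes share the same physical memory for the file (their virtual addresses point at the same physical pages); once a process writes, the affected data is copied into that process's private pages and the change is not reflected in the file. tryReadZeroCopy() uses read-only mode.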
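
The read-only mode can be tried out with a few lines of java.nio. The sketch below maps a temporary file with MapMode.READ_ONLY, reads from the mapping, and checks that a write is rejected; the class name, method name and temporary file are assumptions for the example, and the content passed in must not be empty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch of a READ_ONLY memory mapping. */
final class MmapModeSketch {
  /** Maps a temporary file read-only and reports whether a write attempt was rejected. */
  static boolean readOnlyMappingRejectsWrites(byte[] content) {
    try {
      Path tmp = Files.createTempFile("mmap", ".dat");
      tmp.toFile().deleteOnExit();
      Files.write(tmp, content);
      try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
        MappedByteBuffer map = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
        byte first = map.get(0); // reads come straight from the mapped pages
        try {
          map.put(0, (byte) (first + 1));
          return false; // the write unexpectedly succeeded
        } catch (ReadOnlyBufferException expected) {
          return true;
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(readOnlyMappingRejectsWrites(new byte[] {1, 2, 3})); // true
  }
}
```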

For reading data files, memory-mapped reads can improve performance considerably; this is a pattern worth remembering.

The code of tryReadZeroCopy() is as follows:

private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
      EnumSet<ReadOption> opts) throws IOException {
    // Copy 'pos' and 'blockEnd' to local variables to make it easier for the
    // JVM to optimize this function.
    final long curPos = pos;
    final long curEnd = blockEnd;
    final long blockStartInFile = currentLocatedBlock.getStartOffset();
    final long blockPos = curPos - blockStartInFile;

    // Shorten this read if the end of the block is nearby.
    // First make sure the read stays within the current block
    long length63;
    if ((curPos + maxLength) <= (curEnd + 1)) {
      length63 = maxLength;
    } else {
      length63 = 1 + curEnd - curPos;
      if (length63 <= 0) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
            curPos + " of " + src + "; " + length63 + " bytes left in block.  " +
            "blockPos=" + blockPos + "; curPos=" + curPos +
            "; curEnd=" + curEnd);
        }
        return null;
      }
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Reducing read length from " + maxLength +
            " to " + length63 + " to avoid going more than one byte " +
            "past the end of the block.  blockPos=" + blockPos +
            "; curPos=" + curPos + "; curEnd=" + curEnd);
      }
    }
    // Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer.
    // Make sure the mapped range does not go past the 2 GB offset
    int length;
    if (blockPos + length63 <= Integer.MAX_VALUE) {
      length = (int)length63;
    } else {
      long length31 = Integer.MAX_VALUE - blockPos;
      if (length31 <= 0) {
        // Java ByteBuffers can't be longer than 2 GB, because they use
        // 4-byte signed integers to represent capacity, etc.
        // So we can't mmap the parts of the block higher than the 2 GB offset.
        // FIXME: we could work around this with multiple memory maps.
        // See HDFS-5101.
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
            curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
            "exceeded.  blockPos=" + blockPos + ", curEnd=" + curEnd);
        }
        return null;
      }
      length = (int)length31;
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Reducing read length from " + maxLength +
            " to " + length + " to avoid 31-bit limit.  " +
            "blockPos=" + blockPos + "; curPos=" + curPos +
            "; curEnd=" + curEnd);
      }
    }
    // Call blockReader.getClientMmap() to map the block file into memory; the returned ClientMmap wraps a MappedByteBuffer
    final ClientMmap clientMmap = blockReader.getClientMmap(opts);
    if (clientMmap == null) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
          curPos + " of " + src + "; BlockReader#getClientMmap returned " +
          "null.");
      }
      return null;
    }
    boolean success = false;
    ByteBuffer buffer;
    try {
      seek(curPos + length);
      // Return the memory-mapped buffer, which holds the data of the block file
      buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
      buffer.position((int)blockPos);
      buffer.limit((int)(blockPos + length));
      extendedReadBuffers.put(buffer, clientMmap);
      readStatistics.addZeroCopyBytes(length);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("readZeroCopy read " + length + 
            " bytes from offset " + curPos + " via the zero-copy read " +
            "path.  blockEnd = " + blockEnd);
      }
      success = true;
    } finally {
      if (!success) {
        IOUtils.closeQuietly(clientMmap);
      }
    }
    return buffer;
  }
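
The two length adjustments at the top of tryReadZeroCopy() (stay inside the block, stay below the 2 GB offset) can be condensed into one pure function. The sketch below is a hypothetical restatement for illustration: it returns -1 where the original returns null, and it assumes maxLength is positive, as the caller guarantees.

```java
/** Hypothetical restatement of the length clamping in tryReadZeroCopy(). */
final class ZeroCopyLengthSketch {
  /**
   * Returns how many bytes a zero-copy read may cover, or -1 if none.
   * blockEnd is the file offset of the last byte of the block (inclusive).
   */
  static int clampLength(long curPos, long blockStartInFile, long blockEnd, int maxLength) {
    long blockPos = curPos - blockStartInFile;
    // Never read past the end of the current block.
    long length63 = Math.min((long) maxLength, 1 + blockEnd - curPos);
    if (length63 <= 0) {
      return -1;
    }
    // A MappedByteBuffer cannot address offsets beyond Integer.MAX_VALUE.
    if (blockPos + length63 <= Integer.MAX_VALUE) {
      return (int) length63;
    }
    long length31 = Integer.MAX_VALUE - blockPos;
    return length31 <= 0 ? -1 : (int) length31;
  }

  public static void main(String[] args) {
    // A 200-byte block starting at file offset 0: a 50-byte read at 180 is cut to 20 bytes.
    System.out.println(clampLength(180, 0, 199, 50)); // 20
  }
}
```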

tryReadZeroCopy() obtains the MappedByteBuffer, the in-memory mapping of the block file, through the call chain BlockReader.getClientMmap() -> ShortCircuitReplica.getOrCreateClientMmap() -> ShortCircuitCache.getOrCreateClientMmap() -> ShortCircuitReplica.loadMmapInternal(). The getClientMmap() implementation (in BlockReaderLocal) is as follows:

/**
   * Get or create a memory map for this replica.
   * 
   * There are two kinds of ClientMmap objects we could fetch here: one that 
   * will always read pre-checksummed data, and one that may read data that
   * hasn't been checksummed.
   *
   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
   * the anchor count on the shared memory slot.  This will tell the DataNode
   * not to munlock the block until this ClientMmap is closed.
   * If we fetch the latter, we don't bother with anchoring.
   *
   * @param opts     The options to use, such as SKIP_CHECKSUMS.
   * 
   * @return         null on failure; the ClientMmap otherwise.
   */
  @Override
  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
    boolean anchor = verifyChecksum &&
        (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
    if (anchor) {
      if (!createNoChecksumContext()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("can't get an mmap for " + block + " of " + filename + 
              " since SKIP_CHECKSUMS was not given, " +
              "we aren't skipping checksums, and the block is not mlocked.");
        }
        return null;
      }
    }
    ClientMmap clientMmap = null;
    try {
      clientMmap = replica.getOrCreateClientMmap(anchor);
    } finally {
      if ((clientMmap == null) && anchor) {
        releaseNoChecksumContext();
      }
    }
    return clientMmap;
  }

The code of ShortCircuitReplica.getOrCreateClientMmap() is as follows:

 public ClientMmap getOrCreateClientMmap(boolean anchor) {
    return cache.getOrCreateClientMmap(this, anchor);
 }

The code of ShortCircuitCache.getOrCreateClientMmap() is as follows:

ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
      boolean anchored) {
    Condition newCond;
    lock.lock();
    try {
      while (replica.mmapData != null) {
        if (replica.mmapData instanceof MappedByteBuffer) {
          ref(replica);
          MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData;
          return new ClientMmap(replica, mmap, anchored);
        } else if (replica.mmapData instanceof Long) {
          long lastAttemptTimeMs = (Long)replica.mmapData;
          long delta = Time.monotonicNow() - lastAttemptTimeMs;
          if (delta < mmapRetryTimeoutMs) {
            if (LOG.isTraceEnabled()) {
              LOG.trace(this + ": can't create client mmap for " +
                  replica + " because we failed to " +
                  "create one just " + delta + "ms ago.");
            }
            return null;
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace(this + ": retrying client mmap for " + replica +
                ", " + delta + " ms after the previous failure.");
          }
        } else if (replica.mmapData instanceof Condition) {
          Condition cond = (Condition)replica.mmapData;
          cond.awaitUninterruptibly();
        } else {
          Preconditions.checkState(false, "invalid mmapData type " +
              replica.mmapData.getClass().getName());
        }
      }
      newCond = lock.newCondition();
      replica.mmapData = newCond;
    } finally {
      lock.unlock();
    }
    MappedByteBuffer map = replica.loadMmapInternal();
    lock.lock();
    try {
      if (map == null) {
        replica.mmapData = Long.valueOf(Time.monotonicNow());
        newCond.signalAll();
        return null;
      } else {
        outstandingMmapCount++;
        replica.mmapData = map;
        ref(replica);
        newCond.signalAll();
        return new ClientMmap(replica, map, anchored);
      }
    } finally {
      lock.unlock();
    }
  }

The code of ShortCircuitReplica.loadMmapInternal() is as follows:

MappedByteBuffer loadMmapInternal() {
    try {
      FileChannel channel = dataStream.getChannel();
      // Call java.nio.channels.FileChannel.map() to create the memory mapping of the file
      MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, 
          Math.min(Integer.MAX_VALUE, channel.size()));
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": created mmap of size " + channel.size());
      }
      return mmap;
    } catch (IOException e) {
      LOG.warn(this + ": mmap error", e);
      return null;
    } catch (RuntimeException e) {
      LOG.warn(this + ": mmap error", e);
      return null;
    }
  }

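tryReadZeroCopy() first obtains clientMmap, the memory-mapping object for the block file, through getClientMmap() above. The clientMmap holds a MappedByteBuffer, the mapped buffer of the block file in memory; tryReadZeroCopy() takes this MappedByteBuffer from clientMmap and returns it as a read-only view whose position and limit are set to the requested range. User code can then read the data from the returned buffer. Note in particular that a Java ByteBuffer only supports sizes below 2 GB, because it represents capacity and positions with 4-byte signed integers. The size of the mapped range therefore has to be checked, and anything beyond the 2 GB offset is not mapped.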

<2> ByteBufferUtil.fallbackRead()

/**
   * Perform a fallback read.
   */
  public static ByteBuffer fallbackRead(
      InputStream stream, ByteBufferPool bufferPool, int maxLength)
          throws IOException {
    if (bufferPool == null) {
      throw new UnsupportedOperationException("zero-copy reads " +
          "were not available, and you did not provide a fallback " +
          "ByteBufferPool.");
    }
    // Check whether the stream supports reading into a ByteBuffer
    boolean useDirect = streamHasByteBufferRead(stream);
    // Ask the ByteBufferPool for a ByteBuffer
    ByteBuffer buffer = bufferPool.getBuffer(useDirect, maxLength);
    if (buffer == null) {
      // The ByteBufferPool could not provide a ByteBuffer
      throw new UnsupportedOperationException("zero-copy reads " +
          "were not available, and the ByteBufferPool did not provide " +
          "us with " + (useDirect ? "a direct" : "an indirect") +
          "buffer.");
    }
    Preconditions.checkState(buffer.capacity() > 0);
    Preconditions.checkState(buffer.isDirect() == useDirect);
    maxLength = Math.min(maxLength, buffer.capacity());
    boolean success = false;
    try {
      if (useDirect) {
        buffer.clear();
        buffer.limit(maxLength);
        ByteBufferReadable readable = (ByteBufferReadable)stream;
        int totalRead = 0;
        while (true) {
          if (totalRead >= maxLength) {
            success = true;
            break;
          }
          // Call the stream's ByteBuffer read method directly
          int nRead = readable.read(buffer);
          if (nRead < 0) {
            if (totalRead > 0) {
              success = true;
            }
            break;
          }
          totalRead += nRead;
        }
        buffer.flip();
      } else {
        buffer.clear();
        // Call InputStream.read(byte[], int, int)
        int nRead = stream.read(buffer.array(),
            buffer.arrayOffset(), maxLength);
        if (nRead >= 0) {
          buffer.limit(nRead);
          success = true;
        }
      }
    } finally {
      if (!success) {
        // If we got an error while reading, or if we are at EOF, we 
        // don't need the buffer any more.  We can give it back to the
        // bufferPool.
        bufferPool.putBuffer(buffer);
        buffer = null;
      }
    }
    return buffer;
  }

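When read() fails to perform a zero-copy read, it calls ByteBufferUtil.fallbackRead() and falls back to an ordinary read. fallbackRead() is very simple: it checks whether the InputStream passed in (here a DFSInputStream) supports reading into a ByteBuffer (that is, implements the ByteBufferReadable interface). If so, it reads the data directly into the ByteBuffer; otherwise it reads into the byte array returned by ByteBuffer.array().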
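
The non-direct branch of fallbackRead() can be reproduced with the JDK alone. In the sketch below, ByteBuffer.allocate() stands in for the bufferPool.getBuffer() call and a null return stands in for "EOF, buffer handed back to the pool"; the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

/** Hypothetical sketch of the heap-buffer branch of a fallback read. */
final class FallbackReadSketch {
  /** Reads up to maxLength bytes into a fresh heap buffer; returns null at EOF. */
  static ByteBuffer readIntoHeapBuffer(InputStream stream, int maxLength) {
    // Stands in for a heap buffer obtained from a ByteBufferPool.
    ByteBuffer buffer = ByteBuffer.allocate(maxLength);
    try {
      // A heap buffer exposes its backing array, so the plain byte[] read can fill it.
      int n = stream.read(buffer.array(), buffer.arrayOffset(), maxLength);
      if (n < 0) {
        return null;
      }
      buffer.limit(n); // expose only the bytes that were actually read
      return buffer;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5});
    System.out.println(readIntoHeapBuffer(in, 3).remaining());  // 3
    System.out.println(readIntoHeapBuffer(in, 10).remaining()); // 2
    System.out.println(readIntoHeapBuffer(in, 10));             // null
  }
}
```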

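4. Closing the input stream

Once user code has read all the data, it calls DFSInputStream.close() to close the input stream. The implementation of close() is also very simple: it first checks that the DFSClient is still open, then logs a warning if any ByteBuffers handed out by read() have not been released, and finally calls BlockReader.close() to close the BlockReader underlying the input stream. The code of close() is as follows: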

/**
   * Close it down!
   */
  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return;
    }
    dfsClient.checkOpen();
    // Warn about ByteBuffers handed out by read() that have not been released
    if (!extendedReadBuffers.isEmpty()) {
      final StringBuilder builder = new StringBuilder();
      extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
        private String prefix = "";
        @Override
        public void accept(ByteBuffer k, Object v) {
          builder.append(prefix).append(k);
          prefix = ", ";
        }
      });
      DFSClient.LOG.warn("closing file " + src + ", but there are still " +
          "unreleased ByteBuffers allocated by read().  " +
          "Please release " + builder.toString() + ".");
    }
    // Close the BlockReader object
    if (blockReader != null) {
      blockReader.close();
      blockReader = null;
    }
    super.close();
    closed = true;
  }

Next, we move on to the notes for section 5.3.