讀HDFS書筆記---5.2 檔案讀操作與輸入流(5.2.2)---上
5.2.2 讀操作--DFSInputStream實現
HDFS目前實現的讀操作有三個層次,分別是網路讀、短路讀(short circuit read)以及零拷貝(zero copy read),它們的讀取效率一次遞增。
網路讀:
網路讀是最基本的一種HDFS讀,DFSClient和Datanode通過建立Socket連線傳輸資料。
短路讀:
當DFSClient和儲存目標資料塊的Datanode在同一個物理節點上時,DFSClient可以直接開啟資料塊副本檔案讀取資料,而不需要Datanode程序的轉發。後面會講到。
零拷貝讀:
當DFSClient和快取目標資料塊
HdfsDataInputStream.read()方法就實現了上面描述的三個層次讀取,程式碼(read方法在其父類FSDataInputStream下)如下:
@Override public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException { try { return ((HasEnhancedByteBufferAccess)in).read(bufferPool, maxLength, opts); } catch (ClassCastException e) { ByteBuffer buffer = ByteBufferUtil. fallbackRead(this, bufferPool, maxLength); if (buffer != null) { extendedReadBuffers.put(buffer, bufferPool); } return buffer; } }
HdfsDataInputStream.read()方法首先會呼叫HasEnhancedByteBufferAccess.read()方法嘗試進行零拷貝讀取,如果當前配置不支援零拷貝讀取模式,則丟擲異常,然後呼叫ByteBufferUtil.fallbackRead()靜態方法退化成短路讀或者網路讀。HdfsDataInputStream.read()方法呼叫流程如下圖:
HdfsDataInputStream實現了HasEnhancedByteBufferAccess.read()方法以及InputStream.read()方法,這兩個方法的實現都是通過呼叫底層包裝類DFSInputStream對應的方法執行的。HasEnhancedByteBufferAccess.read()方法定義了零拷貝讀取的實現,而InputStream.read()方法則定義了短路讀和網路讀的實現。下面我們開始分別講解DFSInputStream實現的InputStream.read()和HasEnhancedByteBufferAccess.read()。
這裡需要解釋一下,為什麼說HasEnhancedByteBufferAccess.read()方法以及InputStream.read()方法都是通過呼叫底層包裝類DFSInputStream對應的方法執行的?
我們看一下FSDataInputStream的建立函式doCall()方法,
public FSDataInputStream doCall(final Path p) throws IOException, UnresolvedLinkException {
//通過open函式返回一個DFSInputStream類物件
final DFSInputStream dfsis = dfs.open(getPathName(p), bufferSize, verifyChecksum);
//將DFSInputStream類物件封裝成HdfsDataInputStream類物件(createWrappedInputStream函式下面會列出)
return dfs.createWrappedInputStream(dfsis);
}
createWrappedInputStream()函式程式碼如下:
/**
* Wraps the stream in a CryptoInputStream if the underlying file is
* encrypted.
*/
public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
throws IOException {
final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
if (feInfo != null) {
// File is encrypted, wrap the stream in a crypto stream.
// Currently only one version, so no special logic based on the version #
getCryptoProtocolVersion(feInfo);
final CryptoCodec codec = getCryptoCodec(conf, feInfo);
final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
final CryptoInputStream cryptoIn =
new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
feInfo.getIV());
return new HdfsDataInputStream(cryptoIn);
} else {
// No FileEncryptionInfo so no encryption.
return new HdfsDataInputStream(dfsis);
}
}
可以看到,HdfsDataInputStream類將DFSInputStream類物件作為建構函式的引數傳入,最終賦值給InputStream類中的成員變數,如下:
protected volatile InputStream in;
在呼叫read函式的時候,會呼叫HdfsDataInputStream類中的read函式,但是由於該類中沒有實現read函式,所以呼叫它的父類FDSInputStream類中的read函式,該函式先嚐試進行零拷貝,程式碼為
return ((HasEnhancedByteBufferAccess)in).read(bufferPool, maxLength, opts);
這裡的in實際型別為DFSInputStream類,由於該類實現了介面HasEnhancedByteBufferAccess,所以這裡轉換沒有問題,所以這裡的read呼叫的是DFSInputStream類中的函式,如果該read函式呼叫異常,那麼就會執行程式碼
ByteBuffer buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
fallbackRead函式程式碼如下:
/**
* Perform a fallback read.
*/
public static ByteBuffer fallbackRead(
InputStream stream, ByteBufferPool bufferPool, int maxLength)
throws IOException {
if (bufferPool == null) {
throw new UnsupportedOperationException("zero-copy reads " +
"were not available, and you did not provide a fallback " +
"ByteBufferPool.");
}
boolean useDirect = streamHasByteBufferRead(stream);
ByteBuffer buffer = bufferPool.getBuffer(useDirect, maxLength);
if (buffer == null) {
throw new UnsupportedOperationException("zero-copy reads " +
"were not available, and the ByteBufferPool did not provide " +
"us with " + (useDirect ? "a direct" : "an indirect") +
"buffer.");
}
Preconditions.checkState(buffer.capacity() > 0);
Preconditions.checkState(buffer.isDirect() == useDirect);
maxLength = Math.min(maxLength, buffer.capacity());
boolean success = false;
try {
if (useDirect) {
buffer.clear();
buffer.limit(maxLength);
ByteBufferReadable readable = (ByteBufferReadable)stream;
int totalRead = 0;
while (true) {
if (totalRead >= maxLength) {
success = true;
break;
}
int nRead = readable.read(buffer);
if (nRead < 0) {
if (totalRead > 0) {
success = true;
}
break;
}
totalRead += nRead;
}
buffer.flip();
} else {
buffer.clear();
int nRead = stream.read(buffer.array(),
buffer.arrayOffset(), maxLength);
if (nRead >= 0) {
buffer.limit(nRead);
success = true;
}
}
} finally {
if (!success) {
// If we got an error while reading, or if we are at EOF, we
// don't need the buffer any more. We can give it back to the
// bufferPool.
bufferPool.putBuffer(buffer);
buffer = null;
}
}
return buffer;
}
從上面的程式碼可以看到,裡面呼叫了FSDInputStream中的兩種read函式,最終呼叫的都是FSDInputStream類中in成員變數的read函式,而in的實際型別就是DFSInputStream,所以read函式最終呼叫的都是DFSInputStream類中的。
InputStream.read()
InputStream.read()函式的流程圖如下:
read函式程式碼如下:
/**
* Read the entire buffer.
*/
@Override
public synchronized int read(final byte buf[], int off, int len) throws IOException {
//這裡使用位元組陣列作為容器
ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
return readWithStrategy(byteArrayReader, off, len);
}
read()方法將從輸入流的off遊標開始,讀取len個位元組,然後存入buf[]快取陣列中,這裡的off、len以及buf[]都是read()方法的輸入引數。read()方法首先會構造一個ByteArrayStrategy物件,表明當前的讀取操作使用位元組陣列作為容器,然後呼叫readWithStrategy()方法讀取資料。其中ByteArrayStrategy類的程式碼如下:
/**
* Used to read bytes into a byte[]
*/
private static class ByteArrayStrategy implements ReaderStrategy {
final byte[] buf;
public ByteArrayStrategy(byte[] buf) {
this.buf = buf;
}
@Override
public int doRead(BlockReader blockReader, int off, int len,
ReadStatistics readStatistics) throws ChecksumException, IOException {
int nRead = blockReader.read(buf, off, len);
updateReadStatistics(readStatistics, nRead, blockReader);
return nRead;
}
}
下面我們看一下readWithStrategy()方法的實現,程式碼如下:
private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
dfsClient.checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
= new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
failures = 0;
if (pos < getFileLength()) {//讀取位置在檔案範圍內
int retries = 2;//如果出現異常,則重試兩次
while (retries > 0) {
try {
// currentNode can be left as null if previous read had a checksum
// error on the same block. See HDFS-3067
//pos超過資料塊邊界,需要從新的資料塊開始讀取資料
if (pos > blockEnd || currentNode == null) {
//呼叫blockSeekTo()方法獲取儲存這個資料塊的一個數據節點
currentNode = blockSeekTo(pos);
}
//計算這次讀取的長度
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
if (locatedBlocks.isLastBlockComplete()) {
realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
}
//呼叫readBuffer()方法讀取資料
int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
if (result >= 0) {
pos += result;//pos移位
} else {
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
}
if (dfsClient.stats != null) {
dfsClient.stats.incrementBytesRead(result);
}
return result;
} catch (ChecksumException ce) {
throw ce; //出現校驗錯誤,則丟擲異常
} catch (IOException e) {
if (retries == 1) {
DFSClient.LOG.warn("DFS Read", e);
}
blockEnd = -1;
if (currentNode != null) { addToDeadNodes(currentNode); }//將當前失敗的節點入黑名單
if (--retries == 0) {//重試超過兩次,直接丟擲異常
throw e;
}
} finally {
// Check if need to report block replicas corruption either read
// was successful or ChecksumException occured.
//檢查是否需要向Namenode彙報損壞的資料塊
reportCheckSumFailure(corruptedBlockMap,
currentLocatedBlock.getLocations().length);
}
}
}
return -1;
}
readWithStrategy()方法首先呼叫blockSeek()方法獲取一個儲存了目標資料塊的Datanode,然後呼叫readBuffer()方法從該Datanode讀取資料塊。如果讀取過程出現IO異常,則進行重試操作,並將該Datanode放入黑名單中。
可以看到,readWithStrategy()呼叫了blockSeekTo()以及readBuffer()方法,接下來講解這兩個方法。
(1) blockSeekTo()
該函式程式碼如下:
/**
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
*/
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
if (target >= getFileLength()) {//如果讀取位置超過HDFS檔案長度,則丟擲異常
throw new IOException("Attempted to read past end of file");
}
// Will be getting a new BlockReader.
if (blockReader != null) {//關閉上一個資料塊對應的BlockReader
blockReader.close();
blockReader = null;
}
//
// Connect to best DataNode for desired Block, with potential offset
//
DatanodeInfo chosenNode = null;
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
boolean connectFailedOnce = false;
while (true) {
//
// Compute desired block
//
//獲取target對應的資料塊的位置資訊
LocatedBlock targetBlock = getBlockAt(target, true);
assert (target==pos) : "Wrong postion " + pos + " expect " + target;
//獲取當前target在新資料塊中的偏移量
long offsetIntoBlock = target - targetBlock.getStartOffset();
//呼叫chooseDataNode()方法,獲取一個Datanode用來讀取該資料塊
DNAddrPair retval = chooseDataNode(targetBlock, null);
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
StorageType storageType = retval.storageType;
try {
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
//通過BlockReaderFactory獲取blockReader物件
blockReader = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetIntoBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(blk.getNumBytes() - offsetIntoBlock).
setCachingStrategy(cachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
if(connectFailedOnce) {
DFSClient.LOG.info("Successfully connected to " + targetAddr +
" for " + blk);
}
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
//安全相關的異常
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
} else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
//安全相關
refetchToken--;
fetchBlockAt(target);
} else {
connectFailedOnce = true;
DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ ", add to deadNodes and continue. " + ex, ex);
// Put chosen node into dead list, continue
//BlockReader構造失敗,將chosenNode放入黑名單中
addToDeadNodes(chosenNode);
}
}
}
}
一個HDFS檔案會被切分成多個數據塊,這些資料塊分散在HDFS叢集的Datanode上。當我們讀取檔案時,也就是按照順序讀取資料塊時,如果讀操作完成了一個數據塊的讀取,就需要構造讀取下一個資料塊的輸入流,這時就需要呼叫blockSeekTo()方法獲取儲存下一個資料塊的Datanode。
blockSeekTo()方法會先呼叫getBlockAt()方法獲取遊標(DFSInputStream.pos欄位儲存)所在資料塊的資訊,然後呼叫chooseDataNode()方法獲取一個儲存了該資料塊的Datanode。接下來會構造從這個節點讀取資料塊的BlockReader物件,構造的BlockReader物件會被儲存在DFSInputStream.blockReader欄位中。這裡需要注意,構造BlockReader時使用了BlockReaderFactory這個工廠類,後面會講到BlockReader類的實現。
接下來分析getBlockAt()->chooseDataNode()->blockReader。也就是獲取資料塊->獲取資料塊對應的資料節點->獲取BlockReader物件的過程。
<1> getBlockAt()
該方法用於獲取檔案pos遊標所在資料塊的位置資訊,也就是獲取該資料塊對應的LocatedBlock物件。LocatedBlock物件儲存了所有儲存該資料塊的Datanode資訊,這些資訊會按照距離客戶端的遠近排序,同時LocatedBlock還儲存了當前資料塊是否被快取等資訊。getBlockAt()方法會呼叫ClientProtocol.getBlockLocations()方法從Namenode獲取LocatedBlock物件,並將這個LocatedBlock物件儲存到DFSInputStream.locatedBlocks欄位中。
<2> chooseDataNode()
選擇一個合適的Datanode讀取資料塊,這個方法的邏輯很簡單,由於LocatedBlock物件中已經包含了按照與客戶端距離遠近排序的Datanode列表,所以只需要遍歷這個列表,選出第一個不在Datanode黑名單(DFSInputStream.deadNodes欄位中儲存)中的Datanode即可。
<3> BlockReaderFactory.build()
構造從指定Datanode上讀取資料塊的BlockReader物件,這裡使用了BlockReaderFactory這個工廠類,BlockReaderFactory.build()方法的實現後面會講到。
(2) readBuffer()
該方法程式碼如下:
/* This is a used by regular read() and handles ChecksumExceptions.
* name readBuffer() is chosen to imply similarity to readBuffer() in
* ChecksumFileSystem
*/
private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
IOException ioe;
/* we retry current node only once. So this is set to true only here.
* Intention is to handle one common case of an error that is not a
* failure on datanode or client : when DataNode closes the connection
* since client is idle. If there are other cases of "non-errors" then
* then a datanode might be retried by setting this to true again.
*/
boolean retryCurrentNode = true;
while (true) {
// retry as many times as seekToNewSource allows.
try {
//讀取資料
return reader.doRead(blockReader, off, len, readStatistics);
} catch ( ChecksumException ce ) {
DFSClient.LOG.warn("Found Checksum error for "
+ getCurrentBlock() + " from " + currentNode
+ " at " + ce.getPos());
//出現校驗異常時,表明currentNode上的資料塊出現了錯誤
ioe = ce;
retryCurrentNode = false;
// we want to remember which block replicas we have tried
//將損壞的資料塊加入corruptedBlockMap中,並向Namenode彙報
addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
corruptedBlockMap);
} catch ( IOException e ) {
if (!retryCurrentNode) {
DFSClient.LOG.warn("Exception while reading from "
+ getCurrentBlock() + " of " + src + " from "
+ currentNode, e);
}
ioe = e;
}
boolean sourceFound = false;
if (retryCurrentNode) {
/* possibly retry the same node so that transient errors don't
* result in application level failures (e.g. Datanode could have
* closed the connection because the client is idle for too long).
*/
//重試當前節點
sourceFound = seekToBlockSource(pos);
} else {
//當Datanode重試失敗,則將當前節點加入黑名單中,然後重新選擇一個Datanode讀取資料
addToDeadNodes(currentNode);
sourceFound = seekToNewSource(pos);
}
if (!sourceFound) {
throw ioe;
}
retryCurrentNode = false;
}
}
readBuffer()的讀入操作主要是通過委託BlockReader物件實現的,並在發生異常時進行重試。當讀取出現校驗異常時,表明currentNode上的資料塊出現了錯誤,這時readBuffer()方法會將錯誤的資料塊新增到corruptedBlockMap中,並通過reportCheckSumFailure()方法向Namenode彙報錯誤的資料塊。如果是普通的IO異常,則有可能是客戶端與資料節點之間的連線關閉了,那麼readBuffer()方法會在當前節點上呼叫seekToBlockSource()重試。如果重試失敗,則呼叫seekToNewSource()選擇新的Datanode,並將當前Datanode加入黑名單中,SeekToBlockSource()和seekToNewSource()方法都呼叫了上面介紹的blockSeekTo()方法。
由於篇幅原因,後續內容放在下一篇文章。