Hadoop 2.6.0 Source Code Analysis - Client (Part 2: Reading (read) an HDFS File)
In the previous article we analyzed the open function, which fetches the block information of the file being opened. Now we turn to the code that actually reads the file.
Let's start with an example:
package com.hadoop.senior.hdfs;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSApp {

    public static FileSystem getFileSystem() throws IOException {
        // core-site.xml hdfs-site.xml log4j.properties
        Configuration configuration = new Configuration();
        // FileSystem
        FileSystem filesystem = FileSystem.get(configuration);
        return filesystem;
    }

    // Read a file from HDFS and print it to the console
    public static void readFile(String filename) throws IOException {
        FileSystem filesystem = getFileSystem();
        // read path
        Path path = new Path(filename);
        // dfInputStream
        FSDataInputStream inStream = filesystem.open(path);
        try {
            IOUtils.copyBytes(inStream, System.out, 4, false);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(inStream);
        }
    }

    // Upload a local file to HDFS
    public static void writeFile(String filename) throws IOException {
        FileSystem filesystem = getFileSystem();
        Path path = new Path(filename);
        FSDataOutputStream outStream = filesystem.create(path);
        FileInputStream inStream = new FileInputStream(
            new File("/opt/modules/hadoop-2.5.0-cdh5.3.6/input.txt"));
        try {
            IOUtils.copyBytes(inStream, outStream, 1024, false);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(inStream);
            IOUtils.closeStream(outStream);
        }
    }

    public static void main(String[] args) throws IOException {
        // read local file not use core-site.xml hdfs-site.xml
        // String filename = "/opt/modules/workspace/senior/pom.xml";
        // read hdfs input.txt
        String filename  = "/usr/css/mapreduce/wordcount/input/input.txt";
        String filename2 = "/usr/css/mapreduce/wordcount/input/input02.txt";
        readFile(filename);
        writeFile(filename2);
    }
}
The code above covers both reading and writing; we analyze reading first. The client begins by calling FileSystem's open function to open the file (see "Hadoop 2.6.0 Source Code Analysis - Client (Part 2: Reading (open) an HDFS File)" for the analysis of open), and then executes
IOUtils.copyBytes(inStream, System.out, 4, false);
This line copies the file contents from inStream to System.out. Stepping into the function, the code is as follows:
/**
 * Copies from one stream to another.
 *
 * @param in InputStream to read from
 * @param out OutputStream to write to
 * @param buffSize the size of the buffer
 * @param close whether or not close the InputStream and
 * OutputStream at the end. The streams are closed in the finally clause.
 */
public static void copyBytes(InputStream in, OutputStream out,
    int buffSize, boolean close) throws IOException {
  try {
    copyBytes(in, out, buffSize);
    if (close) {
      out.close();
      out = null;
      in.close();
      in = null;
    }
  } finally {
    if (close) {
      closeStream(out);
      closeStream(in);
    }
  }
}
Let's look at the three-argument copyBytes it delegates to:
/**
 * Copies from one stream to another.
 *
 * @param in InputStream to read from
 * @param out OutputStream to write to
 * @param buffSize the size of the buffer
 */
public static void copyBytes(InputStream in, OutputStream out, int buffSize)
    throws IOException {
  PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
  byte buf[] = new byte[buffSize];
  int bytesRead = in.read(buf);
  while (bytesRead >= 0) {
    out.write(buf, 0, bytesRead);
    if ((ps != null) && ps.checkError()) {
      throw new IOException("Unable to write to output stream.");
    }
    bytesRead = in.read(buf);
  }
}
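One detail worth noting: when the destination is a PrintStream (as with System.out in our example), write failures never surface as exceptions. PrintStream swallows them and only sets an internal flag, which is why the loop polls checkError() after each write. A small self-contained demonstration (the always-failing OutputStream is fabricated for illustration):

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;

public class CheckErrorDemo {
    public static void main(String[] args) {
        OutputStream failing = new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                throw new IOException("disk full"); // simulated write failure
            }
        };
        PrintStream ps = new PrintStream(failing);
        ps.write('x');                       // exception swallowed by PrintStream
        System.out.println(ps.checkError()); // true -- the only visible signal
    }
}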
For reference, the relevant class hierarchies are:
DFSInputStream extends FSInputStream
FSInputStream extends InputStream
DFSOutputStream extends FSOutputSummer
FSOutputSummer extends OutputStream
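Given these hierarchies, we can confirm at runtime which stream actually backs the reads. A minimal sketch reusing getFileSystem() from the HDFSApp example above (getWrappedStream() is FSDataInputStream's accessor for the underlying stream; running this requires a live HDFS deployment with the same file):

package com.hadoop.senior.hdfs;

import java.io.InputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSInputStream;

public class WhichStream {
    public static void main(String[] args) throws Exception {
        FSDataInputStream inStream = HDFSApp.getFileSystem()
            .open(new Path("/usr/css/mapreduce/wordcount/input/input.txt"));
        // FSDataInputStream is only a decorator; getWrappedStream() exposes
        // the stream that does the actual work.
        InputStream wrapped = inStream.getWrappedStream();
        System.out.println(wrapped instanceof DFSInputStream); // true on HDFS
        inStream.close();
    }
}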
Here in is the FSDataInputStream returned by open(), which wraps a DFSInputStream, so in.read(buf) ultimately lands in DFSInputStream's read function. That function's code is as follows:
/**
* Read the entire buffer.
*/
@Override
public synchronized int read(final byte buf[], int off, int len) throws IOException {
ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
return readWithStrategy(byteArrayReader, off, len);
}
The ByteArrayStrategy class used here is:
/**
* Used to read bytes into a byte[]
*/
private static class ByteArrayStrategy implements ReaderStrategy {
final byte[] buf;
public ByteArrayStrategy(byte[] buf) {
this.buf = buf;
}
@Override
public int doRead(BlockReader blockReader, int off, int len,
ReadStatistics readStatistics) throws ChecksumException, IOException {
int nRead = blockReader.read(buf, off, len);
updateReadStatistics(readStatistics, nRead, blockReader);
return nRead;
}
}
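ByteArrayStrategy is one implementation of the ReaderStrategy interface; a sibling strategy serves reads into a ByteBuffer. The point of the pattern is that the retry and statistics logic in readWithStrategy is written once, independent of the destination type. A stripped-down, self-contained sketch of the idea (simplified names, not the real HDFS interfaces):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class StrategySketch {
    // The copy loop is written once against Sink; each destination type
    // supplies only its own copy step.
    interface Sink {
        int doRead(InputStream src, int off, int len) throws IOException;
    }

    static class ByteArraySink implements Sink {
        final byte[] buf;
        ByteArraySink(byte[] buf) { this.buf = buf; }
        @Override
        public int doRead(InputStream src, int off, int len) throws IOException {
            return src.read(buf, off, len); // copy straight into the caller's array
        }
    }

    // One shared loop, analogous to readWithStrategy's use of ReaderStrategy
    static int readFully(InputStream src, Sink sink, int off, int len) throws IOException {
        int total = 0;
        while (total < len) {
            int n = sink.doRead(src, off + total, len - total);
            if (n < 0) break;
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] dst = new byte[5];
        InputStream src = new ByteArrayInputStream("hello".getBytes());
        System.out.println(readFully(src, new ByteArraySink(dst), 0, 5)); // 5
        System.out.println(new String(dst)); // hello
    }
}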
The read function then calls readWithStrategy, whose code is as follows:
private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
// Check whether the client has been closed; if so, an exception is thrown
dfsClient.checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
= new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
failures = 0;
// proceed only if the current read position is within the file length
if (pos < getFileLength()) {
// number of retries
int retries = 2;
while (retries > 0) {
try {
// currentNode can be left as null if previous read had a checksum
// error on the same block. See HDFS-3067
// If the current read position is past the end of the current block, or no
// datanode has been chosen yet, the data we need lives in another block, so
// we must seek to the datanode that holds it
if (pos > blockEnd || currentNode == null) {
currentNode = blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
if (locatedBlocks.isLastBlockComplete()) {
realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
}
int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
if (result >= 0) {
pos += result;
} else {
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
}
if (dfsClient.stats != null) {
dfsClient.stats.incrementBytesRead(result);
}
return result;
} catch (ChecksumException ce) {
throw ce;
} catch (IOException e) {
if (retries == 1) {
DFSClient.LOG.warn("DFS Read", e);
}
blockEnd = -1;
if (currentNode != null) { addToDeadNodes(currentNode); }
if (--retries == 0) {
throw e;
}
} finally {
// Check if need to report block replicas corruption either read
// was successful or ChecksumException occured.
reportCheckSumFailure(corruptedBlockMap,
currentLocatedBlock.getLocations().length);
}
}
}
return -1;
}
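To make the realLen clamp in this function concrete, here is a worked example with assumed numbers:

public class RealLenExample {
    public static void main(String[] args) {
        long pos = 134_217_000L;       // current file position (assumed)
        long blockEnd = 134_217_727L;  // last byte of the current 128 MB block (assumed)
        int len = 4096;                // bytes requested by the caller (assumed)

        // Same clamp as readWithStrategy: a single readBuffer call never
        // crosses a block boundary, so only the tail of this block is read;
        // the next call triggers blockSeekTo for the following block.
        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
        System.out.println(realLen);   // 728
    }
}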
When the current read position is past the end of the current block, or no datanode has been chosen yet, the current node must be updated by calling blockSeekTo. Stepping into that function, the code is as follows:
/**
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
*/
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
if (target >= getFileLength()) {
throw new IOException("Attempted to read past end of file");
}
// Will be getting a new BlockReader.
// A new BlockReader is about to be created, so close and clear the old one
if (blockReader != null) {
blockReader.close();
blockReader = null;
}
//
// Connect to best DataNode for desired Block, with potential offset
//
DatanodeInfo chosenNode = null;
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
boolean connectFailedOnce = false;
while (true) {
//
// Compute desired block
//
LocatedBlock targetBlock = getBlockAt(target, true);
assert (target==pos) : "Wrong postion " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset();
// Choose a datanode to read this block from
DNAddrPair retval = chooseDataNode(targetBlock, null);
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
StorageType storageType = retval.storageType;
try {
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
blockReader = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetIntoBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(blk.getNumBytes() - offsetIntoBlock).
setCachingStrategy(cachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build(); // Build the BlockReader that reads the block from the chosen
// datanode, using the BlockReaderFactory factory class
if(connectFailedOnce) {
DFSClient.LOG.info("Successfully connected to " + targetAddr +
" for " + blk);
}
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
} else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
refetchToken--;
fetchBlockAt(target);
} else {
connectFailedOnce = true;
DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ ", add to deadNodes and continue. " + ex, ex);
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
}
}
}
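Two computations in blockSeekTo are worth making concrete: the target file offset is translated into an offset inside the chosen block, and the BlockReader's length is set to whatever remains of that block. A worked example with assumed numbers:

public class OffsetIntoBlockExample {
    public static void main(String[] args) {
        long target = 200_000_000L;           // desired file position (assumed)
        long blockStartOffset = 134_217_728L; // targetBlock.getStartOffset() (assumed, 128 MB blocks)
        long blockNumBytes = 134_217_728L;    // blk.getNumBytes() (assumed)

        // Mirrors setStartOffset(offsetIntoBlock) and
        // setLength(blk.getNumBytes() - offsetIntoBlock) in blockSeekTo
        long offsetIntoBlock = target - blockStartOffset;
        long readerLength = blockNumBytes - offsetIntoBlock;
        System.out.println(offsetIntoBlock); // 65782272: skip this far into the block
        System.out.println(readerLength);    // 68435456: bytes left to its end
    }
}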
blockSeekTo calls getBlockAt to fetch the block containing the given position. Its code is as follows:
/**
* Get block at the specified position.
* Fetch it from the namenode if not cached.
*
* @param offset block corresponding to this offset in file is returned
* @param updatePosition whether to update current position
* @return located block
* @throws IOException
*/
private synchronized LocatedBlock getBlockAt(long offset,
boolean updatePosition) throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
final LocatedBlock blk;
//check offset
// The offset must not be negative or at/past the end of the file; otherwise throw
if (offset < 0 || offset >= getFileLength()) {
throw new IOException("offset < 0 || offset >= getFileLength(), offset="
+ offset
+ ", updatePosition=" + updatePosition
+ ", locatedBlocks=" + locatedBlocks);
} // the offset is at or beyond the file length recorded in locatedBlocks
else if (offset >= locatedBlocks.getFileLength()) {
// offset to the portion of the last block,
// which is not known to the name-node yet;
// getting the last block
// use the last located block
blk = locatedBlocks.getLastLocatedBlock();
}
else {
// the offset lies within the known file length, so look for the corresponding block in the cache first
// search cached blocks first
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
// the offset is not covered by any cached block
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
// fetch more blocks
// fetch the block locations for this offset from the remote namenode
final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
assert (newBlocks != null) : "Could not find target position " + offset;
// merge the newly fetched blocks into the cache, replacing stale entries; the merge rules are covered below
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
// return the matching block
blk = locatedBlocks.get(targetBlockIdx);
}
// update current position
if (updatePosition) {
pos = offset;
blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
currentLocatedBlock = blk;
}
return blk;
}
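Note that blockEnd is the inclusive offset of the block's last byte, which is exactly why readWithStrategy computed blockEnd - pos + 1 earlier. A quick worked example with assumed numbers:

public class BlockEndExample {
    public static void main(String[] args) {
        long startOffset = 134_217_728L; // blk.getStartOffset() (assumed)
        long blockSize = 134_217_728L;   // blk.getBlockSize() (assumed)

        // Same update as getBlockAt: the "- 1" makes blockEnd the inclusive
        // offset of the block's last byte.
        long blockEnd = startOffset + blockSize - 1;
        System.out.println(blockEnd); // 268435455
    }
}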
Let's step into findBlock, which searches the cached block list for the index of the block containing the given offset:
/**
* Find block containing specified offset.
*
* @return block if found, or null otherwise.
*/
public int findBlock(long offset) {
// create fake block of size 0 as a key
LocatedBlock key = new LocatedBlock(
new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
key.setStartOffset(offset);
key.getBlock().setNumBytes(1);
Comparator<LocatedBlock> comp =
new Comparator<LocatedBlock>() {
// Returns 0 iff a is inside b or b is inside a
@Override
public int compare(LocatedBlock a, LocatedBlock b) {
long aBeg = a.getStartOffset();
long bBeg = b.getStartOffset();
long aEnd = aBeg + a.getBlockSize();
long bEnd = bBeg + b.getBlockSize();
if(aBeg <= bBeg && bEnd <= aEnd
|| bBeg <= aBeg && aEnd <= bEnd)
return 0; // one of the blocks is inside the other
if(aBeg < bBeg)
return -1; // a's left bound is to the left of the b's
return 1;
}
};
return Collections.binarySearch(blocks, key, comp);
}
In findBlock's comparator:
returning 0 means one block's range contains the other's, i.e. the offset (wrapped in a fake one-byte block) falls inside a cached block;
returning -1 means a's range starts to the left of b's;
returning 1 covers the remaining case.
Collections.binarySearch therefore returns the index of the covering block, or a negative insertion point when no cached block covers the offset.
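To make the search concrete, here is a self-contained sketch of the same containment search with a simplified Block stand-in (not the real LocatedBlock; offsets and sizes are assumed):

import java.util.*;

public class FindBlockDemo {
    static class Block {
        final long start, size;
        Block(long start, long size) { this.start = start; this.size = size; }
    }

    public static void main(String[] args) {
        // cached blocks sorted by start offset; note the gap between 256 and 512
        List<Block> blocks = Arrays.asList(
            new Block(0L, 128L), new Block(128L, 128L), new Block(512L, 128L));

        Comparator<Block> comp = new Comparator<Block>() {
            @Override
            public int compare(Block a, Block b) {
                long aEnd = a.start + a.size, bEnd = b.start + b.size;
                if (a.start <= b.start && bEnd <= aEnd
                    || b.start <= a.start && aEnd <= bEnd)
                    return 0; // one range contains the other => "found"
                return a.start < b.start ? -1 : 1;
            }
        };

        // offset 140 falls inside the second cached block -> index 1
        System.out.println(Collections.binarySearch(blocks, new Block(140L, 1L), comp));
        // offset 300 falls in a gap that is not cached -> -3, and
        // LocatedBlocks.getInsertIndex(-3) would yield insertion index 2
        System.out.println(Collections.binarySearch(blocks, new Block(300L, 1L), comp));
    }
}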
Returning to getBlockAt and continuing on, let's step into insertRange:
public void insertRange(int blockIdx, List<LocatedBlock> newBlocks) {
int oldIdx = blockIdx;
int insStart = 0, insEnd = 0;
for(int newIdx = 0; newIdx < newBlocks.size() && oldIdx < blocks.size();
newIdx++) {
long newOff = newBlocks.get(newIdx).getStartOffset();
long oldOff = blocks.get(oldIdx).getStartOffset();
if(newOff < oldOff) {
insEnd++;
} else if(newOff == oldOff) {
// replace old cached block by the new one
blocks.set(oldIdx, newBlocks.get(newIdx));
if(insStart < insEnd) { // insert new blocks
blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
oldIdx += insEnd - insStart;
}
insStart = insEnd = newIdx+1;
oldIdx++;
} else { // newOff > oldOff
assert false : "List of LocatedBlock must be sorted by startOffset";
}
}
insEnd = newBlocks.size();
if(insStart < insEnd) { // insert new blocks
blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
}
}
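Before returning to getBlockAt, here is a runnable trace of this merge logic using start offsets only (assumed numbers; Long stands in for LocatedBlock, and blockIdx = 2 matches the gap example from the findBlock sketch above):

import java.util.*;

public class InsertRangeDemo {
    public static void main(String[] args) {
        List<Long> blocks = new ArrayList<Long>(Arrays.asList(0L, 128L, 512L));
        List<Long> newBlocks = Arrays.asList(256L, 384L, 512L);
        int blockIdx = 2; // insertion index from findBlock/getInsertIndex

        // Same algorithm as insertRange, operating on start offsets
        int oldIdx = blockIdx, insStart = 0, insEnd = 0;
        for (int newIdx = 0; newIdx < newBlocks.size() && oldIdx < blocks.size(); newIdx++) {
            long newOff = newBlocks.get(newIdx), oldOff = blocks.get(oldIdx);
            if (newOff < oldOff) {
                insEnd++; // queue this new block for insertion before oldIdx
            } else if (newOff == oldOff) {
                blocks.set(oldIdx, newBlocks.get(newIdx)); // replace the stale entry
                if (insStart < insEnd) {
                    blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
                    oldIdx += insEnd - insStart;
                }
                insStart = insEnd = newIdx + 1;
                oldIdx++;
            }
        }
        insEnd = newBlocks.size();
        if (insStart < insEnd) blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));

        System.out.println(blocks); // [0, 128, 256, 384, 512]
    }
}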
As the trace above shows, this function merges the newly fetched blocks into the cache: entries with the same start offset are replaced, and blocks not previously cached are inserted in order. Back in getBlockAt, once the block has been obtained, the read state is updated before the block is returned: the current read position (pos), the current block (currentLocatedBlock), and the inclusive end offset of that block (blockEnd).
That completes the read path! Many of the functions touched on here were not covered in depth; follow-up posts will fill in the details.