Hadoop HDFS Source Code Study Notes (4)
阿新 · Published: 2019-01-07
I. First, write a short program against the HDFS API, then single-step through it in the Eclipse debugger to see how the source code executes:
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class testRead1 {
    public static void main(String[] args) throws Exception {
        //String uri = args[0];
        String uri = "hdfs://localhost:9000/user/mmliu/test01.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            // print the whole file to stdout
            IOUtils.copyBytes(in, System.out, 4096, false);
            // seek back to the beginning and print the file a second time
            in.seek(0);
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
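First, here is the overall sequence diagram, for reference in the analysis that follows:
[Figure: overall sequence diagram of the HDFS read path]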
1. Single-stepping through the client-side code:
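The trace shows that getting a FileSystem instance ultimately creates and initializes a DistributedFileSystem object from the uri and conf we supply. Initialization mainly creates two fields, rpcNamenode and namenode (they belong to the DFSClient that DistributedFileSystem creates). Both are of type ClientProtocol. ClientProtocol is just an interface, and every call made through it is carried out as an RPC: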
private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
    Configuration conf, UserGroupInformation ugi) throws IOException {
  return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
      ClientProtocol.versionID, nameNodeAddr, ugi, conf,
      NetUtils.getSocketFactory(conf, ClientProtocol.class));
}
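To make "ClientProtocol is an interface, and calls become RPCs" concrete: Hadoop's RPC.getProxy builds its proxy with java.lang.reflect.Proxy and an InvocationHandler that packs each method call into an RPC request. The minimal sketch below shows only the proxy half of that mechanism. MiniClientProtocol, FakeNameNode and RpcProxyDemo are hypothetical names, and the handler simply calls a local object where real Hadoop RPC would serialize the call and send it to the NameNode.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical, heavily simplified stand-in for ClientProtocol.
interface MiniClientProtocol {
    long[] getBlockLengths(String src);
}

// Hypothetical "server side": the object that actually answers the calls.
class FakeNameNode implements MiniClientProtocol {
    @Override
    public long[] getBlockLengths(String src) {
        return src.endsWith(".txt") ? new long[] {64L, 64L, 10L} : new long[0];
    }
}

class RpcProxyDemo {
    // Builds an object that implements the interface; every call goes through
    // the handler. This handler logs the call and forwards it to a local target;
    // a real RPC handler would send method name + arguments over the network.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> protocol, T target, StringBuilder callLog) {
        InvocationHandler handler = (Object proxy, Method method, Object[] callArgs) -> {
            callLog.append(method.getName()).append(Arrays.toString(callArgs)).append(';');
            return method.invoke(target, callArgs);
        };
        return (T) Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] {protocol}, handler);
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        MiniClientProtocol namenode =
                getProxy(MiniClientProtocol.class, new FakeNameNode(), log);
        System.out.println(Arrays.toString(namenode.getBlockLengths("/user/test01.txt")));
        System.out.println(log);
    }
}
```

Running main prints [64, 64, 10] followed by the recorded call getBlockLengths[/user/test01.txt];. The caller only ever sees the interface type, which is why client code can use the namenode field as if it were a local object.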
In the open phase, HDFS calls DistributedFileSystem's open method:
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
statistics.incrementReadOps(1);
return new DFSClient.DFSDataInputStream(
dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}
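Here dfs is a member variable of type DFSClient. DFSClient's open method creates a DFSInputStream object and returns it. The DFSInputStream constructor calls a key method, openInfo. I overlooked it at first, and later, while tracing the read, I could not tell where the block information came from, so do not skip it. The class comment of DFSInputStream already hints at its role: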
/****************************************************************
 * DFSInputStream provides bytes from a named file.  It handles
 * negotiation of the namenode and various datanodes as necessary.
 ****************************************************************/
public class DFSInputStream extends FSInputStream {
  // ...
}
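openInfo talks to the namenode and prefetches the file's block information from it. prefetchSize controls how much is prefetched. It is a byte count, by default 10 times the default block size, i.e. enough to cover 10 blocks: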
private long prefetchSize = 10 * defaultBlockSize;
/**
* Grab the open-file info from namenode
*/
synchronized void openInfo() throws IOException {
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
if (newInfo == null) {
throw new FileNotFoundException("File does not exist: " + src);
}
// I think this check is not correct. A file could have been appended to
// between two calls to openInfo().
if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
!newInfo.isUnderConstruction()) {
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
throw new IOException("Blocklist for " + src + " has changed!");
}
}
}
updateBlockInfo(newInfo);
this.locatedBlocks = newInfo;
this.currentNode = null;
}
private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
String src, long start, long length) throws IOException {
try {
return namenode.getBlockLocations(src, start, length);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class);
}
}
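The consistency check in openInfo walks the previously cached block list and the freshly fetched one in step, stops at the end of the shorter list, and throws on the first position where the blocks differ; blocks beyond the shorter list are never compared. The sketch below reproduces that loop, plus the prefetchSize arithmetic, assuming the 64 MB default block size of Hadoop 1.x. BlockListCheck is a hypothetical name, and plain long block IDs stand in for Hadoop's Block objects.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

class BlockListCheck {
    // Same idea as the check in openInfo(): walk the old and new lists in step,
    // stop at the end of the shorter one, and fail on the first position whose
    // block differs. Plain long block IDs stand in for Hadoop's Block objects.
    static void checkUnchanged(String src, List<Long> oldIds, List<Long> newIds)
            throws IOException {
        Iterator<Long> oldIter = oldIds.iterator();
        Iterator<Long> newIter = newIds.iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
            if (!oldIter.next().equals(newIter.next())) {
                throw new IOException("Blocklist for " + src + " has changed!");
            }
        }
    }

    // prefetchSize is a byte count: ten default-sized blocks' worth of data.
    static long prefetchSize(long defaultBlockSize) {
        return 10 * defaultBlockSize;
    }

    public static void main(String[] args) throws IOException {
        // Passes: the extra block at the end of the new list is never compared.
        checkUnchanged("/f", List.of(1L, 2L), List.of(1L, 2L, 3L));
        // Assuming a 64 MB block size: 671088640 bytes (640 MB).
        System.out.println(prefetchSize(64L * 1024 * 1024));
        try {
            checkUnchanged("/f", List.of(1L, 2L), List.of(1L, 9L));
        } catch (IOException e) {
            System.out.println(e.getMessage()); // Blocklist for /f has changed!
        }
    }
}
```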
LocatedBlocks mainly holds the file length (long fileLength) and a list of blocks (List<LocatedBlock> blocks). Each LocatedBlock contains the following:
private Block b;
private long offset; // offset of the first byte of the block in the file
private DatanodeInfo[] locs;
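b holds the block's own information (ID, length, and generation stamp), offset is the block's starting offset within the file, and locs lists the datanodes that store replicas of the block.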
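Because each LocatedBlock records its starting offset and the list is ordered by offset, the client can map any file position to a block with a binary search; Hadoop's LocatedBlocks has a findBlock method for this purpose. The sketch below is a simplified re-implementation under that assumption, not the Hadoop code: SimpleLocatedBlock and FindBlock are hypothetical names, and the datanode names are made up.

```java
import java.util.List;

// Hypothetical, simplified LocatedBlock: just enough to locate an offset.
final class SimpleLocatedBlock {
    final long blockId;
    final long offset;       // offset of the block's first byte in the file
    final long numBytes;     // block length in bytes
    final List<String> locs; // hypothetical datanode names holding replicas

    SimpleLocatedBlock(long blockId, long offset, long numBytes, List<String> locs) {
        this.blockId = blockId;
        this.offset = offset;
        this.numBytes = numBytes;
        this.locs = locs;
    }
}

class FindBlock {
    // Binary search over blocks sorted by offset. Returns the index of the block
    // whose byte range [offset, offset + numBytes) contains pos, or -1 if none does.
    static int findBlock(List<SimpleLocatedBlock> blocks, long pos) {
        int lo = 0, hi = blocks.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            SimpleLocatedBlock b = blocks.get(mid);
            if (pos < b.offset) {
                hi = mid - 1;                 // pos lies in an earlier block
            } else if (pos >= b.offset + b.numBytes) {
                lo = mid + 1;                 // pos lies in a later block
            } else {
                return mid;                   // pos is inside this block
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<SimpleLocatedBlock> blocks = List.of(
                new SimpleLocatedBlock(101, 0, 64, List.of("dn1", "dn2")),
                new SimpleLocatedBlock(102, 64, 64, List.of("dn2", "dn3")),
                new SimpleLocatedBlock(103, 128, 10, List.of("dn1", "dn3")));
        System.out.println(findBlock(blocks, 70));  // 1
        System.out.println(findBlock(blocks, 200)); // -1, past the end of the file
    }
}
```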
From tracing the get phase, we know that namenode.getBlockLocations is an RPC call that ends up in the NameNode's getBlockLocations method. Next, we single-step through the namenode.
2. Analysis of getBlockLocations on the NameNode side
The NameNode class implements getBlockLocations as follows:
/** {@inheritDoc} */
public LocatedBlocks getBlockLocations(String src,
long offset,
long length) throws IOException {
myMetrics.incrNumGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
}
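Here namesystem is a member variable of type FSNamesystem, which holds the NameNode's namespace tree. One of its important members is FSDirectory dir. FSDirectory in turn holds FSImage fsImage, which reads and writes the fsimage file on disk, and FSImage has a member FSEditLog editLog, which reads and writes the edits file on disk. The relationship between these two files and their roles were covered in an earlier post, which also explained how INode, INodeFile, and INodeDirectory relate to each other.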
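The namespace tree is a composite: directory inodes hold child inodes by name, and file inodes hold their blocks. A lookup such as dir.getFileINode(src) in getBlockLocationsInternal therefore amounts to walking the tree from the root one path component at a time. The sketch below illustrates that walk with hypothetical minimal classes (MiniINode, MiniDir, MiniFile, INodeLookup); it is not FSDirectory's code, which also deals with locking, permissions, and other details.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal INode hierarchy: directories hold children by name,
// files hold their block IDs.
abstract class MiniINode {
    final String name;
    MiniINode(String name) { this.name = name; }
}

final class MiniFile extends MiniINode {
    final long[] blockIds;
    MiniFile(String name, long... blockIds) { super(name); this.blockIds = blockIds; }
}

final class MiniDir extends MiniINode {
    final Map<String, MiniINode> children = new HashMap<>();
    MiniDir(String name) { super(name); }
    MiniDir add(MiniINode child) { children.put(child.name, child); return this; }
}

class INodeLookup {
    // Walks from the root one path component at a time. Returns the file inode,
    // or null if a component is missing or the path does not end at a file
    // (mirroring the "inode == null" case in getBlockLocationsInternal).
    static MiniFile getFileINode(MiniDir root, String src) {
        MiniINode cur = root;
        for (String part : src.split("/")) {
            if (part.isEmpty()) continue;               // skip the leading "/" and "//"
            if (!(cur instanceof MiniDir)) return null; // a file in the middle of the path
            cur = ((MiniDir) cur).children.get(part);
            if (cur == null) return null;
        }
        return (cur instanceof MiniFile) ? (MiniFile) cur : null;
    }

    public static void main(String[] args) {
        MiniDir root = new MiniDir("").add(
                new MiniDir("user").add(
                        new MiniDir("mmliu").add(new MiniFile("test01.txt", 1001L, 1002L))));
        MiniFile f = getFileINode(root, "/user/mmliu/test01.txt");
        System.out.println(f == null ? "not found" : f.blockIds.length + " blocks");
    }
}
```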
/**
* Get block locations within the specified range.
*
* @see #getBlockLocations(String, long, long)
*/
LocatedBlocks getBlockLocations(String clientMachine, String src,
long offset, long length) throws IOException {
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
if (blocks != null) {
//sort the blocks
DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
clientMachine); // find a DataNode running on the client's host, if there is one (null otherwise)
for (LocatedBlock b : blocks.getLocatedBlocks()) {
clusterMap.pseudoSortByDistance(client, b.getLocations()); // partially reorder each block's datanodes by network-topology distance to the client: a node-local replica first, then a rack-local one
}
}
return blocks;
}
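pseudoSortByDistance is only a partial sort. As I read the Hadoop 1.x NetworkTopology code, it moves a replica on the reader's own node to the front, moves a replica on the reader's rack right after it, and, if neither exists, swaps a randomly chosen replica into position 0 to spread the load. The sketch below recreates that idea under stated assumptions: nodes are plain host-name strings, racks come from a hypothetical host-to-rack map, and PseudoSort is a hypothetical name; it is not the NetworkTopology implementation.

```java
import java.util.Map;
import java.util.Random;

class PseudoSort {
    // Partial ordering of one block's replica locations (simplified idea of
    // NetworkTopology.pseudoSortByDistance):
    //   1. a replica on the reader's own host goes to index 0;
    //   2. a replica on the reader's rack goes to the next free position;
    //   3. if neither exists, a random replica is swapped to index 0.
    // rackOf is a hypothetical host -> rack mapping.
    static void pseudoSortByDistance(String reader, String[] nodes,
                                     Map<String, String> rackOf, Random rnd) {
        int next = 0; // next position to fill with a "close" replica
        if (reader != null) {
            for (int i = 0; i < nodes.length; i++) {
                if (nodes[i].equals(reader)) { swap(nodes, 0, i); next = 1; break; }
            }
            String readerRack = rackOf.get(reader);
            for (int i = next; i < nodes.length; i++) {
                if (readerRack != null && readerRack.equals(rackOf.get(nodes[i]))) {
                    swap(nodes, next, i);
                    next++;
                    break;
                }
            }
        }
        if (next == 0 && nodes.length > 0) {
            swap(nodes, 0, rnd.nextInt(nodes.length));
        }
    }

    private static void swap(String[] a, int i, int j) {
        String t = a[i]; a[i] = a[j]; a[j] = t;
    }

    public static void main(String[] args) {
        Map<String, String> racks = Map.of("h1", "r1", "h2", "r1", "h3", "r2");
        String[] locs = {"h3", "h2", "h1"};
        pseudoSortByDistance("h1", locs, racks, new Random());
        System.out.println(String.join(",", locs)); // h1,h2,h3
    }
}
```

In this sketch a null reader, the case where getDatanodeByHost finds no DataNode on the client's host, simply gets a random replica in first position.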
/**
* Get block locations within the specified range.
* @see ClientProtocol#getBlockLocations(String, long, long)
*/
public LocatedBlocks getBlockLocations(String src, long offset, long length,
boolean doAccessTime, boolean needBlockToken) throws IOException {
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.READ);
}
if (offset < 0) {
throw new IOException("Negative offset is not supported. File: " + src );
}
if (length < 0) {
throw new IOException("Negative length is not supported. File: " + src );
}
final LocatedBlocks ret = getBlockLocationsInternal(src,
offset, length, Integer.MAX_VALUE, doAccessTime, needBlockToken); // does the actual work
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
"open", src, null, null);
}
return ret;
}
As shown, the LocatedBlocks object is ultimately built by getBlockLocationsInternal:
private synchronized LocatedBlocks getBlockLocationsInternal(String src,
long offset,
long length,
int nrBlocksToReturn,
boolean doAccessTime,
boolean needBlockToken)
throws IOException {
INodeFile inode = dir.getFileINode(src); // look up the file's INodeFile in the namespace tree by path name; it holds the file's inode information
if(inode == null) {
return null;
}
if (doAccessTime && isAccessTimeSupported()) {
dir.setTimes(src, inode, -1, now(), false);
}
Block[] blocks = inode.getBlocks();
if (blocks == null) {
return null;
}
if (blocks.length == 0) {
return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
}
List<LocatedBlock> results;
results = new ArrayList<LocatedBlock>(blocks.length);
int curBlk = 0;
long curPos = 0, blkSize = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) { // this loop finds the block that contains offset
blkSize = blocks[curBlk].getNumBytes(); // when it breaks, curBlk points to that block and curPos to its start
assert blkSize > 0 : "Block of size 0";
if (curPos + blkSize > offset) {
break;
}
curPos += blkSize;
}
if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
return null;
long endOff = offset + length; // end position in the file of the length bytes the user requested
do { // collect the blocks covering the data from offset to endOff and wrap each one in a LocatedBlock
// get block locations
int numNodes = blocksMap.numNodes(blocks[curBlk]);
int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]);
if (numCorruptNodes != numCorruptReplicas) {
LOG.warn("Inconsistent number of corrupt replicas for " +
blocks[curBlk] + "blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + numCorruptReplicas);
}
DatanodeDescriptor[] machineSet = null;
boolean blockCorrupt = false;
if (inode.isUnderConstruction() && curBlk == blocks.length - 1
&& blocksMap.numNodes(blocks[curBlk]) == 0) {
// get unfinished block locations
INodeFileUnderConstruction cons = (INodeFileUnderConstruction)inode;
machineSet = cons.getTargets();
blockCorrupt = false;
} else {
blockCorrupt = (numCorruptNodes == numNodes);
int numMachineSet = blockCorrupt ? numNodes :
(numNodes - numCorruptNodes);
machineSet = new DatanodeDescriptor[numMachineSet]; // the block's datanodes: all of them if every replica is corrupt, otherwise only the uncorrupted ones
if (numMachineSet > 0) {
numNodes = 0;
for(Iterator<DatanodeDescriptor> it =
blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
DatanodeDescriptor dn = it.next();
boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
machineSet[numNodes++] = dn;
}
}
}
LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
blockCorrupt);
if(isAccessTokenEnabled && needBlockToken) {
b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
}
results.add(b); // add the LocatedBlock built from the block and its machineSet to the list
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
&& curBlk < blocks.length
&& results.size() < nrBlocksToReturn);
return inode.createLocatedBlocks(results); // build a LocatedBlocks object from the list and return it
}
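Stripped of the replica bookkeeping, getBlockLocationsInternal does two things with the block sizes: the for loop skips whole blocks until it reaches the one containing offset, and the do-while loop then collects consecutive blocks until it passes endOff = offset + length, runs out of blocks, or has nrBlocksToReturn results. The sketch below reproduces only that index arithmetic over a hypothetical array of block sizes; BlockRange and selectBlocks are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class BlockRange {
    // Returns the indices of the blocks covering [offset, offset + length),
    // mirroring the two loops in getBlockLocationsInternal. Returns null when
    // offset is at or past the end of a non-empty file, and an empty list for
    // a file without blocks.
    static List<Integer> selectBlocks(long[] blockSizes, long offset, long length,
                                      int nrBlocksToReturn) {
        List<Integer> results = new ArrayList<>();
        if (blockSizes.length == 0) return results;
        int nrBlocks = (blockSizes[0] == 0) ? 0 : blockSizes.length;
        int curBlk;
        long curPos = 0;
        // Loop 1: find the block that contains offset.
        for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
            if (curPos + blockSizes[curBlk] > offset) break;
            curPos += blockSizes[curBlk];
        }
        if (nrBlocks > 0 && curBlk == nrBlocks) return null; // offset >= end of file
        long endOff = offset + length;
        // Loop 2: collect blocks until endOff, the last block, or the limit.
        do {
            results.add(curBlk);
            curPos += blockSizes[curBlk];
            curBlk++;
        } while (curPos < endOff && curBlk < blockSizes.length
                 && results.size() < nrBlocksToReturn);
        return results;
    }
}
```

For example, with blocks of 64, 64 and 10 bytes, a request at offset 70 with length 100 starts in block 1 (curPos = 64) and returns blocks 1 and 2, while offset 138 equals the file length, so null is returned.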
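The LocatedBlocks object obtained from the NameNode through this RPC call is stored as a member variable of the DFSInputStream (openInfo assigns it to locatedBlocks). The DFSInputStream is finally wrapped in an FSDataInputStream (a DFSClient.DFSDataInputStream) and returned to the user.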