_00005 Hadoop DataNode原始碼淺析(RPC是基礎)
個性簽名:世界上最遙遠的距離不是天涯,也不是海角,而是我站在妳的面前,妳卻感覺不到我的存在
技術方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 雲端計算技術
轉載宣告:
qq交流群:
# DataNode原始碼分析
# datanode註釋翻譯
/**********************************************************
* DataNode is a class (and program)that stores a set of
* blocks for a DFS deployment. A single deployment can
* have one or many DataNodes. Each DataNode communicates
* regularly with a singleNameNode. It also communicates
* with client code and otherDataNodes from time to time.
*
* DataNodes store a series of namedblocks. The DataNode
* allows client code to read theseblocks, or to write new
* block data. The DataNode may also, in response toinstructions
* from its NameNode, delete blocksor copy blocks to/from other
* DataNodes.
*
* The DataNode maintains just onecritical table:
* block-> stream of bytes (of BLOCK_SIZE or less)
*
* This info is stored on a localdisk. The DataNode
* reports the table's contents tothe NameNode upon startup
* and every so often afterwards.
*
* DataNodes spend their lives in anendless loop of asking
* the NameNode for something todo. A NameNode cannot connect
* to a DataNode directly; aNameNode simply returns values from
* functions invoked by a DataNode.
*
* DataNodes maintain an open serversocket so that client code
* or other DataNodes can read/writedata. The host/port for
* this server is reported to theNameNode, which then sends that
* information to clients or otherDataNodes that might be interested.
*
**********************************************************/
# 首先看datanode結構,實現了Runnable介面(run方法)。
public class DataNode extendsConfigured
implements InterDatanodeProtocol,ClientDatanodeProtocol, FSConstants, Runnable, DataNodeMXBean {
# 找到main方法,進入
public static void main(String args[]) {
secureMain(args, null);
}
# 進入scureMain
public static void secureMain(String [] args, SecureResourcesresources) {
try {
StringUtils.startupShutdownMessage(DataNode.class, args,LOG);
DataNode datanode = createDataNode(args,null, resources);
if (datanode !=null)
datanode.join();
建立datanode跟呼叫datanode.join()(Java Thread中, join()方法主要是讓呼叫改方法的thread完成run方法裡面的東西后,在執行join()方法後面的程式碼。)
# 進入createDataNode
/** Instantiate & Start a single datanode daemon and wait for it tofinish.
* If this thread is specifically interrupted, it will stop waiting.
* LimitedPrivate for creating secure datanodes
*/
public static DataNode createDataNode(Stringargs[],
Configuration conf,SecureResources resources) throws IOException {
DataNode dn = instantiateDataNode(args,conf, resources);
runDatanodeDaemon(dn);
return dn;
}
註釋說:例項化和開始一個datanode守護程序(runDatanodeDaemon(dn)),等待它完成。如果專門打斷這個執行緒,它將停止等待。建立安全datanodes LimitedPrivate。
# startDataNode方法,一直跟進去(省略中間的程式碼)最後進入到這麼一個主要的方法,該方法程式碼很多
void startDataNode(Configuration conf,
AbstractList<File> dataDirs, SecureResources resources
) throws IOException {
// connect to name node
this.namenode = (DatanodeProtocol)
RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID,
nameNodeAddr,
conf);
在datanode中起了一個RPC的客戶端,得到一個服務端的代理物件,這裡被強轉為DatanodeProtocol,實際上就是NameNode這個類,因為NameNode類實現了DatanodeProtocol介面,然後就可以呼叫NameNode裡面的方法了
第二個重要的地方:
this.threadGroup =new ThreadGroup("dataXceiverServer");
this.dataXceiverServer =new Daemon(threadGroup,
new DataXceiverServer(ss, conf,this));
下面我們可以來開始分析DataNode上的動態行為。首先我們來分析DataXceiverServer和DataXceiver。DataNode上資料塊的接受/傳送並沒有採用我們前面介紹的RPC機制,原因很簡單,RPC是一個命令式的介面,而DataNode處理資料部分,往往是一種流式機制。DataXceiverServer和DataXceiver就是這個機制的實現。其中,DataXceiver還依賴於兩個輔助類:BlockSender和BlockReceiver。 DataXceiverServer很簡單,它開啟一個埠,然後每接收到一個連線,就建立一個DataXceiver,服務於該連線,DataXceiver是一個執行緒讀一次操作請求進行操作之後就返回,並記錄該連線的socket,對應的實現在DataXceiverServer的run方法裡。當系統關閉時,DataXceiverServer將關閉監聽的socket和所有DataXceiver的socket,這樣就導致了DataXceiver出錯並結束執行緒。DataXceiverServer接受到的資料主要有操作碼+操作資料+使用者名稱。 (1)BlockSender用來發送block資料,返回給使用者的是:成功與否+校驗型別+實際offset(因為校驗塊的原因和使用者請求的offset不一致)。BlockSender有配置引數corruptChecksumOk(校驗資料讀入出錯忽略,出錯用零填充),chunkOffsetOK(是否要告知實際的offset,如上所述),verifyChecksum(是否要求在把校驗資料和實際資料讀入包快取中時校驗資料,也就是在傳送之前),向客戶端傳包的時候第一、二個引數為true,第三為false,為的是儘快傳送資料。而用來校驗已有資料時使用第一二引數為false,第三引數為true,為了及時發現錯誤資料。readBlock完成實際讀資料的操作,比較簡單。sendChunks方法中,對於客戶端傳包的包只有校驗和而實際資料通過管道傳輸,具體見函式。
第三個重要的地方:
//create a servlet to serve full-file content
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
String infoHost =infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort();
this.infoServer = (secureResources ==null)
? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
conf, SecurityUtil.getAdminAcls(conf,DFSConfigKeys.DFS_ADMIN))
: new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
conf, SecurityUtil.getAdminAcls(conf,DFSConfigKeys.DFS_ADMIN),
secureResources.getListener());
這裡new了一個infoServer,new HttpServer裡面是一個jetty的server,就是為了向用戶提供web介面的,然後再後面再啟動infoServer。
第四個重要的地方:
//init ipc server
InetSocketAddress ipcAddr =NetUtils.createSocketAddr(
conf.get("dfs.datanode.ipc.address"));
ipcServer = RPC.getServer(this,ipcAddr.getHostName(), ipcAddr.getPort(),
conf.getInt("dfs.datanode.handler.count", 3),false, conf,
blockTokenSecretManager);
這裡datanode中起了一個RPC的服務端(暫時不知給誰呼叫的)
# 接下來就會呼叫dtanode.join()方法,datanode的run方法
/**
* No matter what kind ofexception we get, keep retrying to offerService().
* That's the loop that connectsto the NameNode and provides basic DataNode
* functionality.
*
* Only stop when"shouldRun" is turned off (which can only happen at shutdown).
*/
public void run() {
LOG.info(dnRegistration +"InDataNode.run, data = " +data);
// start dataXceiveServer
dataXceiverServer.start();
ipcServer.start();
while (shouldRun) {
try {
startDistributedUpgradeIfNeeded();
offerService();
} catch (Exception ex) {
LOG.error("Exception: " + StringUtils.stringifyException(ex));
if (shouldRun) {
try {
Thread.sleep(5000);
} catch (InterruptedExceptionie) {
}
}
}
}
LOG.info(dnRegistration +":Finishing DataNode in: "+data);
shutdown();
}
# 我們主要看offerService();這個方法
/**
* Main loop for theDataNode. Runs until shutdown,
* forever calling remote NameNodefunctions.
*/
public void offerService() throws Exception {
裡面有一個死迴圈,主要是呼叫了一個方法sendHeartbeat
DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
xmitsInProgress.get(),
getXceiverCount());
myMetrics.addHeartBeat(now() - startTime);
//LOG.info("Just sent heartbeat, with name " + localName);
if(!processCommand(cmds))
continue;
這裡的namenode在上面已經解釋過了,其實就是DataNode,這裡會呼叫DataNode的sendHeartbeat方法,將自己的狀態作為引數(容量,空間使用了多少,剩餘多少等等)傳遞給了namenode。然後namenode呼叫方法得到一些返回值給datanode,datanode處理這些命令,然後再處理這些指令processCommand(cmds),放送心跳基本分析完畢。
# 接下來看傳送心跳以及處理的namenode的指令後datanode還幹了些什麼
DatanodeCommand cmd= namenode.blockReport(dnRegistration,
BlockListAsLongs.convertToArrayLongs(bReport));
看到這裡了,namenode是DatanodeProtocol介面也就是NameNode類。這裡是RPC客戶端的遠端呼叫,datanode會掃描其機器上對應儲存hdfs block的目錄下(dfs.data.dir)所儲存的所有檔案塊,然後通過namenode的rpc呼叫將這些block的資訊以一個long陣列的方式傳送給namenode,namenode在接收到一個datanode的blockReport rpc呼叫後,從rpc中解析出block陣列,並將這些接收到的blocks插入到BlocksMap表中,由於此時BlocksMap缺少的僅僅是每個block對應的datanode資訊,而namenoe能從report中獲知當前report上來的是哪個datanode的塊資訊,所以,blockReport過程實際上就是namenode在接收到塊資訊彙報後,填充BlocksMap中每個block對應的datanodes列表的三元組資訊的過程。其流程如下圖所示:
當所有的datanode彙報完block,namenode針對每個datanode的彙報進行過處理後,namenode的啟動過程到此結束。此時BlocksMap中block->datanodes的對應關係已經初始化完畢。如果此時已經達到安全模式的推出閾值,則hdfs主動退出安全模式,開始提供服務。
// If we have sent the firstblock report, then wait a random
// time before we start the periodic blockreports.
如果我們把第一塊報告發送了之後,在我們開始定期塊報告之前會等待一個隨機時間。
傳送完畢之後NameNode會給DataNode傳送一些指令,然後DataNode會處理這些指令
processCommand(cmd);
基本上也就分析完了
# DataNode啟動過程分析
DataNode一啟動,會對DataNode進行初始化,最終進入到startDataNode方法中,該方法
主要有4個重要的地方。
1:起了一個RPC客戶端,namenode,實現的介面是DataProtocol,主要是DataNode跟NameNode通訊用的,DataNode將自己的一些狀態(容量,使用,未使用等)告訴NameNode,然後NameNode返回DataNode一些指令,告訴DataNode要去做什麼。
2:dataXceiverServer的啟動,是一個執行緒組(流服務)
3:起了一個jetty伺服器,提供web方式的訪問
4:起了一個RPC的服務端,給NameNode呼叫(應該是這樣)
接著會例項化datanode和開始一個datanode守護程序(runDatanodeDaemon(dn)),然後會呼叫datanode的join方法,進入到run方法,這個方法將剛剛startDataNode方法中的RPC服務端開啟,然後呼叫了一個offerService方法,裡面是一個死迴圈,最先開始是datanode RPC遠端呼叫namenode.sendHeartbeat方法,這裡的namenode在上面已經解釋過了,其實就是DataNode,這裡會呼叫DataNode的sendHeartbeat方法,將自己的狀態作為引數(容量,空間使用了多少,剩餘多少等等)傳遞給了namenode。然後namenode呼叫方法得到一些返回值給datanode,datanode處理這些命令,然後再處理這些指令processCommand(cmds)。
接下來datanode會呼叫namenode.blockReport的方法,datanode開始掃描自己目錄下所儲存的所有檔案塊,然後通過namenode的rpc呼叫將這些block資訊以一個long陣列的方式傳送給namenode,namenode在接收到一個datanode的blockReport rpc呼叫後,從rpc中解析出block陣列,並將這些接收到的blocks插入到BlocksMap表中,由於NameNode啟動時BlocksMap缺少的僅僅是每個block對應的datanode資訊,而namenoe能從blocReport中獲知當前blockReport上來的是哪個datanode的塊資訊,所以,blockReport過程實際上就是namenode在接收到塊資訊彙報後,填充BlocksMap中每個block對應的datanodes列表的三元組資訊的過程。當所有的datanode彙報完block,namenode針對每個datanode的彙報進行過處理後,namenode的啟動過程到此結束。此時BlocksMap中block->datanodes的對應關係已經初始化完畢。如果此時已經達到安全模式的退出閾值,則hdfs主動退出安全模式,開始提供服務。
呼叫完namenode.blockReport方法之後 ,namenode會給一些指令給datanode,然後datanode再處理這些指令。
在這個過程中是datanode主動與namenode通訊,然後namenode傳給datanode一些函式的返回值,告訴datanode該做什麼。
DataProtocol中的註釋:
* The only way aNameNode can communicate with a DataNode is by
*returning values from thesefunctions.
The you smile until forever 、、、、、、、、、、、、、、、、、、、、、