1. 程式人生 > >_00005 Hadoop DataNode原始碼淺析(RPC是基礎)

_00005 Hadoop DataNode原始碼淺析(RPC是基礎)

博文作者:妳那伊抹微笑
個性簽名:世界上最遙遠的距離不是天涯,也不是海角,而是我站在妳的面前,妳卻感覺不到我的存在
技術方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 雲端計算技術
轉載宣告:
qq交流群:


# DataNode原始碼分析

# datanode註釋翻譯

/**********************************************************

 * DataNode is a class (and program)that stores a set of

 * blocks for a DFS deployment.  A single deployment can

 * have one or many DataNodes.  Each DataNode communicates

 * regularly with a singleNameNode.  It also communicates

 * with client code and otherDataNodes from time to time.

 *

 * DataNodes store a series of namedblocks.  The DataNode

 * allows client code to read theseblocks, or to write new

 * block data.  The DataNode may also, in response toinstructions

 * from its NameNode, delete blocksor copy blocks to/from other

 * DataNodes.

 *

 * The DataNode maintains just onecritical table:

 *  block-> stream of bytes (of BLOCK_SIZE or less)

 *

 * This info is stored on a localdisk.  The DataNode

 * reports the table's contents tothe NameNode upon startup

 * and every so often afterwards.

 *

 * DataNodes spend their lives in anendless loop of asking

 * the NameNode for something todo.  A NameNode cannot connect

 * to a DataNode directly; aNameNode simply returns values from

 * functions invoked by a DataNode.

 *

 * DataNodes maintain an open serversocket so that client code

 * or other DataNodes can read/writedata.  The host/port for

 * this server is reported to theNameNode, which then sends that

 * information to clients or otherDataNodes that might be interested.

 *

 **********************************************************/

# 首先看datanode結構,實現了Runnable介面(run方法)。

public class DataNode extendsConfigured

    implements InterDatanodeProtocol,ClientDatanodeProtocol, FSConstants, Runnable, DataNodeMXBean {

# 找到main方法,進入

public static void main(String args[]) {

    secureMain(args, null);

  }

# 進入scureMain

public static void secureMain(String [] args, SecureResourcesresources) {

    try {

      StringUtils.startupShutdownMessage(DataNode.class, args,LOG);

      DataNode datanode = createDataNode(args,null, resources);

      if (datanode !=null)

        datanode.join();

建立datanode跟呼叫datanode.join()(Java Threadjoin()方法主要是讓呼叫改方法的thread完成run方法裡面的東西后,在執行join()方法後面的程式碼。)

# 進入createDataNode

/** Instantiate & Start a single datanode daemon and wait for it tofinish.

   * If this thread is specifically interrupted, it will stop waiting.

   * LimitedPrivate for creating secure datanodes

   */

  public static DataNode createDataNode(Stringargs[],

            Configuration conf,SecureResources resources) throws IOException {

    DataNode dn = instantiateDataNode(args,conf, resources);

    runDatanodeDaemon(dn);

    return dn;

  }

註釋說:例項化和開始一個datanode守護程序(runDatanodeDaemon(dn)),等待它完成。如果專門打斷這個執行緒,它將停止等待。建立安全datanodes LimitedPrivate。

# startDataNode方法,一直跟進去(省略中間的程式碼)最後進入到這麼一個主要的方法,該方法程式碼很多

void startDataNode(Configuration conf,

                    AbstractList<File> dataDirs, SecureResources resources

                     ) throws IOException {

// connect to name node

    this.namenode = (DatanodeProtocol)

      RPC.waitForProxy(DatanodeProtocol.class,

                      DatanodeProtocol.versionID,

                       nameNodeAddr,

                       conf);

在datanode中起了一個RPC的客戶端,得到一個服務端的代理物件,這裡被強轉為DatanodeProtocol,實際上就是NameNode這個類,因為NameNode類實現了DatanodeProtocol介面,然後就可以呼叫NameNode裡面的方法了

第二個重要的地方:

this.threadGroup =new ThreadGroup("dataXceiverServer");

    this.dataXceiverServer =new Daemon(threadGroup,

        new DataXceiverServer(ss, conf,this));

下面我們可以來開始分析DataNode上的動態行為。首先我們來分析DataXceiverServer和DataXceiver。DataNode上資料塊的接受/傳送並沒有採用我們前面介紹的RPC機制,原因很簡單,RPC是一個命令式的介面,而DataNode處理資料部分,往往是一種流式機制。DataXceiverServer和DataXceiver就是這個機制的實現。其中,DataXceiver還依賴於兩個輔助類:BlockSender和BlockReceiver。  DataXceiverServer很簡單,它開啟一個埠,然後每接收到一個連線,就建立一個DataXceiver,服務於該連線,DataXceiver是一個執行緒讀一次操作請求進行操作之後就返回,並記錄該連線的socket,對應的實現在DataXceiverServer的run方法裡。當系統關閉時,DataXceiverServer將關閉監聽的socket和所有DataXceiver的socket,這樣就導致了DataXceiver出錯並結束執行緒。DataXceiverServer接受到的資料主要有操作碼+操作資料+使用者名稱。  (1)BlockSender用來發送block資料,返回給使用者的是:成功與否+校驗型別+實際offset(因為校驗塊的原因和使用者請求的offset不一致)。BlockSender有配置引數corruptChecksumOk(校驗資料讀入出錯忽略,出錯用零填充),chunkOffsetOK(是否要告知實際的offset,如上所述),verifyChecksum(是否要求在把校驗資料和實際資料讀入包快取中時校驗資料,也就是在傳送之前),向客戶端傳包的時候第一、二個引數為true,第三為false,為的是儘快傳送資料。而用來校驗已有資料時使用第一二引數為false,第三引數為true,為了及時發現錯誤資料。readBlock完成實際讀資料的操作,比較簡單。sendChunks方法中,對於客戶端傳包的包只有校驗和而實際資料通過管道傳輸,具體見函式。

第三個重要的地方:

//create a servlet to serve full-file content

    InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);

    String infoHost =infoSocAddr.getHostName();

    int tmpInfoPort = infoSocAddr.getPort();

    this.infoServer = (secureResources ==null)

       ? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,

           conf, SecurityUtil.getAdminAcls(conf,DFSConfigKeys.DFS_ADMIN))

       : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,

           conf, SecurityUtil.getAdminAcls(conf,DFSConfigKeys.DFS_ADMIN),

           secureResources.getListener());

這裡new了一個infoServer,new HttpServer裡面是一個jetty的server,就是為了向用戶提供web介面的,然後再後面再啟動infoServer。

第四個重要的地方:

//init ipc server

    InetSocketAddress ipcAddr =NetUtils.createSocketAddr(

        conf.get("dfs.datanode.ipc.address"));

    ipcServer = RPC.getServer(this,ipcAddr.getHostName(), ipcAddr.getPort(),

        conf.getInt("dfs.datanode.handler.count", 3),false, conf,

        blockTokenSecretManager);

這裡datanode中起了一個RPC的服務端(暫時不知給誰呼叫的)

# 接下來就會呼叫dtanode.join()方法,datanode的run方法

/**

   * No matter what kind ofexception we get, keep retrying to offerService().

   * That's the loop that connectsto the NameNode and provides basic DataNode

   * functionality.

   *

   * Only stop when"shouldRun" is turned off (which can only happen at shutdown).

   */

  public void run() {

LOG.info(dnRegistration +"InDataNode.run, data = " +data);

    // start dataXceiveServer

    dataXceiverServer.start();

    ipcServer.start();

    while (shouldRun) {

      try {

        startDistributedUpgradeIfNeeded();

        offerService();

      } catch (Exception ex) {

        LOG.error("Exception: " + StringUtils.stringifyException(ex));

        if (shouldRun) {

          try {

            Thread.sleep(5000);

          } catch (InterruptedExceptionie) {

          }

        }

      }

    }

    LOG.info(dnRegistration +":Finishing DataNode in: "+data);

    shutdown();

  }

# 我們主要看offerService();這個方法

/**

   * Main loop for theDataNode.  Runs until shutdown,

   * forever calling remote NameNodefunctions.

   */

  public void offerService() throws Exception {

裡面有一個死迴圈,主要是呼叫了一個方法sendHeartbeat

DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,

                                                      data.getCapacity(),

                                                       data.getDfsUsed(),

                                                      data.getRemaining(),

                                                      xmitsInProgress.get(),

                                                       getXceiverCount());

          myMetrics.addHeartBeat(now() - startTime);

          //LOG.info("Just sent heartbeat, with name " + localName);

          if(!processCommand(cmds))

            continue;

這裡的namenode在上面已經解釋過了,其實就是DataNode,這裡會呼叫DataNode的sendHeartbeat方法,將自己的狀態作為引數(容量,空間使用了多少,剩餘多少等等)傳遞給了namenode。然後namenode呼叫方法得到一些返回值給datanode,datanode處理這些命令,然後再處理這些指令processCommand(cmds),放送心跳基本分析完畢。

# 接下來看傳送心跳以及處理的namenode的指令後datanode還幹了些什麼

DatanodeCommand cmd= namenode.blockReport(dnRegistration,

                   BlockListAsLongs.convertToArrayLongs(bReport));

看到這裡了,namenode是DatanodeProtocol介面也就是NameNode類。這裡是RPC客戶端的遠端呼叫,datanode會掃描其機器上對應儲存hdfs block的目錄下(dfs.data.dir)所儲存的所有檔案塊,然後通過namenode的rpc呼叫將這些block的資訊以一個long陣列的方式傳送給namenode,namenode在接收到一個datanode的blockReport rpc呼叫後,從rpc中解析出block陣列,並將這些接收到的blocks插入到BlocksMap表中,由於此時BlocksMap缺少的僅僅是每個block對應的datanode資訊,而namenoe能從report中獲知當前report上來的是哪個datanode的塊資訊,所以,blockReport過程實際上就是namenode在接收到塊資訊彙報後,填充BlocksMap中每個block對應的datanodes列表的三元組資訊的過程。其流程如下圖所示:

當所有的datanode彙報完block,namenode針對每個datanode的彙報進行過處理後,namenode的啟動過程到此結束。此時BlocksMap中block->datanodes的對應關係已經初始化完畢。如果此時已經達到安全模式的推出閾值,則hdfs主動退出安全模式,開始提供服務。

// If we have sent the firstblock report, then wait a random

// time before we start the periodic blockreports.

如果我們把第一塊報告發送了之後,在我們開始定期塊報告之前會等待一個隨機時間。

傳送完畢之後NameNode會給DataNode傳送一些指令,然後DataNode會處理這些指令

processCommand(cmd);

基本上也就分析完了

# DataNode啟動過程分析

DataNode一啟動,會對DataNode進行初始化,最終進入到startDataNode方法中,該方法

主要有4個重要的地方。

1起了一個RPC客戶端,namenode,實現的介面是DataProtocol,主要是DataNode跟NameNode通訊用的,DataNode將自己的一些狀態(容量,使用,未使用等)告訴NameNode,然後NameNode返回DataNode一些指令,告訴DataNode要去做什麼。

2dataXceiverServer的啟動,是一個執行緒組(流服務)

3起了一個jetty伺服器,提供web方式的訪問

4起了一個RPC的服務端,給NameNode呼叫(應該是這樣)

接著會例項化datanode和開始一個datanode守護程序(runDatanodeDaemon(dn)),然後會呼叫datanode的join方法,進入到run方法,這個方法將剛剛startDataNode方法中的RPC服務端開啟,然後呼叫了一個offerService方法,裡面是一個死迴圈,最先開始是datanode RPC遠端呼叫namenode.sendHeartbeat方法,這裡的namenode在上面已經解釋過了,其實就是DataNode,這裡會呼叫DataNode的sendHeartbeat方法,將自己的狀態作為引數(容量,空間使用了多少,剩餘多少等等)傳遞給了namenode。然後namenode呼叫方法得到一些返回值給datanode,datanode處理這些命令,然後再處理這些指令processCommand(cmds)。

接下來datanode會呼叫namenode.blockReport的方法,datanode開始掃描自己目錄下所儲存的所有檔案塊,然後通過namenode的rpc呼叫將這些block資訊以一個long陣列的方式傳送給namenode,namenode在接收到一個datanode的blockReport rpc呼叫後,從rpc中解析出block陣列,並將這些接收到的blocks插入到BlocksMap表中,由於NameNode啟動時BlocksMap缺少的僅僅是每個block對應的datanode資訊,而namenoe能從blocReport中獲知當前blockReport上來的是哪個datanode的塊資訊,所以,blockReport過程實際上就是namenode在接收到塊資訊彙報後,填充BlocksMap中每個block對應的datanodes列表的三元組資訊的過程。當所有的datanode彙報完block,namenode針對每個datanode的彙報進行過處理後,namenode的啟動過程到此結束。此時BlocksMap中block->datanodes的對應關係已經初始化完畢。如果此時已經達到安全模式的退出閾值,則hdfs主動退出安全模式,開始提供服務。

呼叫完namenode.blockReport方法之後 ,namenode會給一些指令給datanode,然後datanode再處理這些指令。

在這個過程中是datanode主動與namenode通訊,然後namenode傳給datanode一些函式的返回值,告訴datanode該做什麼。

DataProtocol中的註釋:

* The only way aNameNode can communicate with a DataNode is by

 *returning values from thesefunctions.

The you smile until forever 、、、、、、、、、、、、、、、、、、、、、