1. 程式人生 > >Hadoop原始碼:namenode格式化和啟動過程實現

Hadoop原始碼:namenode格式化和啟動過程實現

基於原始碼原始碼分析hadoop namenode格式化和啟動過程實現 (According to the source code analysis hadoop namenode formatting and startup process implementation.)

Namenode 管理hdfs元資料和RPC服務響應客戶端,初次使用時需格式化;元資料儲存在edits和fsimage檔案,其中 fsimage 儲存最新的元資料資訊,edits 儲存自最新的元資料資訊後的變化;edits 儲存的資訊會在下一次啟動 namenode 時與 fsimage 合併產生新的 fsimage 檔案。我們先以1.0版本原始碼為例介紹,後期再補充新版本的擴充套件和改變,由於篇幅原因,部分程式碼被去除。

1 namenode 格式化

初次啟動namenode時,hadoop要求必須格式化namenode;

hadoop namenode -format

格式化會刪除namenode image和edits檔案,再建立新的 image 和 edits;

public static void format(File dir, Configuration conf)
  throws IOException {
    File image = new File(dir, "image");
    File edits = new File(dir, "edits");

    if (!((!image.exists() || FileUtil.fullyDelete(image, conf)) &&
          (!edits.exists() || edits.delete()) &&image.mkdirs())) {

      throw new IOException("Unable to format: "+dir);
    }
}

所以已經生產使用的叢集,謹慎使用。

2 namenode初始化和啟動

Configuration conf = new Configuration();
NameNode namenode = new NameNode(conf);
namenode.join();

namenode初始化和啟動過程中四個重要的操作:

1、根據(fs.default.name(舊版本引數)/fs.defaultFS(新版本引數))配置的主機名和埠號建立套接字地址,這個地址就是 namenode 檔案系統元資料地址; 2、載入 namenode 元資料;首先將現有的 image 資料以層級方式載入到記憶體列表,載入完成後存放到活動列表中;

TreeSet activeBlocks = new TreeSet();
DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
try {
    int numFiles = in.readInt();
    for (int i = 0; i < numFiles; i++) {
        UTF8 name = new UTF8();
        name.readFields(in);
        int numBlocks = in.readInt();
        if (numBlocks == 0) {
            unprotectedAddFile(name, null);
        } else {
            Block blocks[] = new Block[numBlocks];
            for (int j = 0; j < numBlocks; j++) {
                blocks[j] = new Block();
                blocks[j].readFields(in);
            }
            unprotectedAddFile(name, blocks);
        }
    }
} finally {
    in.close();
}

對於儲存了大量資料的叢集,元資料載入到記憶體會比較耗時,總的元資料數 / 載入完成後存放到活動列表 的數量 * 100% 就是我們在前端介面看到的namenode啟動進度。image 載入完成後,繼續載入最新的 edits ,並將 edits 合併到 image 產生新的 image 檔案,並建立一個新的 edits 檔案記錄系統執行時的變化。

public FSDirectory(File dir) throws IOException {
    File fullimage = new File(dir, "image");
    if (! fullimage.exists()) {
      throw new IOException("NameNode not formatted: " + dir);
    }
    File edits = new File(dir, "edits");
    if (loadFSImage(fullimage, edits)) {
        saveFSImage(fullimage, edits);
    }

    synchronized (this) {
        this.ready = true;
        this.notifyAll();
        this.editlog = new DataOutputStream(new FileOutputStream(edits));
    }
}

3、建立datanode節點和namenode心跳監控、建立租憑管理監控(可以理解為鎖管理,比如釋放鎖);

this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(new LeaseMonitor());
hbthread.start();
lmthread.start();

namenode通過間隔的檢查與datanode心跳來判斷datanode的存活;租憑管理監控通俗的理解就是當某個操作(比如:寫資料)超出namenode規定的最大操作時間時,這個租憑管理監控程序就會認為該操作過期了,將它釋放清除。

while ((sortedLeases.size() > 0) &&((top = (Lease) sortedLeases.first()) != null)) {
    if (top.expired()) {
        top.releaseLocks();
        leases.remove(top.holder);
        LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size());
        if (!sortedLeases.remove(top)) {
            LOG.info("Unknown failure trying to remove " + top + " from lease set.");
        }
    } else {
        break;
    }
}

4、為namenode建立RPC伺服器(執行緒池),這個執行緒池的作用是用來處理客戶端的遠端過程呼叫及叢集守護程序的呼叫

public NameNode(Configuration conf) throws IOException {
    this(getDir(conf),DataNode.createSocketAddr(conf.get("fs.default.name", "local")).getPort(), conf);
}

public NameNode(File dir, int port, Configuration conf) throws IOException {
    this.namesystem = new FSNamesystem(dir, conf);
    this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
    this.server = RPC.getServer(this, port, handlerCount, false, conf);
    this.server.start();
}

通過dfs.namenode.handler.count 引數設定執行緒池大小,預設是10;

public synchronized void start() throws IOException {
  Listener listener = new Listener();
  listener.start();

  for (int i = 0; i < handlerCount; i++) {
    Handler handler = new Handler(i);
    handler.start();
  }
}

在生產環境中,該引數的設定需要多方面考慮和商榷,對於大叢集,來自不同DataNode的併發心跳以及客戶端併發的元資料操作可能巨大,如果執行緒池太小,總是忙碌狀態,客戶端連線NameNode的時候總是超時或者連線被拒絕,這意味著需要更大的池來處理;執行緒池太大,會引發呼叫延遲和增加主機負載等問題;

this.server = RPC.getServer(this, port, handlerCount, false, conf);
this.server.start();

到此RPC伺服器(執行緒池)構建並啟動,整個namenode啟動完成,RPC伺服器(執行緒池)進入監聽響應狀態;

210116 222524 Server listener on port 9000: starting
210116 222524 Server handler 0 on 9000: starting
210116 222524 Server handler 1 on 9000: starting
210116 222524 Server handler 2 on 9000: starting
210116 222524 Server handler 3 on 9000: starting
210116 222524 Server handler 4 on 9000: starting
210116 222524 Server handler 5 on 9000: starting
210116 222524 Server handler 6 on 9000: starting
210116 222524 Server handler 7 on 9000: starting
210116 222524 Server handler 8 on 9000: starting
210116 222524 Server handler 9 on 9000: starting

3 總結

Namenode 格式化就是刪除已經存在的edits和fsimage檔案,建立新的;Namenode啟動時建立套接字地址、載入 namenode 元資料、建立監控程序(datanode節點和namenode心跳監控、租憑管理監控)、建立RPC伺服器(執行緒池)。

參考文獻

  • https://hadoop.apache.org/docs/r1.0.4 - hadoop documenation