1. 程式人生 > >HDFS原始碼分析資料塊校驗之DataBlockScanner

HDFS原始碼分析資料塊校驗之DataBlockScanner

        DataBlockScanner是執行在資料節點DataNode上的一個後臺執行緒。它為所有的塊池管理塊掃描。針對每個塊池,一個BlockPoolSliceScanner物件將會被建立,其執行在一個單獨的執行緒中,為該塊池掃描、校驗資料塊。當一個BPOfferService服務變成活躍或死亡狀態,該類中的blockPoolScannerMap將會更新。

        我們先看下DataBlockScanner的成員變數,如下:

  // 所屬資料節點DataNode例項
  private final DataNode datanode;
  // 所屬儲存FsDatasetSpi例項
  private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
  // 配置資訊Configuration例項
  private final Configuration conf;
  
  // 執行緒休眠週期,5s
  static final int SLEEP_PERIOD_MS = 5 * 1000;

  /**
   * Map to find the BlockPoolScanner for a given block pool id. This is updated
   * when a BPOfferService becomes alive or dies.
   * 儲存塊池ID到對應BlockPoolScanner例項的對映。
   * 當一個BPOfferService服務變成活躍或死亡狀態,blockPoolScannerMap將會隨之更新。
   */
  private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap = 
    new TreeMap<String, BlockPoolSliceScanner>();
  
  // 資料塊掃描執行緒
  Thread blockScannerThread = null;
        首先是由建構函式確定的三個成員變數:所屬資料節點DataNode例項datanode、所屬儲存FsDatasetSpi例項dataset、配置資訊Configuration例項conf,對應建構函式如下:
  // 建構函式
  DataBlockScanner(DataNode datanode,
      FsDatasetSpi<? extends FsVolumeSpi> dataset,
      Configuration conf) {
    this.datanode = datanode;
    this.dataset = dataset;
    this.conf = conf;
  }
        然後設定了一個靜態變數,5s的執行緒休眠週期,即SLEEP_PERIOD_MS,另外兩個重要的成員變數是:

       1、TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap

             儲存塊池ID到對應BlockPoolScanner例項的對映。當一個BPOfferService服務變成活躍或死亡狀態,blockPoolScannerMap將會隨之更新。

        2、Thread blockScannerThread

              資料塊掃描執行緒。

        既然DataBlockScanner實現了Runnable介面,那麼它肯定是作為一個執行緒在DataNode節點上執行的,我們看下DataNode是如何對其進行構造及啟動的,程式碼如下:

  /**
   * See {@link DataBlockScanner}
   */
  private synchronized void initDataBlockScanner(Configuration conf) {
    
	// 如果blockScanner不為null,直接返回
	if (blockScanner != null) {
      return;
    }
	
	// 資料塊校驗功能無法開啟的原因
    String reason = null;
    assert data != null;
    
    // 如果引數dfs.datanode.scan.period.hours未配置,或者配置為0,說明資料塊校驗功能已關閉
    if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
                    DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
      reason = "verification is turned off by configuration";
      
    // SimulatedFSDataset不支援資料塊校驗
    } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
      reason = "verifcation is not supported by SimulatedFSDataset";
    }
    
    // 如果資料塊校驗功能無法開啟的原因為null,構造DataBlockScanner例項,並呼叫其start()方法啟動該執行緒
    if (reason == null) {
      blockScanner = new DataBlockScanner(this, data, conf);
      blockScanner.start();
    } else {
    	
      // 否則在日誌檔案中記錄週期性資料塊校驗掃描無法啟用的原因
      LOG.info("Periodic Block Verification scan disabled because " + reason);
    }
  }
        首先,如果blockScanner不為null,直接返回,說明之前已經初始化並啟動了,然後,確定資料塊校驗功能無法開啟的原因reason:

        1、如果引數dfs.datanode.scan.period.hours未配置,或者配置為0,說明資料塊校驗功能已關閉;

        2、SimulatedFSDataset不支援資料塊校驗;

        如果資料塊校驗功能無法開啟的原因為null,構造DataBlockScanner例項,並呼叫其start()方法啟動該執行緒,否則在日誌檔案中記錄週期性資料塊校驗掃描無法啟用的原因。

        DataBlockScanner執行緒啟動的start()方法如下:

  public void start() {
	  
	// 基於DataBlockScanner例項建立一個執行緒blockScannerThread
    blockScannerThread = new Thread(this);
    // 將執行緒blockScannerThread設定為後臺執行緒
    blockScannerThread.setDaemon(true);
    // 啟動執行緒blockScannerThread
    blockScannerThread.start();
  }
        實際上它是基於DataBlockScanner例項建立一個執行緒blockScannerThread,將執行緒blockScannerThread設定為後臺執行緒,然後啟動執行緒blockScannerThread。

        DataBlockScanner執行緒已建立,並啟動,那麼我們看下它是如何工作的,接下來看下它的run()方法,程式碼如下:

  // 執行緒核心run()方法
  @Override
  public void run() {
	  
	// 當前塊池ID,預設為空
    String currentBpId = "";
    
    // 第一次執行標誌,預設當然應該為true
    boolean firstRun = true;
    
    // 如果所屬資料節點DataNode例項datanode正常執行,且當前執行緒沒有被中斷
    while (datanode.shouldRun && !Thread.interrupted()) {
      //Sleep everytime except in the first iteration.
    	
      // 如果不是第一次執行,執行緒休眠5s
      if (!firstRun) {
        try {
          Thread.sleep(SLEEP_PERIOD_MS);
        } catch (InterruptedException ex) {
          // Interrupt itself again to set the interrupt status
        	
          // 如果發生InterruptedException異常,中斷blockScannerThread執行緒,然後跳過,繼續下一輪迴圈
          blockScannerThread.interrupt();
          continue;
        }
      } else {
    	// 第一次執行時先將firstRun標誌設定為false
        firstRun = false;
      }
      
      // 獲取下一個塊池切片掃描器BlockPoolSliceScanner例項bpScanner
      BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId);
      
      // 如果bpScanner為null,跳過,繼續下一輪迴圈
      if (bpScanner == null) {
        // Possible if thread is interrupted
        continue;
      }
      
      // 設定當前塊池ID,即currentBpId,從塊池切片掃描器BlockPoolSliceScanner例項bpScanner中獲取
      currentBpId = bpScanner.getBlockPoolId();
      
      // If BPOfferService for this pool is not alive, don't process it
      // 如果當前塊池對應的心跳服務BPOfferService不是活躍的,不對它進行處理,呼叫removeBlockPool()方法從blockPoolScannerMap中移除資料,
      // 並關閉對應BlockPoolSliceScanner,然後跳過,執行下一輪迴圈
      if (!datanode.isBPServiceAlive(currentBpId)) {
        LOG.warn("Block Pool " + currentBpId + " is not alive");
        // Remove in case BP service died abruptly without proper shutdown
        removeBlockPool(currentBpId);
        continue;
      }
      
      // 呼叫塊池切片掃描器BlockPoolSliceScanner例項bpScanner的scanBlockPoolSlice()方法,
      // 掃描對應塊池裡的資料塊,進行資料塊校驗
      bpScanner.scanBlockPoolSlice();
    }

    // Call shutdown for each allocated BlockPoolSliceScanner.
    // 退出迴圈後,遍歷blockPoolScannerMap中的每個BlockPoolSliceScanner例項bpss,
    // 挨個呼叫對應shutdown()方法,停止塊池切片掃描器BlockPoolSliceScanner
    for (BlockPoolSliceScanner bpss: blockPoolScannerMap.values()) {
      bpss.shutdown();
    }
  }
        run()方法邏輯比較清晰,大體如下:

        1、首先初始化當前塊池ID,即currentBpId,預設為空,再確定第一次執行標誌firstRun,預設當然應該為true;

        2、接下來進入一個while迴圈,迴圈的條件是如果所屬資料節點DataNode例項datanode正常執行,且當前執行緒沒有被中斷:

               2.1、處理第一次執行標誌位firstRun:

                         2.1.1、如果不是第一次執行,執行緒休眠5s:即firstRun為false,這時如果發生InterruptedException異常,中斷blockScannerThread執行緒,然後跳過,繼續下一輪迴圈;

                         2.1.2、第一次執行時先將firstRun標誌設定為false;

               2.2、獲取下一個塊池切片掃描器BlockPoolSliceScanner例項bpScanner,通過呼叫getNextBPScanner()方法,傳入當前塊池ID,即currentBpId來實現,首次迴圈,currentBpId為空,後續會傳入之前處理的值,下面會對其進行更新;

               2.3、如果bpScanner為null,跳過,繼續下一輪迴圈;

               2.4、設定當前塊池ID,即currentBpId,從塊池切片掃描器BlockPoolSliceScanner例項bpScanner中獲取;

               2.5、如果當前塊池對應的心跳服務BPOfferService不是活躍的,不對它進行處理,呼叫removeBlockPool()方法從blockPoolScannerMap中移除資料,並關閉對應BlockPoolSliceScanner,然後跳過,執行下一輪迴圈;

               2.6、呼叫塊池切片掃描器BlockPoolSliceScanner例項bpScanner的scanBlockPoolSlice()方法,掃描對應塊池裡的資料塊,進行資料塊校驗;

        3、退出迴圈後,遍歷blockPoolScannerMap中的每個BlockPoolSliceScanner例項bpss,挨個呼叫對應shutdown()方法,停止塊池切片掃描器BlockPoolSliceScanner。

        我們接下來看下比較重要的getNextBPScanner()方法,程式碼如下:

  /**
   * Find next block pool id to scan. There should be only one current
   * verification log file. Find which block pool contains the current
   * verification log file and that is used as the starting block pool id. If no
   * current files are found start with first block-pool in the blockPoolSet.
   * However, if more than one current files are found, the one with latest 
   * modification time is used to find the next block pool id.
   * 尋找下一個塊池ID以進行scan。
   * 此時應該只有一個當前驗證日誌檔案。
   */
  private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
    
    String nextBpId = null;
    
    // 如果所屬資料節點DataNode例項datanode正常執行,且當前blockScannerThread執行緒沒有被中斷
    while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {
    	
      // 等待初始化
      waitForInit();
      
      synchronized (this) {
    	  
    	// 當blockPoolScannerMap大小大於0,即存在BlockPoolSliceScanner例項時,做以下處理:
        if (getBlockPoolSetSize() > 0) {          
          // Find nextBpId by the minimum of the last scan time
          // lastScanTime用於記錄上次瀏覽時間
          long lastScanTime = 0;
          
          // 遍歷blockPoolScannerMap集合,取出每個塊池ID,即bpid
          for (String bpid : blockPoolScannerMap.keySet()) {
        	  
        	// 根據塊池ID,即bpid,取出其對應BlockPoolSliceScanner例項的上次瀏覽時間t
            final long t = getBPScanner(bpid).getLastScanTime();
            
            // 如果t不為0,且如果塊池ID為null,或者t小於lastScanTime,則將t賦值給lastScanTime,bpid賦值給nextBpId
            // 也就是計算最早的上次瀏覽時間lastScanTime,和對應塊池ID,即nextBpId
            if (t != 0L) {
              if (bpid == null || t < lastScanTime) {
                lastScanTime =  t;
                nextBpId = bpid;
              }
            }
          }
          
          // nextBpId can still be null if no current log is found,
          // find nextBpId sequentially.
          
          // 如果對應塊池ID,即nextBpId為null,則取比上次處理的塊池currentBpId高的key作為nextBpId,
          // 如果還不能取出的話,那麼取第一個塊池ID,作為nextBpId
          if (nextBpId == null) {
            nextBpId = blockPoolScannerMap.higherKey(currentBpId);
            if (nextBpId == null) {
              nextBpId = blockPoolScannerMap.firstKey();
            }
          }
          
          // 如果nextBpId不為空,那麼從blockPoolScannerMap中獲取其對應BlockPoolSliceScanner例項返回
          if (nextBpId != null) {
            return getBPScanner(nextBpId);
          }
        }
      }
      
      // 記錄warn日誌,No block pool is up, going to wait,然後等待
      LOG.warn("No block pool is up, going to wait");
      
      try {
    	// 執行緒休眠5s
        Thread.sleep(5000);
      } catch (InterruptedException ex) {
        LOG.warn("Received exception: " + ex);
        blockScannerThread.interrupt();
        return null;
      }
    }
    return null;
  }
        它的主要作用就是尋找下一個塊池ID以進行scan,其存在一個整體的while迴圈,迴圈的條件為如果所屬資料節點DataNode例項datanode正常執行,且當前blockScannerThread執行緒沒有被中斷,迴圈內做以下處理:

        1、呼叫waitForInit()方法等待初始化;

        2、當前物件上使用synchronized進行同步,當blockPoolScannerMap大小大於0,即存在BlockPoolSliceScanner例項時,做以下處理:

               2.1、設定lastScanTime用於記錄上次瀏覽時間,預設值為0;

               2.2、遍歷blockPoolScannerMap集合,取出每個塊池ID,即bpid,計算最早的上次瀏覽時間lastScanTime,和對應塊池ID,即nextBpId:

                        2.2.1、根據塊池ID,即bpid,取出其對應BlockPoolSliceScanner例項的上次瀏覽時間t;

                        2.2.2、如果t不為0,且如果塊池ID為null,或者t小於lastScanTime,則將t賦值給lastScanTime,bpid賦值給nextBpId,也就是計算最早的上次瀏覽時間lastScanTime,和對應塊池ID,即nextBpId;

               2.3、如果對應塊池ID,即nextBpId為null,則取比上次處理的塊池currentBpId高的key作為nextBpId,如果還不能取出的話,那麼取第一個塊池ID,作為nextBpId;

               2.4、如果nextBpId不為空,那麼從blockPoolScannerMap中獲取其對應BlockPoolSliceScanner例項返回;

        3、如果blockPoolScannerMap大小等於0,或者上述2找不到的話,記錄warn日誌,No block pool is up, going to wait,然後等待5s後繼續下一輪迴圈;

        最後,實在找不到就返回null。

        可見,getNextBPScanner()方法優先選取最早處理過的塊池,找不到的話再按照之前處理過的塊池ID增長的順序,找下一個塊池ID,按照塊池ID大小順序到尾部的話,再折回取第一個。

        其中等待初始化的waitForInit()方法比較簡單,程式碼如下:

  // Wait for at least one block pool to be up
  private void waitForInit() {
	  
	// 如果BlockPoolSliceScanner的個數小於資料節點所有BpOS個數,或者BlockPoolSliceScanner的個數小於1,一直等待
	// BpOS你可以理解為DataNode上每個塊池或名稱空間對應的一個例項,它處理該名稱空間到對應活躍或備份狀態NameNode的心跳。
    while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
        || (getBlockPoolSetSize() < 1)) {
      try {
    	  
    	// 執行緒休眠5s
        Thread.sleep(SLEEP_PERIOD_MS);
      } catch (InterruptedException e) {
    	  
    	// 如果發生InterruptedException異常,中斷blockScannerThread執行緒,然後返回
        blockScannerThread.interrupt();
        return;
      }
    }
  }
        它本質上是等所有塊池都被上報至blockPoolScannerMap集合後,才認為已完成初始化,然後再挑選塊池ID,否則執行緒休眠5s,繼續等待。程式碼註釋比較詳細,這裡不再贅述!

        獲取到塊池ID,並獲取到其對應的塊池切片掃描器BlockPoolSliceScanner例項bpScanner了,接下來就是呼叫bpScanner的scanBlockPoolSlice()方法,掃描該塊池的資料塊,並做資料塊校驗工作了。這方面的內容,請閱讀《HDFS原始碼分析資料塊校驗之BlockPoolSliceScanner》一文,這裡不再做介紹。

        到了這裡,各位看官可能有個疑問,選取塊池所依賴的blockPoolScannerMap集合中的資料是哪裡來的呢?答案就在處理資料節點心跳的BPServiceActor執行緒中,在完成資料塊彙報、處理來自名位元組點NameNode的相關命令等操作後,有如下程式碼被執行:

        // Now safe to start scanning the block pool.
        // If it has already been started, this is a no-op.
        // 現在可以安全地掃描塊池,如果它已經啟動,這是一個空操作。
        if (dn.blockScanner != null) {
          dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
        }
        很簡單,資料節點彙報資料塊給名位元組點,並執行來自名位元組點的相關命令後,就可以通過資料節點DataNode中成員變數blockScanner的addBlockPool()方法,新增塊池,程式碼如下:
  public synchronized void addBlockPool(String blockPoolId) {
    
	// 如果blockPoolScannerMap集合中存在塊池blockPoolId,直接返回
	if (blockPoolScannerMap.get(blockPoolId) != null) {
      return;
    }
	
	// 根據塊池blockPoolId、資料節點datanode、儲存dataset、配置資訊conf等構造BlockPoolSliceScanner例項bpScanner
    BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
        datanode, dataset, conf);
    
    // 將塊池blockPoolId與bpScanner的對映關係儲存到blockPoolScannerMap中
    blockPoolScannerMap.put(blockPoolId, bpScanner);
    
    // 記錄日誌資訊
    LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
        + blockPoolScannerMap.size());
  }
        邏輯很簡單,首先需要看看blockPoolScannerMap集合中是否存在塊池blockPoolId,存在即返回,否則根據塊池blockPoolId、資料節點datanode、儲存dataset、配置資訊conf等構造BlockPoolSliceScanner例項bpScanner,將塊池blockPoolId與bpScanner的對映關係儲存到blockPoolScannerMap中,最後記錄日誌資訊。

        我們在上面也提到了如果當前塊池對應的心跳服務BPOfferService不是活躍的,那麼會呼叫removeBlockPool()方法,移除對應的塊池,程式碼如下:

  public synchronized void removeBlockPool(String blockPoolId) {
	  
	// 根據塊池blockPoolId,從blockPoolScannerMap中移除資料,並得到對應BlockPoolSliceScanner例項bpss
    BlockPoolSliceScanner bpss = blockPoolScannerMap.remove(blockPoolId);
    
    // 呼叫bpss的shutdown()方法,關閉bpss
    if (bpss != null) {
      bpss.shutdown();
    }
    
    // 記錄日誌資訊
    LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
  }
        程式碼很簡單,不再贅述。

        總結

        DataBlockScanner是執行在資料節點DataNode上的一個後臺執行緒,它負責管理所有塊池的資料塊掃描工作。當資料節點DataNode傳送心跳給名位元組點NameNode進行資料塊彙報並執行完返回的命令時,會在DataBlockScanner的內部集合blockPoolScannerMap中註冊塊池ID與為此新建立的BlockPoolSliceScanner物件的關係,然後DataBlockScanner內部執行緒blockScannerThread週期性的挑選塊池currentBpId,並獲取塊池切片掃描器BlockPoolSliceScanner例項bpScanner,繼而呼叫其scanBlockPoolSlice()方法,掃描對應塊池裡的資料塊,進行資料塊校驗。塊池選擇的主要依據就是優先選擇掃描時間最早的,也就是自上次掃描以來最長時間沒有進行掃描的,按照這一依據選擇不成功的話,則預設按照塊池ID遞增的順序迴圈選取塊池。