1. 程式人生 > 其它 >hbase原始碼系列(七)Snapshot的過程

hbase原始碼系列(七)Snapshot的過程

  在看這一章之前,建議大家先去看一下snapshot的使用。可能有人會有疑問為什麼要做Snapshot,hdfs不是自帶了3個備份嗎,這是個很大的誤區,要知道hdfs的3個備份是用於防止網路傳輸中的失敗或者別的異常情況導致資料塊丟失或者不正確,它不能避免人為的刪除資料導致的後果。它就想是給資料庫做備份,尤其是做刪除動作之前,不管是hbase還是hdfs,請經常做Snapshot,否則哪天手賤了。。。

  直接進入主題吧,上程式碼。

public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
    // 清空之前完成的備份和恢復的任務
    cleanupSentinels();

    // 設定snapshot的版本
    snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
        .build();

    // if the table is enabled, then have the RS run actually the snapshot work
    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
    AssignmentManager assignmentMgr = master.getAssignmentManager();
    //根據表的狀態選擇snapshot的型別
    if (assignmentMgr.getZKTable().isEnabledTable(snapshotTable)) {
      snapshotEnabledTable(snapshot);
    }
    // 被禁用的表走這個方法
    else if (assignmentMgr.getZKTable().isDisabledTable(snapshotTable)) {
      snapshotDisabledTable(snapshot);
    } else {
      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
    }
  }

  從程式碼上看得出來,啟用的表和被禁用的表走的是兩個不同的方法。

Snapshot啟用的表

  先看snapshotEnabledTable方法吧,看看線上的表是怎麼備份的。

private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
      throws HBaseSnapshotException {
    // snapshot準備工作
    prepareToTakeSnapshot(snapshot);
    
    // new一個handler
    EnabledTableSnapshotHandler handler =
        new EnabledTableSnapshotHandler(snapshot, master, this);
    //通過handler執行緒來備份
    snapshotTable(snapshot, handler);
 }

  這裡就兩步,先去看看snapshot前的準備工作吧,F3進入prepareToTakeSnapshot方法。這個方法裡面也沒幹啥,就是檢查一下是否可以對這個表做備份或者恢復的操作,然後就會重建這個工作目錄,這個工作目錄在.hbase-snapshot/.tmps下面,每個snapshot都有自己的目錄。

  在snapshotTable裡面把就執行緒提交一下,讓handler來處理了。

handler.prepare();
this.executorService.submit(handler);
this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);

  這些都不是重點,咱到handler那邊去看看吧,EnabledTableSnapshotHandler是繼承TakeSnapshotHandler的,prepare方法和process方法都一樣,區別在於snapshotRegions方法被重寫了。

看prepare方法還是檢查表的定義檔案在不在,我們直接進入process方法吧。

      // 把snapshot的資訊寫入到工作目錄
      SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, this.fs);
      // 開一個執行緒去複製表資訊檔案
      new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
      monitor.rethrowException();
      //查詢該表相關的region和位置
      List<Pair<HRegionInfo, ServerName>> regionsAndLocations =
          MetaReader.getTableRegionsAndLocations(this.server.getCatalogTracker(),
              snapshotTable, false);

      // 開始snapshot
      snapshotRegions(regionsAndLocations);

      // 獲取serverNames列表,後面的校驗snapshot用到
      Set<String> serverNames = new HashSet<String>();
      for (Pair<HRegionInfo, ServerName> p : regionsAndLocations) {
        if (p != null && p.getFirst() != null && p.getSecond() != null) {
          HRegionInfo hri = p.getFirst();
          if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
          serverNames.add(p.getSecond().toString());
        }
      }

      // 檢查snapshot是否合格
      status.setStatus("Verifying snapshot: " + snapshot.getName());
      verifier.verifySnapshot(this.workingDir, serverNames);
// 備份完畢之後,把臨時目錄轉移到正式的目錄
      completeSnapshot(this.snapshotDir, this.workingDir, this.fs);

  1、寫一個.snapshotinfo檔案到工作目錄下

  2、把表的定義資訊寫一份到工作目錄下,即.tabledesc檔案

  3、查詢和表相關的Region Server和機器

  4、開始備份

  5、檢驗snapshot的結果

  6、確認沒問題了,就把臨時目錄rename到正式目錄

  我們直接到備份這一步去看吧,方法在EnabledTableSnapshotHandler裡面,重寫了。

  // 用分散式事務來備份線上的,太強悍了
    Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
      this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
    try {
      // 等待完成
      proc.waitForCompleted();
    // 備份split過的region
      Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
      for (Pair<HRegionInfo, ServerName> region : regions) {
        HRegionInfo regionInfo = region.getFirst();
        if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) {
          if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) {
            LOG.info("Take disabled snapshot of offline region=" + regionInfo);
            snapshotDisabledRegion(regionInfo);
          }
        }
      }

  這裡用到一個分散式事務,這裡被我叫做分散式事務,我也不知道它是不是事務,但是Procedure這個詞我真的不好翻譯,叫過程也不合適。

分散式事務

  我們進入ProcedureCoordinator的startProcedure看看吧。

Procedure proc = createProcedure(fed, procName, procArgs,expectedMembers);
if (!this.submitProcedure(proc)) {
      LOG.error("Failed to submit procedure '" + procName + "'");
      return null;
}

  先建立Procedure,然後提交它,這塊沒什麼特別的,繼續深入進去submitProcedure方法也找不到什麼有用的資訊,我們得回到Procedure類裡面去,它是一個Callable的類,奧祕就在call方法裡面。

  final public Void call() {
    try {
      //在acquired節點下面建立例項節點
      sendGlobalBarrierStart();

      // 等待所有的rs回覆
      waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");

      //在reached節點下面建立例項節點
      sendGlobalBarrierReached();

      //等待所有的rs回覆
      waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");

    } finally {
      sendGlobalBarrierComplete();
      completedLatch.countDown();
    }
}

   從sendGlobalBarrierStart開始看吧,裡面就一句話。

  coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));

   再追殺下去。

final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
      throws IOException, IllegalArgumentException {
    String procName = proc.getName();
    // 獲取abort節點的名稱
    String abortNode = zkProc.getAbortZNode(procName);
    try {
      // 如果存在abort節點,就廣播錯誤,中斷該過程
      if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
        abort(abortNode);
      }
    } catch (KeeperException e) {throw new IOException("Failed while watching abort node:" + abortNode, e);
    }

    // 獲得acquire節點名稱
    String acquire = zkProc.getAcquiredBarrierNode(procName);
  try {
      // 建立acquire節點,info資訊是Snapshot的資訊,包括表名
      byte[] data = ProtobufUtil.prependPBMagic(info); 
      ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
    // 監控acquire下面的節點,發現指定的節點,就報告給coordinator
      for (String node : nodeNames) {
        String znode = ZKUtil.joinZNode(acquire, node);if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
          coordinator.memberAcquiredBarrier(procName, node);
        }
      }
    } catch (KeeperException e) {
      throw new IOException("Failed while creating acquire node:" + acquire, e);
    }
  }

   1、首先是檢查abortNode ,什麼是abortNode ?每個procName在zk下面都有一個對應的節點,比如snapshot,然後在procName下面又分了acquired、reached、abort三個節點。檢查abort節點下面有沒有當前的例項。

  2、在acquired節點為該例項建立節點,建立例項節點的時候,把SnapshotDescription的資訊(在EnabledTableSnapshotHandler類裡面通過this.snapshot.toByteArray()傳進去的)放了進去,建立完成之後,在該例項節點下面監控各個Region Server的節點。如果發現已經有了,就更新Procedure中的acquiringMembers列表和inBarrierMembers,把節點從

acquiringMembers中刪除,然後新增到inBarrierMembers列表當中。

  3、到這一步服務端的工作就停下來了,等到所有RS接收到指令之後通過例項節點當中儲存的表資訊找到相應的region建立子過程,子過程在acquired節點下建立節點。

  4、收到所有RS的回覆之後,它才會開始在reached節點建立例項節點,然後繼續等待。

  5、RS完成任務之後,在reached的例項節點下面建立相應的節點,然後回覆。

  6、在確定所有的RS都完成工作之後,清理zk當中的相應proName節點。

  注意:在這個過程當中,有任務的錯誤,都會在abort節點下面建立該例項的節點,RS上面的子過程一旦發現abort存在該節點的例項,就會取消該過程。

  Snapshot這塊在Region Server是由RegionServerSnapshotManager類裡面的ProcedureMemberRpcs負責監測snapshot下面的節點變化,當發現acquired下面有例項之後,啟動新任務。

public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
      throws KeeperException {
    this.zkController = new ZKProcedureUtil(watcher, procType) {
      @Override
      public void nodeCreated(String path) {
        if (!isInProcedurePath(path)) {
          return;
        }

        String parent = ZKUtil.getParent(path);
        // if its the end barrier, the procedure can be completed
        if (isReachedNode(parent)) {
          receivedReachedGlobalBarrier(path);
          return;
        } else if (isAbortNode(parent)) {
          abort(path);
          return;
        } else if (isAcquiredNode(parent)) {
          startNewSubprocedure(path);
        } else {
          LOG.debug("Ignoring created notification for node:" + path);
        }
      }

     };
  }

  這塊摺疊起來,不是咱們的重點,讓大家看看而已。我們直接進入Subprocedure這個類裡面看看吧。

final public Void call() {
    
    try {
      // 目前是什麼也沒幹
      acquireBarrier();
      // 在acquired的例項節點下面建立rs的節點
      rpcs.sendMemberAcquired(this);
      // 等待reached的例項節點的建立
      waitForReachedGlobalBarrier();
     // 幹活
      insideBarrier();
      // 活幹完了
      rpcs.sendMemberCompleted(this);
     } catch (Exception e) {
     } finally {
          releasedLocalBarrier.countDown();
     }
}

  insideBarrier的實現在FlushSnapshotSubprocedure這個類裡面,呼叫了flushSnapshot(),這個方法給每個region都開一個執行緒去提交。

for (HRegion region : regions) {
      taskManager.submitTask(new RegionSnapshotTask(region));
 }

Snapshot線上的region

  我們接下來看看RegionSnapshotTask的call方法

public Void call() throws Exception {
      // 上鎖,暫時不讓讀了
      region.startRegionOperation();
      try {
        region.flushcache();
        region.addRegionToSnapshot(snapshot, monitor);
      } finally {
        LOG.debug("Closing region operation on " + region);
        region.closeRegionOperation();
      }
      return null;
    }
}

  在對region操作之前,先上鎖,不讓讀了。然後就flushCache,這個方法很大,也好難懂哦,不過我們還是要迎接困難上,我摺疊起來吧,想看的就看,不想看的就看我下面的寫的步驟吧。

   MultiVersionConsistencyControl.WriteEntry w = null;
    this.updatesLock.writeLock().lock();
    long flushsize = this.memstoreSize.get();
    List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
    long flushSeqId = -1L;
    //先flush日誌,再flush memstore到檔案
    try {
      // Record the mvcc for all transactions in progress.
      w = mvcc.beginMemstoreInsert();
      mvcc.advanceMemstore(w);

      if (wal != null) {
        //準備flush日誌,進入等待flush的佇列,這個startSeqId很重要,在恢復的時候就靠它了,它之前的日誌就是已經flush過了,不用恢復
        Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
        if (startSeqId == null) {
          return false;
        }
        flushSeqId = startSeqId.longValue();
      } else {
        flushSeqId = myseqid;
      }

      for (Store s : stores.values()) {
        storeFlushCtxs.add(s.createFlushContext(flushSeqId));
      }

      // 給MemStore做個snapshot,它的內部是兩個佇列,實際是從一個經常訪問的佇列放到另外一個不常訪問的佇列,那個佇列名叫snapshot
      for (StoreFlushContext flush : storeFlushCtxs) {
        flush.prepare();
      }
    } finally {
      this.updatesLock.writeLock().unlock();
    }
    // 同步未flush的日誌到硬碟上
    if (wal != null && !shouldSyncLog()) {
      wal.sync();
    }

    // 等待日誌同步完畢
    mvcc.waitForRead(w);
    boolean compactionRequested = false;
    try {//把memstore中的keyvalues全部flush到storefile儲存在臨時目錄當中,把flushSeqId追加到storefile裡
      for (StoreFlushContext flush : storeFlushCtxs) {
        flush.flushCache(status);
      }
      // 把之前生成在臨時目錄的檔案轉移到正式目錄
      for (StoreFlushContext flush : storeFlushCtxs) {
        boolean needsCompaction = flush.commit(status);
        if (needsCompaction) {
          compactionRequested = true;
        }
      }
      storeFlushCtxs.clear();
      // flush之後,就要減掉相應的memstore的大小
      this.addAndGetGlobalMemstoreSize(-flushsize);

  1、獲取WAL日誌的flushId(要寫入到hfile當中,以後恢復的時候,要拿日誌的flushId和hfile的flushId對比,小於hfile的flushId的就不用恢復了)

  2、給MemStore的做snapshot,從kvset集合轉移到snapshot集合

  3、同步日誌,寫入到硬碟

  4、把MemStore的的snapshot集合當中的內容寫入到hfile當中,MemStore當中儲存的是KeyValue的集合,寫入其實就是一個迴圈,呼叫StoreFile.Writer的append方法追加,具體的可以看我的那篇部落格《非mapreduce生成Hfile,然後匯入hbase當中》

  5、上一步的生成的檔案是儲存在臨時目錄中的,轉移到正式的目錄當中

  6、更新MemStore當中的大小

  好,我們繼續看addRegionToSnapshot方法,好累啊,尼瑪,這麼多步驟。

public void addRegionToSnapshot(SnapshotDescription desc,
      ForeignExceptionSnare exnSnare) throws IOException {// 獲取工作目錄
    Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
    Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);

    // 1. 在工作目錄建立region目錄和寫入region的資訊
    HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
        this.fs.getFileSystem(), snapshotDir, getRegionInfo());

    // 2. 為hfile建立引用
    for (Store store : stores.values()) {
      // 2.1. 分列族為store建立引用目錄,每個store屬於不同的列族
      Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
      List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());// 2.2. 遍歷hfile,然後建立引用
      int sz = storeFiles.size();
      for (int i = 0; i < sz; i++) {
        StoreFile storeFile = storeFiles.get(i);
        Path file = storeFile.getPath();

        Path referenceFile = new Path(dstStoreDir, file.getName());
        boolean success = true;
        if (storeFile.isReference()) {
          // 把舊的引用檔案的內容寫入到新的引用檔案當中
          storeFile.getFileInfo().getReference().write(fs.getFileSystem(), referenceFile);
        } else {
          // 建立一個空的引用檔案
          success = fs.getFileSystem().createNewFile(referenceFile);
        }
        if (!success) {
          throw new IOException("Failed to create reference file:" + referenceFile);
        }
      }
    }
  }

  在工作目錄在.hbase-snapshot/.tmps/snapshotName/region/familyName/下面給hfile建立引用檔案。在建立引用檔案的時候,還要先判斷一下這個所謂的hfile是不是真的hfile,還是它本身就是一個引用檔案了。

  如果已經是引用檔案的話,把舊的引用檔案裡面的內容寫入到新的引用檔案當中。

  如果是一個正常的hfile的話,就建立一個空的引用檔案即可,以後我們可以通過它的名字找到它在snapshot下面相應的檔案。

  okay,到這裡,每個RS的工作都完成了。

備份split過的region

  完成執行分散式事務,就是備份split過的region了,把之前的程式碼再貼一次吧,摺疊起來,需要的自己看。

if (regionInfo.isOffline() && (regionInfo.isSplit() || regionInfo.isSplitParent())) {
    if (!fs.exists(new Path(snapshotDir, regionInfo.getEncodedName()))) {
          LOG.info("Take disabled snapshot of offline region=" + regionInfo);
          snapshotDisabledRegion(regionInfo);
     }
}
protected void snapshotDisabledRegion(final HRegionInfo regionInfo)
      throws IOException {
    // 建立新的region目錄和region資訊
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs,
      workingDir, regionInfo);

     // 把region下的recovered.edits目錄的檔案複製snapshot的對應region下面
    Path regionDir = HRegion.getRegionDir(rootDir, regionInfo);
    Path snapshotRegionDir = regionFs.getRegionDir();
    new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).call();
    
    // 給每個列族的下面的檔案建立引用,所謂引用就是一個同名的空檔案
    new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir).call();
}

  備份啟用的表,現在已經結束了,但是備份禁用的表吧,前面說了區別是snapshotRegions方法,但是方法除了做一些準備工作之外,就是snapshotDisabledRegion。。。。所以snapshot到這裡就完了,下面我們再回顧一遍吧。

  1、進行snapshot之前的準備,建立目錄,複製一些必要的資訊檔案等。

  2、對於啟用的表,啟動分散式事務,RS接到任務,flush掉WAL日誌和MemStore的資料,寫入檔案。

  3、為hfile建立引用檔案,這裡的引用檔案居然是空的檔案,而且名字一樣,它不是真的備份hfile,這是什麼回事呢?這個要到下一章,從snapshot中恢復,才能弄明白了,這個和hbase的歸檔檔案機制有關係,hbase刪除檔案的時候,不是直接刪除,而是把它先放入archive資料夾內。