1. 程式人生 > 其它 >hbase原始碼系列(十)HLog與日誌恢復

hbase原始碼系列(十)HLog與日誌恢復

HLog概述

hbase在寫入資料之前會先寫入MemStore,成功了再寫入HLog,當MemStore的資料丟失的時候,還可以用HLog的資料來進行恢復,下面先看看HLog的圖。

舊版的HLog是實際上是一個SequceneFile,0.96的已經使用Protobuf來進行序列化了。從Writer和Reader上來看HLog的都是Entry的,換句話說就是,它的每一條記錄就是一個Entry。

class Entry implements Writable {
    private WALEdit edit;
    private HLogKey key;
}

所以上面那個圖已經不準確了,HLogKey沒變,但是Value缺不是KeyValue,而是WALEdit。

下面我們看看HLogKey的五要素,region、tableName、log的順序、寫入時間戳、叢集id。

public HLogKey(final byte [] encodedRegionName, final TableName tablename,
      long logSeqNum, final long now, List<UUID> clusterIds){
    init(encodedRegionName, tablename, logSeqNum, now, clusterIds);
 }

protected void init(final byte [] encodedRegionName, final TableName tablename,
      long logSeqNum, final long now, List<UUID> clusterIds) {
    this.logSeqNum = logSeqNum;
    this.writeTime = now;
    this.clusterIds = clusterIds;
    this.encodedRegionName = encodedRegionName;
    this.tablename = tablename;
 }

下面看看WALEdit的屬性, 這裡只列出來一個重要的,它是內部持有的一群KeyValue。。

public class WALEdit implements Writable, HeapSize {
  ......private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();

HLog的具體實現類是FSHLog,一個Region Server有兩個FSHLog,一個負責RS上面所有的使用者region的日誌,一個負責RS上面的META表的region的日誌。

對於日誌來說,我們關心的是它如何保證一致性和準確性,在需要它的時候可以發揮救命作用。

HLog同步

對於meta region的HLog寫入之後,它會立即同步到硬碟,非meta表的region,它會先把Entry新增到一個佇列裡面等待同步。

while(!this.isInterrupted() && !closeLogSyncer.get()) {
          try {
            if (unflushedEntries.get() <= syncedTillHere) {
              synchronized (closeLogSyncer) {
                closeLogSyncer.wait(this.optionalFlushInterval);
              }
            }// 同步已經新增的entry
            sync();
          } catch (IOException e) {
            LOG.error("Error while syncing, requesting close of hlog ", e);
            requestLogRoll();
            Threads.sleep(this.optionalFlushInterval);
          }
}

它這裡是有一個判斷條件的,如果判斷條件不成立就立即同步,等待this.optionalFlushInterval時間,預設的同步間隔是1000,它是通過引數hbase.regionserver.optionallogflushinterval設定。unflushedEntries是一個AtomicLong在寫入entry的時候遞增,syncedTillHere是一個volatile long,同步完成之後也是變大,因為可能被多個執行緒呼叫同步操作,所以它是volatile的,從條件上來看,如果沒有日誌需要同步就等待一秒再進行判斷,如果有日誌需要同步,也是立馬就寫入硬碟的,如果發生錯誤,就是呼叫requestLogRoll方法,進行回滾,這個回滾比較有意思,它是跑過去flush掉MemStore中的資料,把他們寫入硬碟。

下面是回滾的方法。中間我忽略了幾步,然後找到LogRoller中的這段程式碼。

byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
        if (regionsToFlush != null) {
          for (byte [] r: regionsToFlush) scheduleFlush(r);
}

找出來需要flush的region,然後計劃flush。

regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
          this.oldestUnflushedSeqNums);

static byte[][] findMemstoresWithEditsEqualOrOlderThan(
      final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
    List<byte[]> regions = null;
    for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
      //逐個對比,找出小於已輸出為檔案的最小的seq id的region
      if (e.getValue().longValue() <= walSeqNum) {
        if (regions == null) regions = new ArrayList<byte[]>();
        regions.add(e.getKey());
      }
    }
    return regions == null ? null : regions
        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}

逐個對比,找出來未flush MemStore的比輸出的檔案的HLog流水號還小的region,當它準備flush MemStore之前會呼叫startCacheFlush方法來把region從oldestUnflushedSeqNums這個map當中去除,新增到已經flush的map當中。

從日誌恢復

看過《HMaster啟動過程》的童鞋都知道,如果之前有region失敗的話,在啟動之前會把之前的HLog進行split,把屬於該region的為flush過的日誌提取出來,然後生成一個新的HLog到recovered.edits目錄下,中間的過程控制那塊有點兒類似於snapshot的那種,在zk裡面建立一個splitWAL節點,在這個節點下面建立任務,不一樣的是,snapshot那塊是自己處理自己的,這裡是別人的閒事它也管,處理完了之後就更新這個任務的狀態了,沒有snapshot那麼複雜的互動過程。

那啥時候會用到這個呢,在region開啟的時候,我們從HRegionServer的openRegion方法一路跟蹤,中間歷經OpenMetaHandler,再到HRegion.openHRegion方法,終於在initializeRegionStores方法裡面找到了那麼一句話。

    // 如果recovered.edits有日誌的話,就恢復日誌
    maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
        this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));

高潮來了!!!

    HLog.Reader reader = null;
    try {
      //建立reader讀取hlog
      reader = HLogFactory.createReader(fs, edits, conf);
      long currentEditSeqId = -1;
      long firstSeqIdInLog = -1;
      long skippedEdits = 0;
      long editsCount = 0;
      long intervalEdits = 0;
      HLog.Entry entry;
      Store store = null;
      boolean reported_once = false;

      try {//逐個讀取
        while ((entry = reader.next()) != null) {
          HLogKey key = entry.getKey();
          WALEdit val = entry.getEdit();
          //例項化firstSeqIdInLog
          if (firstSeqIdInLog == -1) {
            firstSeqIdInLog = key.getLogSeqNum();
          }
          boolean flush = false;
          for (KeyValue kv: val.getKeyValues()) {
            // 從WALEdits裡面取出kvs
            if (kv.matchingFamily(WALEdit.METAFAMILY) ||
                !Bytes.equals(key.getEncodedRegionName(),
                  this.getRegionInfo().getEncodedNameAsBytes())) {//是meta表的kv就有compaction
              CompactionDescriptor compaction = WALEdit.getCompaction(kv);
              if (compaction != null) {
                //完成compaction未完成的事情,校驗輸入輸出檔案,完成檔案替換等操作
                completeCompactionMarker(compaction);
              }

              skippedEdits++;
              continue;
            }
            // 獲得kv對應的store
            if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
              store = this.stores.get(kv.getFamily());
            }
            if (store == null) {
              // 應該不會發生,缺少它對應的列族
              skippedEdits++;
              continue;
            }
            // seq id小,呵呵,說明已經被處理過了這個日誌
            if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily().getName())) {
              skippedEdits++;
              continue;
            }
            currentEditSeqId = key.getLogSeqNum();
            // 這個就是我們要處理的日誌,新增到MemStore裡面就ok了
            flush = restoreEdit(store, kv);
            editsCount++;
          }
          //MemStore太大了,需要flush掉
          if (flush) internalFlushcache(null, currentEditSeqId, status);

         }
      } catch (IOException ioe) {
        // 就是把名字改了,然後在後面加上".時間戳",這個有毛意思?
        if (ioe.getCause() instanceof ParseException) {
          Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
          msg = "File corruption encountered!  " +
              "Continuing, but renaming " + edits + " as " + p;
        } else {// 不知道是啥錯誤,拋錯誤吧,處理不了
          throw ioe;
        }
      }
      status.markComplete(msg);
      return currentEditSeqId;
    } finally {
      status.cleanup();
      if (reader != null) {
         reader.close();
      }
    }

呵呵,讀取recovered.edits下面的日誌,符合條件的就加到MemStore裡面去,完成之後,就把這些檔案刪掉。大家也看到了,這裡通篇講到一個logSeqNum,哪裡都有它的身影,它實際上是FSHLog當中的一個遞增的AtomicLong,每當往FSLog裡面寫入一條日誌的時候,它都會加一,然後MemStore請求flush的時候,會呼叫FSLog的startCacheFlush方法,獲取(logSeqNum+1)回來,然後寫入到StoreFile的sequenceid欄位,再次拿出來的時候,就遍歷這個HStore下面的StoreFile的logSeqNum,取出來最大的跟它比較,小於它的都已經寫過了,沒必要再寫了。

好了,HLog結束了,累死我了,要睡了。