1. 程式人生 > 其它 >hbase原始碼系列(十四)Compact和Split

hbase原始碼系列(十四)Compact和Split

先上一張圖講一下Compaction和Split的關係,這樣會比較直觀一些。

Compaction把多個MemStore flush出來的StoreFile合併成一個檔案,而Split則是把過大的檔案Split成兩個。

之前在Delete的時候,我們知道它其實並沒有真正刪除資料的,那總不能一直不刪吧,下面我們就介紹一下它刪除資料的過程,它就是Compaction。

在講原始碼之前,先說一下它的分類和作用。

Compaction主要起到如下幾個作用:

1)合併檔案

2)清除刪除、過期、多餘版本的資料

3)提高讀寫資料的效率

Minor & Major Compaction的區別

1)Minor操作只用來做部分檔案的合併操作以及包括minVersion=0並且設定ttl的過期版本清理,不做任何刪除資料、多版本資料的清理工作。

2)Major操作是對Region下的HStore下的所有StoreFile執行合併操作,最終的結果是整理合並出一個檔案。

先說一下怎麼使用吧,下面分別是它們是shell命令,可以在hbase的shell裡面執行。

//major compaction
major compact '表名或region名'

//minor compaction
compact '表名或region名'

下面我們開始看入口吧,入口在HBaseAdmin,找到compact方法,都知道我們compact可以對錶操作或者對region進行操作。

1、先把表或者region相關的region資訊和server資訊全部獲取出來

2、迴圈遍歷這些region資訊,依次請求compact操作

AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
try {
      admin.compactRegion(null, request);
} catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
}

到這裡,客戶端的工作就結束了,我們直接到HRegionServer找compactRegion這個方法吧。

    //major compaction多走這一步驟
      if (major) {
        if (family != null) {
          store.triggerMajorCompaction();
        } else {
          region.triggerMajorCompaction();
        }
      }
    //請求compaction走這裡
      if(family != null) {
        compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER, null);
      } else {
        compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, null);
      }

我們先看major compaction吧,直接去看triggerMajorCompaction和requestCompaction方法。

Compaction

進入方法裡面就發現了它把forceMajor置為true就完了,看來這個引數是major和minor的開關,接著看requestCompaction。

CompactionContext compaction = null;
if (selectNow) {
    compaction = selectCompaction(r, s, priority, request);
    if (compaction == null) return null; // message logged inside
}
// 要根據檔案的size來判斷用給個大的執行緒池還是小的執行緒池
long size = selectNow ? compaction.getRequest().getSize() : 0;
ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool));

上面的步驟是執行selectCompaction建立一個CompactionContext,然後提交CompactionRunner。

我們接著看CompactionContext的建立過程吧,這裡還需要分是使用者建立的Compaction和系統建立的Compaction。

1、建立CompactionContext

2、判斷是否是非高峰時間,下面是這兩個引數的值

int startHour = conf.getInt("hbase.offpeak.start.hour", -1);
int endHour = conf.getInt("hbase.offpeak.end.hour", -1);

3、選擇需要進行compaction的檔案,新增到CompactionRequest和filesCompacting列表當中

compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty());

我們看看這個select的具體實現吧。

public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
      request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
          filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
      return request != null;
}

這裡的select方法,從名字上看是壓縮策略的意思,它是由這個引數控制的hbase.hstore.defaultengine.compactionpolicy.class,預設是ExploringCompactionPolicy這個類。

接著看ExploringCompactionPolicy的selectCompaction方法,發現這個方法是繼承來的,找它的父類RatioBasedCompactionPolicy。

public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
    //從candidateSelection排除掉filesCompacting中的檔案
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);long cfTtl = this.storeConfigInfo.getStoreFileTtl();
    if (!forceMajor) {
      // 如果不是強制major的話,包含了過期的檔案,先刪除過期的檔案
      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
        ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
            candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
        if (expiredSelection != null) {
          return new CompactionRequest(expiredSelection);
        }
      }
      //居然還要跳過大檔案,看來不是major的還是不行的,淨挑小的弄
      candidateSelection = skipLargeFiles(candidateSelection);
    }
    // 是不是major的compaction還需要判斷,做這個操作還是比較謹慎的
    boolean majorCompaction = (
      (forceMajor && isUserCompaction)
      || ((forceMajor || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
      || StoreUtils.hasReferences(candidateSelection)
      );

    if (!majorCompaction) {
     //過濾掉bulk load進來的檔案
      candidateSelection = filterBulk(candidateSelection);
      //過濾掉一些不滿足大小的檔案
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      //檢查檔案數是否滿足最小的要求,檔案不夠,也不做compaction
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    //非major的超過最大可以compact的檔案數量也要剔除掉,major的只是警告一下
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
    CompactionRequest result = new CompactionRequest(candidateSelection);
    result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
    return result;
  }

從上面可以看出來,major compaction的選擇檔案幾乎沒什麼限制,只要排除掉正在compacting的檔案就行了,反而是minor compact有諸多的排除選項,因為預設的compaction是定時執行的,所以它這方面的考慮吧,排除太大的檔案,選擇那些過期的檔案,排除掉bulkload的檔案等等內容。

Minor Compaction的檔案選擇策略

我們再簡單看看applyCompactionPolicy這個方法吧,它是minor的時候用的,它的過程就像下圖一樣。

這個是雙層迴圈: 

從0開始,迴圈N遍(N=檔案數),就相當於視窗向右滑動,指標為start

----->從currentEnd=start + MinFiles(預設是3)-1,每次增加一個檔案作為考慮,類似擴張的動作, 視窗擴大, 指標為

-------------->從candidateSelection檔案裡面取出(start, currentEnd + 1)開始

-------------->小於最小compact數量檔案,預設是3,continue

-------------->大於最大compact數量檔案,預設是10,continue

-------------->獲取這部分檔案的大小

-------------->如果這部分檔案數量比上次選擇方案的檔案還小,替換為最小檔案方案

-------------->大於MemStore flush的大小128M並且符合有一個檔案不滿這個公式(FileSize(i) <= ( 檔案總大小- FileSize(i) ) * Ratio),continue

       (注意上面的Ratio是幹嘛的,這個和前面提到的非高峰時間的數值有關係,非高峰時段這個數值是5,高峰時間段這個值是1.2, 這說明高峰時段不允許compact過大的檔案)

-------------->開始判斷是不是最優的選擇(下面講的mayBeStuck是從selectCompaction傳入的,可選擇的檔案超過7個的情況,上面黃色那部分程式碼)

          1)如果mayBeStuck並且不是初次,如果 檔案平均大小 > 上次選擇的檔案的平均大小*1.05, 替換上次的選擇檔案方案成為最優解

          2)初次或者不是mayBeStuck的情況,檔案更多的或者檔案相同、總檔案大小更小的會成為最新的選擇檔案方案

如果經過比較之後的最優檔案選擇方案不為空,就把它返回,否則就把最小檔案方案返回。

下面是之前的Ratio的引數值,需要配合之前提到的引數配合使用的。

hbase.hstore.compaction.ratio              高峰時段,預設值是1.2
hbase.hstore.compaction.ratio.offpeak      非高峰時段,預設值是5

到這裡先來個小結吧,從上面可以看得出來,這個Minor Compaction的檔案選擇策略就是選小的來,選最多的小檔案來合併。

選擇檔案結束,回到compact的主流程

4、把CompactionRequest放入CompactionRunner,走執行緒池提交

之前的程式碼我再貼一下,省得大家有點凌亂。

ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool));

我們去看CompactionRunner的run方法吧,它也在當前的類裡面。

      if (this.compaction == null) {this.compaction = selectCompaction(this.region, this.store, queuedPriority, null); 
     // 出口,實在選不出東西來了,它會走這裡跑掉
     if (this.compaction == null) return;
     // ....還有別的限制,和父親執行的執行緒池也要一致,尼瑪,什麼邏輯  
    }
        
    boolean completed = region.compact(compaction, store);if (completed) {
       // blocked的regions再來一次,這次又要一次compaction意欲何為啊
       // 其實它的出口在上面的那段程式碼,它執行之後,沒有這裡這麼噁心
       if (store.getCompactPriority() <= 0) {
           requestSystemCompaction(region, store, "Recursive enqueue");
       } else {
         // compaction之後的region可能很大,超過split的數量就要split了
         requestSplit(region);
       }
      

先是對region進行compact,如果完成了,判斷一下優先順序,優先順序小於等於0,請求系統級別的compaction,否則請求split。

我們還是先看HRegion的compact方法,compact開始前,它要先上讀鎖,不讓讀了,然後呼叫HStore中的compact方法。

     // 執行compact,生成新檔案
      List<Path> newFiles = compaction.compact();
      //把compact生成的檔案移動到正確的位置
      sfs = moveCompatedFilesIntoPlace(cr, newFiles);
      //記錄WALEdit日誌
      writeCompactionWalRecord(filesToCompact, sfs);
      //更新HStore相關的資料結構
      replaceStoreFiles(filesToCompact, sfs);/
      /歸檔舊的檔案,關閉reader,重新計算file的大小
      completeCompaction(filesToCompact);

comact生成新檔案的方法很簡單,給原始檔建立一個StoreScanner,之前說過StoreScanner能從多個Scanner當中每次都取出最小的kv,然後用StoreFile.Append的方法不停地追加寫入即可,這些過程在前面的章節都介紹過了,這裡不再重複。

簡單的說,就是把這些檔案合併到一個檔案去了,尼瑪,怪不得io那麼大。

剩下的就是清理工作了,這裡面有意思的就是它會記錄一筆日誌到writeCompactionWalRecord當中,在之間日誌恢復那一章的時候,貼出來的程式碼裡面有,只是沒有詳細的講。因為走到這裡它已經完成了compaction的過程,只是沒有把舊的檔案移入歸檔檔案當中,它掛掉重啟的時候進行恢復乾的事情,就是替換檔案。

5、store.getCompactPriority() 下一步是天堂抑或是地獄?

compact完了,要判斷一下這個,真是天才啊

public int getStoreCompactionPriority() {
    int blockingFileCount = conf.getInt(
        HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
    int priority = blockingFileCount - storefiles.size();
    return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}

比較方法是這個,blockingFileCount的預設值是7,如果compact之後storefiles的檔案數量大於7的話,就很有可能再觸發一下,那麼major compaction觸發的可能性低,minor觸發的可能性非常大。

不過沒關係,實在選不出檔案來,它會退出的。咱們可以將它這個引數hbase.hstore.blockingStoreFiles設定得大一些,弄出來一個比較大的數字。

Split

好,我們接著看requestSplit。

if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
      byte[] midKey = r.checkSplit();
      if (midKey != null) {
        requestSplit(r, midKey);
        return true;
      }
}

先檢查一下是否可以進行split,如果可以,把中間的key返回來。

那條件是啥?在這裡,if的條件是成立的,條件判斷在IncreasingToUpperBoundRegionSplitPolicy的shouldSplit方法當中。

遍歷region裡面所有的store

1、Store當中不能有Reference檔案。

2、store.size > Math.min(getDesiredMaxFileSize(), this.flushSize * (tableRegionsCount * (long)tableRegionsCount)) 就返回ture,可以split。

getDesiredMaxFileSize()預設是10G,由這個引數來確定hbase.hregion.max.filesize, 當沒超過10G的時候它就會根據128MB * (該表在這個RS上的region數量)平方。

midKey怎麼找呢?找出最大的HStore,然後通過它來找這個分裂點,最大的檔案的中間點。

return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);

但是如果是另外一種情況,我們通過客戶端來分裂Region,我們強制指定的分裂點,這種情況是按照我們設定的分裂點來進行分裂。

分裂點有了,我們接著看,我們發現它又提交了一個SplitRequest執行緒,看run方法。

1、先獲得一個tableLock,給這個表上鎖

2、執行SplitTransaction的prepare方法,然後execute

3、結束了釋放tableLock

      // 先做準備工作,然後再execute執行主流程,過程當中出錯了,就rollback
      if (!st.prepare()) return;
      try {
        st.execute(this.server, this.server);
      } catch (Exception e) {
        try {
        if (st.rollback(this.server, this.server)) {
        } catch (RuntimeException ee) {this.server.abort(msg);
        }
        return;
      }

prepare方法當中,主要做了這麼件事,new了兩個新的region出來

this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);

我們接著看execute方法,這個是重頭戲。

PairOfSameType<HRegion> regions = createDaughters(server, services);
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());

總共分三步:

1、建立子region

2、上線子region

3、更改zk當中的狀態

我們先看createDaughters

    //在region-in-transition節點下給父region建立一個splitting的節點
    createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
    this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);//在parent的region目錄下建立.splits目錄
    this.parent.getRegionFileSystem().createSplitsDir();
    this.journal.add(JournalEntry.CREATE_SPLIT_DIR);

    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
    //關閉parent,然後返回相應的列族和storefile的map
    hstoreFilesToSplit = this.parent.close(false);
    //從線上列表裡下線parent
    services.removeFromOnlineRegions(this.parent, null);
    this.journal.add(JournalEntry.OFFLINED_PARENT);
    // 把parent的storefile均分給兩個daughter,所謂均分,只是建立引用檔案而已
    splitStoreFiles(hstoreFilesToSplit);

    // 把臨時的Region A目錄重名為正式的region A 的目錄    
    this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);

    // 把臨時的Region B目錄重名為正式的region B的目錄
    this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
    this.journal.add(JournalEntry.PONR);

    // 修改meta表中的資訊,設定parent的狀態為下線、並且split過,在增加兩列左右孩子,左右孩子的資訊也通過put插入到meta中
    MetaEditor.splitRegion(server.getCatalogTracker(), parent.getRegionInfo(),
          a.getRegionInfo(), b.getRegionInfo(), server.getServerName());
    return new PairOfSameType<HRegion>(a, b);

在splitStoreFiles這塊的,它給每個檔案都開一個執行緒去進行split。

fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false);
fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true);

這裡其實是給每個檔案都建立了Reference檔案,無論它的檔案當中包不包括splitRow。

    //parentRegion/.splits/region/familyName目錄
    Path splitDir = new Path(getSplitsDir(hri), familyName);
    // 其實它並沒有真正的split,而是通過建立Reference
    Reference r = top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
    String parentRegionName = regionInfo.getEncodedName();
    // 原來通過這麼關聯啊,storefile名字 + 父parent的name
    Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
    return r.write(fs, p);

把引用檔案生成在每個子region對應的目錄,以便下一步直接重命令目錄即可。

重新命名目錄之後,就是修改Meta表了,splitRegion的方法是通過Put來進行操作的,它修改parent的regioninfo這一列更新為最新的資訊,另外又增加了splitA和splitB兩列,hri_a和hri_b則通過另外兩個Put插入到Meta表當中。

這個過程當中如果出現任何問題,就需要根據journal記錄的過程資訊進行回滾操作。

怎麼open這兩個子region就不講了,之前講《HMaster啟動過程》的時候講過了。

到這裡split的過程就基本結束了,鑑於Compaction和Split的對io方面的巨大影響,所以在任何資料裡面都是推薦遮蔽自動執行,寫指令碼在晚上自動進行這些操作。