hbase原始碼系列(十四)Compact和Split
先上一張圖講一下Compaction和Split的關係,這樣會比較直觀一些。
Compaction把多個MemStore flush出來的StoreFile合併成一個檔案,而Split則是把過大的檔案Split成兩個。
之前在Delete的時候,我們知道它其實並沒有真正刪除資料的,那總不能一直不刪吧,下面我們就介紹一下它刪除資料的過程,它就是Compaction。
在講原始碼之前,先說一下它的分類和作用。
Compaction主要起到如下幾個作用:
1)合併檔案
2)清除刪除、過期、多餘版本的資料
3)提高讀寫資料的效率
Minor & Major Compaction的區別
1)Minor操作只用來做部分檔案的合併操作以及包括minVersion=0並且設定ttl的過期版本清理,不做任何刪除資料、多版本資料的清理工作。
2)Major操作是對Region下的HStore下的所有StoreFile執行合併操作,最終的結果是整理合並出一個檔案。
先說一下怎麼使用吧,下面分別是它們是shell命令,可以在hbase的shell裡面執行。
//major compaction
major compact '表名或region名'
//minor compaction
compact '表名或region名'
下面我們開始看入口吧,入口在HBaseAdmin,找到compact方法,都知道我們compact可以對錶操作或者對region進行操作。
1、先把表或者region相關的region資訊和server資訊全部獲取出來
2、迴圈遍歷這些region資訊,依次請求compact操作
AdminService.BlockingInterface admin = this.connection.getAdmin(sn); CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); try { admin.compactRegion(null, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); }
到這裡,客戶端的工作就結束了,我們直接到HRegionServer找compactRegion這個方法吧。
//major compaction多走這一步驟
if (major) {
if (family != null) {
store.triggerMajorCompaction();
} else {
region.triggerMajorCompaction();
}
}
//請求compaction走這裡
if(family != null) {
compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER, null);
} else {
compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, null);
}
我們先看major compaction吧,直接去看triggerMajorCompaction和requestCompaction方法。
Compaction
進入方法裡面就發現了它把forceMajor置為true就完了,看來這個引數是major和minor的開關,接著看requestCompaction。
CompactionContext compaction = null;
if (selectNow) {
compaction = selectCompaction(r, s, priority, request);
if (compaction == null) return null; // message logged inside
}
// 要根據檔案的size來判斷用給個大的執行緒池還是小的執行緒池
long size = selectNow ? compaction.getRequest().getSize() : 0;
ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool));
上面的步驟是執行selectCompaction建立一個CompactionContext,然後提交CompactionRunner。
我們接著看CompactionContext的建立過程吧,這裡還需要分是使用者建立的Compaction和系統建立的Compaction。
1、建立CompactionContext
2、判斷是否是非高峰時間,下面是這兩個引數的值
int startHour = conf.getInt("hbase.offpeak.start.hour", -1);
int endHour = conf.getInt("hbase.offpeak.end.hour", -1);
3、選擇需要進行compaction的檔案,新增到CompactionRequest和filesCompacting列表當中
compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
我們看看這個select的具體實現吧。
public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
return request != null;
}
這裡的select方法,從名字上看是壓縮策略的意思,它是由這個引數控制的hbase.hstore.defaultengine.compactionpolicy.class,預設是ExploringCompactionPolicy這個類。
接著看ExploringCompactionPolicy的selectCompaction方法,發現這個方法是繼承來的,找它的父類RatioBasedCompactionPolicy。
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
//從candidateSelection排除掉filesCompacting中的檔案
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (!forceMajor) {
// 如果不是強制major的話,包含了過期的檔案,先刪除過期的檔案
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return new CompactionRequest(expiredSelection);
}
}
//居然還要跳過大檔案,看來不是major的還是不行的,淨挑小的弄
candidateSelection = skipLargeFiles(candidateSelection);
}
// 是不是major的compaction還需要判斷,做這個操作還是比較謹慎的
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection)
);
if (!majorCompaction) {
//過濾掉bulk load進來的檔案
candidateSelection = filterBulk(candidateSelection);
//過濾掉一些不滿足大小的檔案
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
//檢查檔案數是否滿足最小的要求,檔案不夠,也不做compaction
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
//非major的超過最大可以compact的檔案數量也要剔除掉,major的只是警告一下
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
CompactionRequest result = new CompactionRequest(candidateSelection);
result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
return result;
}
從上面可以看出來,major compaction的選擇檔案幾乎沒什麼限制,只要排除掉正在compacting的檔案就行了,反而是minor compact有諸多的排除選項,因為預設的compaction是定時執行的,所以它這方面的考慮吧,排除太大的檔案,選擇那些過期的檔案,排除掉bulkload的檔案等等內容。
Minor Compaction的檔案選擇策略
我們再簡單看看applyCompactionPolicy這個方法吧,它是minor的時候用的,它的過程就像下圖一樣。
這個是雙層迴圈:
從0開始,迴圈N遍(N=檔案數),就相當於視窗向右滑動,指標為start
----->從currentEnd=start + MinFiles(預設是3)-1,每次增加一個檔案作為考慮,類似擴張的動作, 視窗擴大, 指標為
-------------->從candidateSelection檔案裡面取出(start, currentEnd + 1)開始
-------------->小於最小compact數量檔案,預設是3,continue
-------------->大於最大compact數量檔案,預設是10,continue
-------------->獲取這部分檔案的大小
-------------->如果這部分檔案數量比上次選擇方案的檔案還小,替換為最小檔案方案
-------------->大於MemStore flush的大小128M並且符合有一個檔案不滿這個公式(FileSize(i) <= ( 檔案總大小- FileSize(i) ) * Ratio),continue
(注意上面的Ratio是幹嘛的,這個和前面提到的非高峰時間的數值有關係,非高峰時段這個數值是5,高峰時間段這個值是1.2, 這說明高峰時段不允許compact過大的檔案)
-------------->開始判斷是不是最優的選擇(下面講的mayBeStuck是從selectCompaction傳入的,可選擇的檔案超過7個的情況,上面黃色那部分程式碼)
1)如果mayBeStuck並且不是初次,如果 檔案平均大小 > 上次選擇的檔案的平均大小*1.05, 替換上次的選擇檔案方案成為最優解
2)初次或者不是mayBeStuck的情況,檔案更多的或者檔案相同、總檔案大小更小的會成為最新的選擇檔案方案
如果經過比較之後的最優檔案選擇方案不為空,就把它返回,否則就把最小檔案方案返回。
下面是之前的Ratio的引數值,需要配合之前提到的引數配合使用的。
hbase.hstore.compaction.ratio 高峰時段,預設值是1.2
hbase.hstore.compaction.ratio.offpeak 非高峰時段,預設值是5
到這裡先來個小結吧,從上面可以看得出來,這個Minor Compaction的檔案選擇策略就是選小的來,選最多的小檔案來合併。
選擇檔案結束,回到compact的主流程
4、把CompactionRequest放入CompactionRunner,走執行緒池提交
之前的程式碼我再貼一下,省得大家有點凌亂。
ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size)) ? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool));
我們去看CompactionRunner的run方法吧,它也在當前的類裡面。
if (this.compaction == null) {this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
// 出口,實在選不出東西來了,它會走這裡跑掉
if (this.compaction == null) return;
// ....還有別的限制,和父親執行的執行緒池也要一致,尼瑪,什麼邏輯
}
boolean completed = region.compact(compaction, store);if (completed) {
// blocked的regions再來一次,這次又要一次compaction意欲何為啊
// 其實它的出口在上面的那段程式碼,它執行之後,沒有這裡這麼噁心
if (store.getCompactPriority() <= 0) {
requestSystemCompaction(region, store, "Recursive enqueue");
} else {
// compaction之後的region可能很大,超過split的數量就要split了
requestSplit(region);
}
先是對region進行compact,如果完成了,判斷一下優先順序,優先順序小於等於0,請求系統級別的compaction,否則請求split。
我們還是先看HRegion的compact方法,compact開始前,它要先上讀鎖,不讓讀了,然後呼叫HStore中的compact方法。
// 執行compact,生成新檔案
List<Path> newFiles = compaction.compact();
//把compact生成的檔案移動到正確的位置
sfs = moveCompatedFilesIntoPlace(cr, newFiles);
//記錄WALEdit日誌
writeCompactionWalRecord(filesToCompact, sfs);
//更新HStore相關的資料結構
replaceStoreFiles(filesToCompact, sfs);/
/歸檔舊的檔案,關閉reader,重新計算file的大小
completeCompaction(filesToCompact);
comact生成新檔案的方法很簡單,給原始檔建立一個StoreScanner,之前說過StoreScanner能從多個Scanner當中每次都取出最小的kv,然後用StoreFile.Append的方法不停地追加寫入即可,這些過程在前面的章節都介紹過了,這裡不再重複。
簡單的說,就是把這些檔案合併到一個檔案去了,尼瑪,怪不得io那麼大。
剩下的就是清理工作了,這裡面有意思的就是它會記錄一筆日誌到writeCompactionWalRecord當中,在之間日誌恢復那一章的時候,貼出來的程式碼裡面有,只是沒有詳細的講。因為走到這裡它已經完成了compaction的過程,只是沒有把舊的檔案移入歸檔檔案當中,它掛掉重啟的時候進行恢復乾的事情,就是替換檔案。
5、store.getCompactPriority() 下一步是天堂抑或是地獄?
compact完了,要判斷一下這個,真是天才啊
public int getStoreCompactionPriority() {
int blockingFileCount = conf.getInt(
HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
int priority = blockingFileCount - storefiles.size();
return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}
比較方法是這個,blockingFileCount的預設值是7,如果compact之後storefiles的檔案數量大於7的話,就很有可能再觸發一下,那麼major compaction觸發的可能性低,minor觸發的可能性非常大。
不過沒關係,實在選不出檔案來,它會退出的。咱們可以將它這個引數hbase.hstore.blockingStoreFiles設定得大一些,弄出來一個比較大的數字。
Split
好,我們接著看requestSplit。
if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
byte[] midKey = r.checkSplit();
if (midKey != null) {
requestSplit(r, midKey);
return true;
}
}
先檢查一下是否可以進行split,如果可以,把中間的key返回來。
那條件是啥?在這裡,if的條件是成立的,條件判斷在IncreasingToUpperBoundRegionSplitPolicy的shouldSplit方法當中。
遍歷region裡面所有的store
1、Store當中不能有Reference檔案。
2、store.size > Math.min(getDesiredMaxFileSize(), this.flushSize * (tableRegionsCount * (long)tableRegionsCount)) 就返回ture,可以split。
getDesiredMaxFileSize()預設是10G,由這個引數來確定hbase.hregion.max.filesize, 當沒超過10G的時候它就會根據128MB * (該表在這個RS上的region數量)平方。
midKey怎麼找呢?找出最大的HStore,然後通過它來找這個分裂點,最大的檔案的中間點。
return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);
但是如果是另外一種情況,我們通過客戶端來分裂Region,我們強制指定的分裂點,這種情況是按照我們設定的分裂點來進行分裂。
分裂點有了,我們接著看,我們發現它又提交了一個SplitRequest執行緒,看run方法。
1、先獲得一個tableLock,給這個表上鎖
2、執行SplitTransaction的prepare方法,然後execute
3、結束了釋放tableLock
// 先做準備工作,然後再execute執行主流程,過程當中出錯了,就rollback
if (!st.prepare()) return;
try {
st.execute(this.server, this.server);
} catch (Exception e) {
try {
if (st.rollback(this.server, this.server)) {
} catch (RuntimeException ee) {this.server.abort(msg);
}
return;
}
prepare方法當中,主要做了這麼件事,new了兩個新的region出來
this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
我們接著看execute方法,這個是重頭戲。
PairOfSameType<HRegion> regions = createDaughters(server, services);
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
總共分三步:
1、建立子region
2、上線子region
3、更改zk當中的狀態
我們先看createDaughters
//在region-in-transition節點下給父region建立一個splitting的節點
createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);//在parent的region目錄下建立.splits目錄
this.parent.getRegionFileSystem().createSplitsDir();
this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
//關閉parent,然後返回相應的列族和storefile的map
hstoreFilesToSplit = this.parent.close(false);
//從線上列表裡下線parent
services.removeFromOnlineRegions(this.parent, null);
this.journal.add(JournalEntry.OFFLINED_PARENT);
// 把parent的storefile均分給兩個daughter,所謂均分,只是建立引用檔案而已
splitStoreFiles(hstoreFilesToSplit);
// 把臨時的Region A目錄重名為正式的region A 的目錄
this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
// 把臨時的Region B目錄重名為正式的region B的目錄
this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
this.journal.add(JournalEntry.PONR);
// 修改meta表中的資訊,設定parent的狀態為下線、並且split過,在增加兩列左右孩子,左右孩子的資訊也通過put插入到meta中
MetaEditor.splitRegion(server.getCatalogTracker(), parent.getRegionInfo(),
a.getRegionInfo(), b.getRegionInfo(), server.getServerName());
return new PairOfSameType<HRegion>(a, b);
在splitStoreFiles這塊的,它給每個檔案都開一個執行緒去進行split。
fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false);
fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true);
這裡其實是給每個檔案都建立了Reference檔案,無論它的檔案當中包不包括splitRow。
//parentRegion/.splits/region/familyName目錄
Path splitDir = new Path(getSplitsDir(hri), familyName);
// 其實它並沒有真正的split,而是通過建立Reference
Reference r = top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
String parentRegionName = regionInfo.getEncodedName();
// 原來通過這麼關聯啊,storefile名字 + 父parent的name
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
return r.write(fs, p);
把引用檔案生成在每個子region對應的目錄,以便下一步直接重命令目錄即可。
重新命名目錄之後,就是修改Meta表了,splitRegion的方法是通過Put來進行操作的,它修改parent的regioninfo這一列更新為最新的資訊,另外又增加了splitA和splitB兩列,hri_a和hri_b則通過另外兩個Put插入到Meta表當中。
這個過程當中如果出現任何問題,就需要根據journal記錄的過程資訊進行回滾操作。
怎麼open這兩個子region就不講了,之前講《HMaster啟動過程》的時候講過了。
到這裡split的過程就基本結束了,鑑於Compaction和Split的對io方面的巨大影響,所以在任何資料裡面都是推薦遮蔽自動執行,寫指令碼在晚上自動進行這些操作。