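HBase Source Code Analysis: The Compact Flow on HRegion (Part 2)
March 3, 2016, 21:38:04, 辰辰爸的技術部落格
Copyright notice: this is an original article by the blog author and may not be reproduced without the author's permission. https://blog.csdn.net/lipeng_bigdata/article/details/50791205
Following "HBase Source Code Analysis: The Compact Flow on HRegion (Part 1)", we continue with the compact flow on HRegion. The next topic is the merging of the files under a single column family of a table, that is, HStore's compact() method. The code is as follows: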
  /**
   * Compact the StoreFiles. This method may take some time, so the calling
   * thread must be able to block for long periods.
   *
   * <p>During this time, the Store can work as usual, getting values from
   * StoreFiles and writing new StoreFiles from the memstore.
   *
   * Existing StoreFiles are not destroyed until the new compacted StoreFile is
   * completely written-out to disk.
   *
   * <p>The compactLock prevents multiple simultaneous compactions.
   * The structureLock prevents us from interfering with other write operations.
   *
   * <p>We don't want to hold the structureLock for the whole time, as a compact()
   * can be lengthy and we want to allow cache-flushes during this period.
   *
   * <p> Compaction event should be idempotent, since there is no IO Fencing for
   * the region directory in hdfs. A region server might still try to complete the
   * compaction after it lost the region. That is why the following events are carefully
   * ordered for a compaction:
   *  1. Compaction writes new files under region/.tmp directory (compaction output)
   *  2. Compaction atomically moves the temporary file under region directory
   *  3. Compaction appends a WAL edit containing the compaction input and output files.
   *  Forces sync on WAL.
   *  4. Compaction deletes the input files from the region directory.
   *
   * Failure conditions are handled like this:
   *  - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
   *  the compaction later, it will only write the new data file to the region directory.
   *  Since we already have this data, this will be idempotent but we will have a redundant
   *  copy of the data.
   *  - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
   *  RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
   *  - If RS fails after 3, the region server who opens the region will pick up the
   *  compaction marker from the WAL and replay it by removing the compaction input files.
   *  Failed RS can also attempt to delete those files, but the operation will be idempotent
   *
   * See HBASE-2231 for details.
   *
   * @param compaction compaction details obtained from requestCompaction()
   * @throws IOException
   * @return Storefile we compacted into or null if we failed or opted out early.
   */
  @Override
  public List<StoreFile> compact(CompactionContext compaction) throws IOException {
    assert compaction != null;
    List<StoreFile> sfs = null;
    // Get the CompactionRequest (cr) from the CompactionContext
    CompactionRequest cr = compaction.getRequest();
    try {
      // Do all sanity checking in here if we have a valid CompactionRequest
      // because we need to clean up after it on the way out in a finally
      // block below

      // Record the compaction start time, compactionStartTime
      long compactionStartTime = EnvironmentEdgeManager.currentTime();
      // Make sure the request is not null. getRequest() has in fact already checked this,
      // so why check again here? Let's leave that as a small open question for now.
      assert compaction.hasSelection();
      // Get the collection of files to compact, filesToCompact, from cr; its elements are
      // StoreFile instances. The collection is fixed when the CompactionRequest is constructed,
      // or when it is combined with another request, from the arguments passed in or from the
      // files carried by the other request. Once a request exists, its filesToCompact exists.
      Collection<StoreFile> filesToCompact = cr.getFiles();
      // Make sure filesToCompact is not empty
      assert !filesToCompact.isEmpty();
      // Make sure filesCompacting contains every file in filesToCompact
      synchronized (filesCompacting) {
        // sanity check: we're compacting files that this store knows about
        // TODO: change this to LOG.error() after more debugging
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }
      // Ready to go. Have list of files to compact.
      LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
          + this + " of " + this.getRegionInfo().getRegionNameAsString()
          + " into tmpdir=" + fs.getTempDir() + ", totalSize="
          + StringUtils.humanReadableInt(cr.getSize()));
      // Commence the compaction.
      // Call CompactionContext.compact() to obtain the newly written files, newFiles
      List<Path> newFiles = compaction.compact();
      // TODO: get rid of this!
      // Decide, based on hbase.hstore.compaction.complete, whether to fully complete the compaction.
      // Interesting: handled this way, old and new files coexist, the new files are not moved
      // into place and their Readers are closed, so the old files keep serving reads.
      // What is the purpose? Quickly serving reads?
      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
        LOG.warn("hbase.hstore.compaction.complete is set to false");
        // Create the StoreFile list sfs, sized to newFiles
        sfs = new ArrayList<StoreFile>(newFiles.size());
        // For each new file, create a StoreFile and Reader, close the Reader,
        // and add the StoreFile to sfs
        for (Path newFile : newFiles) {
          // Create storefile around what we wrote with a reader on it.
          StoreFile sf = createStoreFileAndReader(newFile);
          // Close the Reader on it
          sf.closeReader(true);
          sfs.add(sf);
        }
        // Return the compacted files
        return sfs;
      }
      // Do the steps necessary to complete the compaction.
      // Move the finished files into place, create the StoreFiles and Readers,
      // and return them as the list sfs
      sfs = moveCompatedFilesIntoPlace(cr, newFiles);
      // Write the compaction record to the WAL
      writeCompactionWalRecord(filesToCompact, sfs);
      // Replace the StoreFiles:
      // 1. remove the files that were compacted (compactedFiles) and add the result files sfs to
      //    the StoreFileManager's storefiles, the list of all store files currently serving the Store;
      // 2. remove the compacted files filesToCompact from filesCompacting.
      replaceStoreFiles(filesToCompact, sfs);
      // Update the counters that match the compaction type, for metrics monitoring
      if (cr.isMajor()) { // major compaction
        // Accumulate cell count and size
        majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
        majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
      } else { // not a major compaction
        // Accumulate cell count and size
        compactedCellsCount += getCompactionProgress().totalCompactingKVs;
        compactedCellsSize += getCompactionProgress().totalCompactedSize;
      }
      // At this point the store will use new files for all new scanners.
      // Complete the compaction: archive the old files (the compacted files are "deleted" from
      // the file system, which is really an archive operation that moves them from their original
      // location to the archive directory), close their Readers, and update the store size
      completeCompaction(filesToCompact, true); // Archive old files & update store size.
      // Log the end-of-compaction message
      logCompactionEndMessage(cr, sfs, compactionStartTime);
      // Return the StoreFile list sfs
      return sfs;
    } finally {
      // Finish the compaction request: the Region reports the end of the compaction request,
      // and all of the request's files are removed from filesCompacting
      finishCompactionRequest(cr);
    }
  }
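Let's summarize the whole flow:
1. First, get the CompactionRequest, cr, from the CompactionContext.
2. Record the compaction start time, compactionStartTime.
3. Make sure the compaction request is not null:
   getRequest() has in fact already checked this, so why check again here? Let's leave that as a small open question for now.
4. Get the collection of files to compact, filesToCompact, from cr:
   Its elements are StoreFile instances. The collection is fixed when the CompactionRequest is constructed, or when it is combined with another request, from the arguments passed in or from the files carried by the other request. Once a request exists, its filesToCompact exists.
5. Make sure filesToCompact is not empty.
6. Make sure filesCompacting contains every file in filesToCompact:
   So when are files added to filesCompacting?
7. Start the compaction by calling CompactionContext's compact() method, which returns the new files, newFiles:
   This is the core step. It opens scanners over the files being compacted and writes all of their data into new files. A later article will analyze it in detail.
8. Decide, based on the parameter hbase.hstore.compaction.complete (default true), whether to fully complete the compaction:
8.1. If it is set to false:
8.1.1. Create the StoreFile list sfs, sized to newFiles.
8.1.2. For each file in newFiles, create a StoreFile and Reader, close the Reader on the StoreFile, and add the StoreFile to sfs.
8.1.3. Return the list of compacted files, sfs.
8.2. If it is set to true:
8.2.1. Move the finished files to the right place, create the StoreFiles and Readers, and return the StoreFile list sfs.
8.2.2. Write the compaction record to the WAL.
8.2.3. Replace the StoreFiles: remove the files that were compacted (compactedFiles) and add the result files sfs to the StoreFileManager's storefiles, the list of all store files currently serving the Store; also remove the compacted files filesToCompact from the in-progress list filesCompacting.
8.2.4. Update the counters that match the compaction type, for metrics monitoring.
8.2.5. Complete the compaction: archive the old files (the compacted files are "deleted" from the file system, which is really an archive operation that moves them from their original location to the archive directory), close their Readers, and update the store size.
8.2.6. Log the result.
8.2.7. Finish the compaction request: the Region reports the end of the compaction request, and all of the request's files are removed from filesCompacting. Because this lives in the finally block, it also runs on the 8.1 path and when an exception is thrown.
8.2.8. Return the StoreFile list sfs.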
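The control flow summarized above can be modeled in a few lines. The following is a toy, in-memory sketch under stated assumptions, not HBase code: the class CompactFlowSketch, its select() method (a hypothetical stand-in for the request side) and the made-up "merged(...)" file name exist only for illustration. It shows the two branches of step 8 and the cleanup in finally that runs on both.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy in-memory model of the compact() control flow; not HBase code. */
final class CompactFlowSketch {
  final List<String> storeFiles = new ArrayList<>();   // files currently serving reads
  final Set<String> filesCompacting = new HashSet<>(); // files claimed by a running compaction

  /** Hypothetical stand-in for the request side: claims the files. */
  List<String> select(List<String> candidates) {
    filesCompacting.addAll(candidates);
    return new ArrayList<>(candidates);
  }

  /** Same shape as the method above: work in try, cleanup in finally. */
  List<String> compact(List<String> filesToCompact, boolean complete) {
    try {
      // Steps 5 and 6: sanity checks on the selection.
      if (filesToCompact.isEmpty() || !filesCompacting.containsAll(filesToCompact)) {
        throw new IllegalArgumentException("empty or unknown selection");
      }
      // Step 7: "merge" the inputs into one new file (the name is made up).
      List<String> newFiles =
          Collections.singletonList("merged(" + String.join("+", filesToCompact) + ")");
      if (!complete) {
        return newFiles; // step 8.1: the old files keep serving reads
      }
      // Step 8.2.3: swap the inputs for the outputs in the serving list.
      storeFiles.removeAll(filesToCompact);
      storeFiles.addAll(newFiles);
      return newFiles;
    } finally {
      // Runs on every exit path: both returns and any exception.
      filesCompacting.removeAll(filesToCompact);
    }
  }
}
```

With complete set to false the serving list is left untouched, yet filesCompacting is still emptied, which is the point made in step 8.2.7.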
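That covers the whole flow. Next, we go into more detail on some of its parts.
First, we set aside CompactionContext's compact() method, which does the real merging. For now it is enough to know that it opens scanners over the files being compacted, writes all of their data into new files, and returns the collection of those new files, newFiles. A later article covers it in detail.
Next, once newFiles is available, one parameter decides what happens afterwards: hbase.hstore.compaction.complete. It determines whether the compaction is fully completed, and the main, substantive difference between a complete and an incomplete finish is which files go on serving reads.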
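Look at the incomplete finish first. It creates a StoreFile and Reader instance for each compacted file and immediately closes the Reader on the new file, which means the old files still serve reads. New and old files coexist, the old files stay where they are, and the column family's list of currently available storefiles is unchanged: it still holds the StoreFile objects of the old files.
The complete finish does much more. It moves the finished files to the right place, creates the StoreFiles and Readers and returns the StoreFile list sfs; writes the compaction record to the WAL; replaces the storefiles; updates the counters that match the compaction type, for metrics monitoring; archives the old files (the compacted files are "deleted" from the file system, which is really an archive operation that moves them from their original location to the archive directory), closes their Readers and updates the store size; and finishes the compaction request, in which the Region reports the end of the compaction request and the request's files are removed from filesCompacting. That is a lot of moving parts, so let's take the complicated ones one at a time:
1. Move the finished files to the right place, create the StoreFile and Reader, and return the StoreFile list sfs
This is implemented by moveCompatedFilesIntoPlace(), whose code is: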
  private List<StoreFile> moveCompatedFilesIntoPlace(
      CompactionRequest cr, List<Path> newFiles) throws IOException {
    // Create the StoreFile list sfs
    List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
    // Iterate over newFiles
    for (Path newFile : newFiles) {
      assert newFile != null;
      // Move newFile to its final location and create the StoreFile and Reader
      StoreFile sf = moveFileIntoPlace(newFile);
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCompact(this, sf, cr);
      }
      assert sf != null;
      sfs.add(sf);
    }
    return sfs;
  }
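It first creates the StoreFile list sfs, then iterates over the compacted files newFiles, moving each newFile to the right place and creating its StoreFile and Reader. The move itself is done by moveFileIntoPlace(), whose code is: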
  // Package-visible for tests
  StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
    // Validate the new file
    validateStoreFile(newFile);
    // Move the file into the right spot
    Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
    // Create the StoreFile and Reader
    return createStoreFileAndReader(destPath);
  }
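As we can see, the move is actually done by the commitStoreFile() method of HStore's member variable fs. fs is of type HRegionFileSystem, an abstraction of the file system for an HRegion that carries out the actual physical operations on files and directories. Let's look at its commitStoreFile() method: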
  /**
   * Move the file from a build/temp location to the main family store directory.
   * @param familyName Family that will gain the file
   * @param buildPath {@link Path} to the file to commit.
   * @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number)
   * @param generateNewName False if you want to keep the buildPath name
   * @return The new {@link Path} of the committed file
   * @throws IOException
   */
  private Path commitStoreFile(final String familyName, final Path buildPath,
      final long seqNum, final boolean generateNewName) throws IOException {
    // Get the store directory storeDir for the column family familyName
    Path storeDir = getStoreDir(familyName);
    // If the directory does not exist and creating it fails, throw an exception
    if(!fs.exists(storeDir) && !createDir(storeDir))
      throw new IOException("Failed creating " + storeDir);
    String name = buildPath.getName();
    if (generateNewName) {
      name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
    }
    Path dstPath = new Path(storeDir, name);
    if (!fs.exists(buildPath)) {
      throw new FileNotFoundException(buildPath.toString());
    }
    LOG.debug("Committing store file " + buildPath + " as " + dstPath);
    // buildPath exists, therefore not doing an exists() check.
    if (!rename(buildPath, dstPath)) {
      throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
    }
    return dstPath;
  }
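It is very simple: get the store directory storeDir from the column family name familyName, check for storeDir and create it if necessary, take the file name name from buildPath (or generate a new unique name when generateNewName is true), build the destination path dstPath from storeDir and name, and move the file from buildPath to dstPath with rename().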
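The same commit-by-rename idea can be tried on a local file system. This is a minimal sketch that assumes plain java.nio.file in place of HDFS; the class CommitFileSketch, the directory names and the demo() helper are invented for the example and do not come from HBase.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Local-file-system analogue of committing a store file; not HBase code. */
final class CommitFileSketch {
  /** Moves buildPath into storeDir, keeping its file name, and returns the new path. */
  static Path commit(Path storeDir, Path buildPath) throws IOException {
    Files.createDirectories(storeDir); // create the family directory if it is missing
    if (!Files.exists(buildPath)) {
      throw new NoSuchFileException(buildPath.toString());
    }
    Path dstPath = storeDir.resolve(buildPath.getFileName());
    // A rename inside one file system; ATOMIC_MOVE requests all-or-nothing behaviour.
    return Files.move(buildPath, dstPath, StandardCopyOption.ATOMIC_MOVE);
  }

  /** Writes a file under a temporary ".tmp" directory and commits it into "cf". */
  static boolean demo() {
    try {
      Path region = Files.createTempDirectory("region");
      Path tmp = Files.createDirectories(region.resolve(".tmp"));
      Path built = Files.write(tmp.resolve("hfile1"), new byte[] {1, 2, 3});
      Path dst = commit(region.resolve("cf"), built);
      return dst.equals(region.resolve("cf").resolve("hfile1"))
          && Files.exists(dst) && !Files.exists(built);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Writing to a temporary location first and renaming afterwards means a reader of the store directory sees either the whole file or no file, which is what step 2 of the Javadoc ordering relies on.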
Creating the StoreFile and Reader ultimately goes through the createStoreFileAndReader() method, whose code is:
  private StoreFile createStoreFileAndReader(final StoreFileInfo info)
      throws IOException {
    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
    StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
        this.family.getBloomFilterType());
    storeFile.createReader();
    return storeFile;
  }
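A StoreFile is a file that stores data. A Store usually contains one or more StoreFiles, and Reader is an inner class of StoreFile that serves reads of the file's data.
2. Write the compaction record to the WAL
This is done by the writeCompactionWalRecord() method, whose code is: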
  /**
   * Writes the compaction WAL record.
   *
   * @param filesCompacted Files compacted (input).
   * @param newFiles Files from compaction.
   */
  private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
      Collection<StoreFile> newFiles) throws IOException {
    // If the region has no WAL, return immediately
    if (region.getWAL() == null) return;
    // Add the paths of the compacted (input) files to inputPaths
    List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
    for (StoreFile f : filesCompacted) {
      inputPaths.add(f.getPath());
    }
    // Add the paths of the new (output) files to outputPaths
    List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
    for (StoreFile f : newFiles) {
      outputPaths.add(f.getPath());
    }
    // Get the HRegionInfo, info
    HRegionInfo info = this.region.getRegionInfo();
    // Build the CompactionDescriptor that describes this compaction
    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
        family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
    // Use WALUtil.writeCompactionMarker() to write a compaction marker to the WAL
    WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
        this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
  }
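The logic is fairly simple:
1. Add the paths of the compacted (input) files to the inputPaths list.
2. Add the paths of the new (output) files to the outputPaths list.
3. Get the HRegionInfo, info.
4. Build the CompactionDescriptor that describes the compaction.
5. Use the writeCompactionMarker() method of the WALUtil utility class to write a compaction marker to the WAL.
First, the CompactionDescriptor. It contains the table name TableName, the region name EncodedRegionName, the column family name FamilyName, the store home path StoreHomeDir, the compaction inputs CompactionInput, the compaction outputs CompactionOutput and other key fields, which together fully describe the compaction. It is built as follows: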
  public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
      List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
    // compaction descriptor contains relative paths.
    // input / output paths are relative to the store dir
    // store dir is relative to region dir
    CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder()
        .setTableName(ByteStringer.wrap(info.getTableName()))
        .setEncodedRegionName(ByteStringer.wrap(info.getEncodedNameAsBytes()))
        .setFamilyName(ByteStringer.wrap(family))
        .setStoreHomeDir(storeDir.getName()); //make relative
    for (Path inputPath : inputPaths) {
      builder.addCompactionInput(inputPath.getName()); //relative path
    }
    for (Path outputPath : outputPaths) {
      builder.addCompactionOutput(outputPath.getName());
    }
    builder.setRegionName(ByteStringer.wrap(info.getRegionName()));
    return builder.build();
  }
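The comments in this method stress that the descriptor stores only relative names. A small sketch of why that is convenient follows, assuming java.nio.file.Path in place of Hadoop's Path; the class RelativeDescriptorSketch and its resolveInput() helper are hypothetical and only illustrate the idea.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Keeps only the last path component of each file, as the descriptor above does. */
final class RelativeDescriptorSketch {
  final String storeHomeDir;                       // store dir name, relative to the region dir
  final List<String> inputs = new ArrayList<>();   // input file names, relative to the store dir
  final List<String> outputs = new ArrayList<>();  // output file names, relative to the store dir

  RelativeDescriptorSketch(Path storeDir, List<Path> inputPaths, List<Path> outputPaths) {
    this.storeHomeDir = storeDir.getFileName().toString();
    for (Path p : inputPaths) {
      inputs.add(p.getFileName().toString());
    }
    for (Path p : outputPaths) {
      outputs.add(p.getFileName().toString());
    }
  }

  /** Hypothetical helper: rebuilds a full path under whatever region dir the reader uses. */
  Path resolveInput(Path regionDir, int index) {
    return regionDir.resolve(storeHomeDir).resolve(inputs.get(index));
  }
}
```

Because nothing absolute is recorded, whoever reads the record can resolve the names against its own view of the region directory.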
Finally, the writeCompactionMarker() method of the WALUtil utility class writes a compaction marker to the WAL. Here is the code:
  /**
   * Write the marker that a compaction has succeeded and is about to be committed.
   * This provides info to the HMaster to allow it to recover the compaction if
   * this regionserver dies in the middle (This part is not yet implemented). It also prevents
   * the compaction from finishing if this regionserver has already lost its lease on the log.
   * @param sequenceId Used by WAL to get sequence Id for the waledit.
   */
  public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
      final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
    // Get the table name tn from the CompactionDescriptor
    TableName tn = TableName.valueOf(c.getTableName().toByteArray());
    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
    // Create a WALKey from the encoded region name and the table name tn
    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
    // Append one record to the WAL: the table descriptor HTableDescriptor, the WALKey,
    // the compaction WALEdit and the sequence number sequenceId.
    // The compaction WALEdit is built by WALEdit.createCompaction() from the HRegionInfo
    // and the CompactionDescriptor
    log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
    // Sync the log
    log.sync();
    if (LOG.isTraceEnabled()) {
      LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
    }
  }
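It appends one record to the WAL, made up of the table descriptor HTableDescriptor, the WALKey, the compaction WALEdit and the sequence number sequenceId. The compaction WALEdit is built by WALEdit's createCompaction() method from the HRegionInfo and the CompactionDescriptor. The code is: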
  /**
   * Create a compaction WALEdit
   * @param c
   * @return A WALEdit that has <code>c</code> serialized as its value
   */
  public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
    // Serialize the CompactionDescriptor to a byte []
    byte [] pbbytes = c.toByteArray();
    // Build the KeyValue from the row for the region (its start key), the METAFAMILY
    // constant, the "HBASE::COMPACTION" qualifier, the current time and the
    // serialized CompactionDescriptor
    KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
        EnvironmentEdgeManager.currentTime(), pbbytes);
    // Add the KeyValue to a WALEdit and return the WALEdit instance
    return new WALEdit().add(kv); //replication scope null so that this won't be replicated
  }
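The code comments are detailed enough, so we won't repeat them.
3. Replace the StoreFiles, which involves two things:
(1) remove the files that were compacted (compactedFiles) and add the result files sfs to the StoreFileManager's storefiles, the list of all store files currently serving the Store;
(2) remove the compacted files filesToCompact from the in-progress list filesCompacting.
The code of replaceStoreFiles() is: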
  @VisibleForTesting
  void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
      final Collection<StoreFile> result) throws IOException {
    // Take the write lock of the ReentrantReadWriteLock, which is exclusive
    this.lock.writeLock().lock();
    try {
      // Via StoreFileManager.addCompactionResults(), remove the files that were compacted
      // (compactedFiles) and add the result files to the StoreFileManager's storefiles,
      // the list of all store files currently serving the Store
      this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
      // Remove the compacted files from the in-progress list filesCompacting
      filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
    } finally {
      // Release the lock
      this.lock.writeLock().unlock();
    }
  }
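The locking pattern here is the standard read-write lock idiom: many readers may look at the file list at once, while the swap takes the exclusive write lock. A minimal sketch of that idiom follows; the class StoreFileSwapSketch and the use of plain strings for files are assumptions for illustration, not the HBase implementation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Swaps compacted files for their results under a write lock; not HBase code. */
final class StoreFileSwapSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final List<String> storeFiles = new ArrayList<>();
  private final List<String> filesCompacting = new ArrayList<>();

  StoreFileSwapSketch(Collection<String> serving, Collection<String> compacting) {
    storeFiles.addAll(serving);
    filesCompacting.addAll(compacting);
  }

  /** Readers share the read lock, so they never observe a half-finished swap. */
  List<String> snapshot() {
    lock.readLock().lock();
    try {
      return new ArrayList<>(storeFiles);
    } finally {
      lock.readLock().unlock();
    }
  }

  /** The swap holds the write lock, excluding readers and other writers. */
  void replace(Collection<String> compacted, Collection<String> result) {
    lock.writeLock().lock();
    try {
      storeFiles.removeAll(compacted);
      storeFiles.addAll(result);
      filesCompacting.removeAll(compacted);
    } finally {
      lock.writeLock().unlock();
    }
  }

  int compactingCount() {
    lock.readLock().lock();
    try {
      return filesCompacting.size();
    } finally {
      lock.readLock().unlock();
    }
  }
}
```

Unlocking in finally, as the original method does, guarantees the lock is released even if the swap throws.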
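4. Complete the compaction: archive the old files (the compacted files are "deleted" from the file system, which is really an archive operation that moves them from their original location to the archive directory), close their Readers, and update the store size. The code of completeCompaction() is: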
  /**
   * <p>It works by processing a compaction that's been written to disk.
   *
   * <p>It is usually invoked at the end of a compaction, but might also be
   * invoked at HStore startup, if the prior execution died midway through.
   *
   * <p>Moving the compacted TreeMap into place means:
   * <pre>
   * 1) Unload all replaced StoreFile, close and collect list to delete.
   * 2) Compute new store size
   * </pre>
   *
   * @param compactedFiles list of files that were compacted
   */
  @VisibleForTesting
  protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
      throws IOException {
    try {
      // Do not delete old store files until we have sent out notification of
      // change in case old files are still being accessed by outstanding scanners.
      // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
      // scenario that could have happened if continue to hold the lock.
      // Notify the observers of changed Readers
      notifyChangedReadersObservers();
      // At this point the store will use new files for all scanners.

      // let the archive util decide if we should archive or delete the files
      LOG.debug("Removing store files after compaction...");
      // Close the Reader on each compacted file in compactedFiles
      for (StoreFile compactedFile : compactedFiles) {
        compactedFile.closeReader(true);
      }
      // "Delete" the compacted files from the file system; this is really an archive
      // operation that moves the old files from their original location to the archive directory
      if (removeFiles) {
        this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
      }
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.error("Failed removing compacted files in " + this +
        ". Files we were trying to remove are " + compactedFiles.toString() +
        "; some of them may have been already removed", e);
    }

    // 4. Compute new store size
    this.storeSize = 0L;
    this.totalUncompressedBytes = 0L;
    // Iterate over the StoreFiles and accumulate storeSize and totalUncompressedBytes
    for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = hsf.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + hsf + " has a null Reader");
        continue;
      }
      this.storeSize += r.length();
      this.totalUncompressedBytes += r.getTotalUncompressedBytes();
    }
  }
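The size recomputation at the end of this method starts from zero and skips any file whose Reader is missing instead of failing. A tiny sketch of that loop, under the assumption that a nullable length stands in for a Reader (the class StoreSizeSketch and its FileStub type are hypothetical):

```java
import java.util.List;

/** Recomputes a total size from scratch, skipping files without a reader; not HBase code. */
final class StoreSizeSketch {
  /** Hypothetical stand-in for a store file; readerLength == null models a missing Reader. */
  static final class FileStub {
    final Long readerLength;

    FileStub(Long readerLength) {
      this.readerLength = readerLength;
    }
  }

  static long storeSize(List<FileStub> files) {
    long size = 0L; // reset rather than adjust incrementally
    for (FileStub f : files) {
      if (f.readerLength == null) {
        continue; // the real method logs a warning here and moves on
      }
      size += f.readerLength;
    }
    return size;
  }
}
```

Recomputing from the current file list keeps the total consistent with whatever the StoreFileManager holds after the swap, at the cost of one pass over the files.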
Everything else is covered by the code comments. What deserves separate mention is HRegionFileSystem's removeStoreFiles() method:
  /**
   * Closes and archives the specified store files from the specified family.
   * @param familyName Family that contains the store files
   * @param storeFiles set of store files to remove
   * @throws IOException if the archiving fails
   */
  public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles)
      throws IOException {
    HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfoForFs,
        this.tableDir, Bytes.toBytes(familyName), storeFiles);
  }
It ultimately delegates to HFileArchiver's archiveStoreFiles() method, whose code is:
  /**
   * Remove the store files, either by archiving them or outright deletion
   * @param conf {@link Configuration} to examine to determine the archive directory
   * @param fs the filesystem where the store files live
   * @param regionInfo {@link HRegionInfo} of the region hosting the store files
   * @param family the family hosting the store files
   * @param compactedFiles files to be disposed of. No further reading of these files should be
   *          attempted; otherwise likely to cause an {@link IOException}
   * @throws IOException if the files could not be correctly disposed.
   */
  public static void archiveStoreFiles(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
      Path tableDir, byte[] family, Collection<StoreFile> compactedFiles) throws IOException {
    // sometimes in testing, we don't have rss, so we need to check for that
    if (fs == null) {
      LOG.warn("Passed filesystem is null, so just deleting the files without archiving for region:"
          + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family));
      deleteStoreFilesWithoutArchiving(compactedFiles);
      return;
    }
    // short circuit if we don't have any files to delete
    // (return immediately when compactedFiles is empty)
    if (compactedFiles.size() == 0) {
      LOG.debug("No store files to dispose, done!");
      return;
    }
    // build the archive path
    if (regionInfo == null || family == null) throw new IOException(
        "Need to have a region and a family to archive from.");
    // Get the archive path for this store
    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
    // make sure we don't archive if we can't and that the archive dir exists
    // (create the directory)
    if (!fs.mkdirs(storeArchiveDir)) {
      throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
          + Bytes.toString(family) + ", deleting compacted files instead.");
    }
    // otherwise we attempt to archive the store files
    if (LOG.isDebugEnabled()) LOG.debug("Archiving compacted store files.");
    // Wrap the storefile into a File
    StoreToFile getStorePath = new StoreToFile(fs);
    Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);
    // do the actual archive via resolveAndArchive()
    if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
      throw new IOException("Failed to archive/delete all the files for region:"
          + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)
          + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
    }
  }
One call leads to another. Let's keep going and look at the key code further down the call chain:
      // If it is a file
      if (file.isFile()) {
        // attempt to archive the file
        if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
          LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
          failures.add(file);
        }
      }
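The resolveAndArchiveFile() method does not simply delete files. It relies on rename() to move files around inside the archive path rather than removing them. The excerpt below shows one such rename: judging by its comments and error messages, it handles the case where a file of the same name already exists in the archive directory, renaming that existing archive file to a timestamped backup to make room, and deleting it only if the rename fails: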
      // move the archive file to the stamped backup
      Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);
      if (!fs.rename(archiveFile, backedupArchiveFile)) {
        LOG.error("Could not rename archive file to backup: " + backedupArchiveFile
            + ", deleting existing file in favor of newer.");
        // try to delete the existing file, if we can't rename it
        if (!fs.delete(archiveFile, false)) {
          throw new IOException("Couldn't delete existing archive file (" + archiveFile
              + ") or rename it to the backup file (" + backedupArchiveFile
              + ") to make room for similarly named file.");
        }
      }
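Archiving by rename, with a timestamped backup on name collisions, can be sketched on a local file system as follows. This is an illustration under assumptions, not the HFileArchiver implementation: the class ArchiveSketch, the "." separator, the directory names and the demo() helper are all invented for the example, and java.nio.file stands in for the Hadoop FileSystem API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Archive-by-rename with a timestamped backup on collision; not HBase code. */
final class ArchiveSketch {
  /** Moves file into archiveDir; an existing same-named archive is renamed aside first. */
  static Path archive(Path archiveDir, Path file, long stamp) throws IOException {
    Files.createDirectories(archiveDir);
    Path archiveFile = archiveDir.resolve(file.getFileName());
    if (Files.exists(archiveFile)) {
      // Keep the older copy as "<name>.<stamp>" to make room for the newer one.
      Path backup = archiveDir.resolve(file.getFileName() + "." + stamp);
      Files.move(archiveFile, backup);
    }
    return Files.move(file, archiveFile);
  }

  /** Archives two files with the same name and returns how many files the archive holds. */
  static int demo() {
    try {
      Path root = Files.createTempDirectory("archive-sketch");
      Path store = Files.createDirectories(root.resolve("cf"));
      Path archiveDir = root.resolve("archive").resolve("cf");
      archive(archiveDir, Files.write(store.resolve("hfile1"), new byte[] {1}), 1L);
      archive(archiveDir, Files.write(store.resolve("hfile1"), new byte[] {2}), 2L);
      try (Stream<Path> listing = Files.list(archiveDir)) {
        return (int) listing.count();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In demo() the second archive call finds hfile1 already present, so the archive directory ends up with both hfile1 and the renamed older copy; nothing is lost.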
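5. Finish the compaction request: the Region reports the end of the compaction request, and all of the request's files are removed from filesCompacting
This part is done by the finishCompactionRequest() method, whose code is: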
  private void finishCompactionRequest(CompactionRequest cr) {
    // The Region reports the end of the compaction request
    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
    // If this was an off-peak compaction, reset the off-peak tracker and the request's flag
    if (cr.isOffPeak()) {
      offPeakCompactionTracker.set(false);
      cr.setOffPeak(false);
    }
    // Remove all of the request's files from filesCompacting
    synchronized (filesCompacting) {
      filesCompacting.removeAll(cr.getFiles());
    }
  }
Readers can work through this one on their own; we won't go into it here.
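One detail worth a sketch is the off-peak tracker that this method resets. The article only shows the release side, set(false). The acquire side below, a compareAndSet on an AtomicBoolean, is an assumption about how such a single slot is typically claimed, and the class OffPeakSlotSketch is hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** A single "off-peak" slot guarded by an AtomicBoolean; not HBase code. */
final class OffPeakSlotSketch {
  private final AtomicBoolean offPeakInUse = new AtomicBoolean(false);

  /** Assumed acquire side: only one caller at a time can flip false to true. */
  boolean tryAcquire() {
    return offPeakInUse.compareAndSet(false, true);
  }

  /** Release side, matching the set(false) call in the method above. */
  void release() {
    offPeakInUse.set(false);
  }
}
```

Releasing inside the finish step, which itself runs in a finally block, is what keeps the slot from staying claimed after a failed compaction.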
That's all for now. To be continued in the next installment!