leveldb之Compaction操作上之呼叫流程
阿新 • • 發佈:2019-01-30
由之前對Compaction的介紹: 可知,Compaction操作主要是用來合併檔案的。同時Compaction可以分為兩種:
1)將Memtable中的資料匯出到SSTable檔案中
2)合併不同level的SSTable檔案
在寫資料時,可能會導致Memtable寫滿,此時會需要將Memtable轉化為SSTable。
由之前對FileMetaData資料結構的分析可知,當一個.sst檔案的查詢次數超過最大允許值時,也需要將其合併。
因此在讀資料時,也可能會觸發合併。
在DBImpl::Write()中首先會呼叫到DBImpl::MakeRoomForWrite(bool force)來確保有空間可以寫入。
Status status = MakeRoomForWrite(my_batch == NULL);
1、DBImpl::MakeRoomForWrite() 和 DBImpl::Get()
1)當有資料要寫入時,my_batch != NULL,則force為false。force==true,則沒有資料需要寫入,表示只是為了進行合併。DBImpl::MakeRoomForWrite()的實現如下(暫時考慮有資料寫入,即force = false的情況):Status DBImpl::MakeRoomForWrite(bool force) { //false mutex_.AssertHeld(); bool allow_delay = !force; //true Status s; while (true) { if (!bg_error_.ok()) { s = bg_error_; break; } else if ( allow_delay && //true versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {//當L0的檔案數到達8個時,則會減慢寫入速度,表現的是延遲一段時間再寫入 mutex_.Unlock(); //釋放鎖,則在休眠時可以進行其他操作 env_->SleepForMicroseconds(1000);//休眠1ms allow_delay = false; //變為false,則下次迴圈不會進入這裡,保證不會多次延遲同一次寫操作 mutex_.Lock(); } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { // 當前Memtable中還有空間可以寫入資料 break; } else if (imm_ != NULL) { //imm_不為空,表明正在進行合併,需要等待合併完成 bg_cv_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {//L0最多有12個檔案,當超過時,不會在繼續寫入,因此會等待合併完成再寫入 bg_cv_.Wait(); } else {//當前Memtable沒有空間了,且此時也沒有在進行合併操作,因此需要建立一個新的Memtable uint64_t new_log_number = versions_->NewFileNumber();//獲取新的檔案編號 WritableFile* lfile = NULL; s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);//建立一個新的log檔案 if (!s.ok()) { versions_->ReuseFileNumber(new_log_number); break; } delete log_;//當前Memtable已經寫滿,則log檔案可以丟棄 delete logfile_; logfile_ = lfile; logfile_number_ = new_log_number; log_ = new log::Writer(lfile); imm_ = mem_;//將Memtable轉化為immutable Memtable has_imm_.Release_Store(imm_); mem_ = new MemTable(internal_comparator_);//然後建立一個新的Memtable mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction();//判斷是否需要進行合併 } } return s; }
即噹噹前Memtable已經寫滿且暫時沒有進行合併操作時,建立一個新的Memtable,將原來的Memtable轉化為imm_,然後呼叫MaybeScheduleCompaction()來判斷是否需要進行合併操作。
2)當從資料庫中讀取資料時,如果在memtable或者immutable memtable中找到,則不會合並。但是如果沒有找到,而是要繼續在SSTable檔案中查詢時,就可能會導致有的SSTable檔案的查詢次數超過上限,此時就需要對該檔案進行合併了。
在查詢SSTable時,會通過變數stats記錄檔案的查詢資訊,當允許查詢次數不大於0時,就可能會導致合併。Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; MutexLock l(&mutex_); SequenceNumber snapshot; bool have_stat_update = false; Version::GetStats stats; { mutex_.Unlock(); LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { //在Memtable中查詢 // Done } else if (imm != NULL && imm->Get(lkey, value, &s)) { //imm_還沒有合併時,若沒有找到,則繼續在imm_中查詢 // Done } else { //若還沒有找到,則繼續在SSTable中查詢,此時可能會觸發合併 s = current->Get(options, lkey, value, &stats); have_stat_update = true; } mutex_.Lock(); } if (have_stat_update && current->UpdateStats(stats)) { //基於查詢的統計資訊stats,來判斷是否有檔案需要合併 MaybeScheduleCompaction();//判斷是否有檔案需要進行合併 } return s; }
2、Compaction操作的呼叫流程
1)首先在DBImpl::MaybeScheduleCompaction()中判斷是否需要進行合併
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) { // Already scheduled
} else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) { // Already got an error; no more changes
} else if (imm_ == NULL &&manual_compaction_ == NULL &&!versions_->NeedsCompaction()) { // No work to be done
} else {
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this); //當需要合併時,後臺排程
}
}
2)當需要合併時,在BGWork()中呼叫BackgroundCall()
void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}
在BackgroundCall()中首先呼叫BackgroundCompaction()進行合併,合併完成後可能導致有的level的檔案數過多,因此會再次呼叫MaybeScheduleCompaction()判斷是否需要繼續進行合併。
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
if (shutting_down_.Acquire_Load()) { // No more background work when shutting down.
} else if (!bg_error_.ok()) { // No more background work after a background error.
} else {
BackgroundCompaction(); //進行合併
}
bg_compaction_scheduled_ = false;
MaybeScheduleCompaction();//之前的合併可能導致有的level的檔案數過多,因此需要繼續排程判斷是否需要繼續進行合併
bg_cv_.SignalAll();
}
3)在BackgroundCompaction()中分別完成三種不同的Compaction:
- 對Memtable進行合併
- trivial Compaction,直接將檔案移動到下一層
- 一般的合併,呼叫DoCompactionWork()實現
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
if (imm_ != NULL) {
CompactMemTable();//1、對Memtable進行合併
return;
}
Compaction* c;
bool is_manual = (manual_compaction_ != NULL); //manual_compaction預設為NULL,則is_manual預設為false
InternalKey manual_end;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == NULL);
if (c != NULL) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
} else {
c = versions_->PickCompaction();
}
Status status;
if (c == NULL) { // Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {//2、trivial Compaction,直接移動到下一level
// Move file to next level
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
} else { //3、一般的合併
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
}
delete c;
if (is_manual) { //false
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = NULL;
}
}