1. 程式人生 > >leveldb之Compaction操作上之呼叫流程

leveldb之Compaction操作上之呼叫流程

由之前對Compaction的介紹: 可知,Compaction操作主要是用來合併檔案的。同時Compaction可以分為兩種:

1)將Memtable中的資料匯出到SSTable檔案中

2)合併不同level的SSTable檔案

在寫資料時,可能會導致Memtable寫滿,此時會需要將Memtable轉化為SSTable。

由之前對FileMetaData資料結構的分析可知,當一個.sst檔案的查詢次數超過最大允許值時,也需要將其合併。

因此在讀資料時,也可能會觸發合併。

 在DBImpl::Write()中首先會呼叫到DBImpl::MakeRoomForWrite(bool force)來確保有空間可以寫入。

Status status = MakeRoomForWrite(my_batch == NULL);

1、DBImpl::MakeRoomForWrite() 和 DBImpl::Get()

1)當有資料要寫入時,my_batch != NULL,則force為false。force==true,則沒有資料需要寫入,表示只是為了進行合併。DBImpl::MakeRoomForWrite()的實現如下(暫時考慮有資料寫入,即force = false的情況):
Status DBImpl::MakeRoomForWrite(bool force) { //false
  mutex_.AssertHeld();
  bool allow_delay = !force;  //true
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      s = bg_error_;
      break;
    } else if (
        allow_delay &&    //true
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {//當L0的檔案數到達8個時,則會減慢寫入速度,表現的是延遲一段時間再寫入
      mutex_.Unlock();  //釋放鎖,則在休眠時可以進行其他操作
      env_->SleepForMicroseconds(1000);//休眠1ms
      allow_delay = false;  //變為false,則下次迴圈不會進入這裡,保證不會多次延遲同一次寫操作
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // 當前Memtable中還有空間可以寫入資料
      break;
    } else if (imm_ != NULL) {   //imm_不為空,表明正在進行合併,需要等待合併完成
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {//L0最多有12個檔案,當超過時,不會在繼續寫入,因此會等待合併完成再寫入
      bg_cv_.Wait();
    } else {//當前Memtable沒有空間了,且此時也沒有在進行合併操作,因此需要建立一個新的Memtable
      uint64_t new_log_number = versions_->NewFileNumber();//獲取新的檔案編號
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);//建立一個新的log檔案
      if (!s.ok()) {
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;//當前Memtable已經寫滿,則log檔案可以丟棄
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;//將Memtable轉化為immutable Memtable
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);//然後建立一個新的Memtable
      mem_->Ref();
      force = false;   // Do not force another compaction if have room
      MaybeScheduleCompaction();//判斷是否需要進行合併
    }
  }
  return s;
}

即噹噹前Memtable已經寫滿且暫時沒有進行合併操作時,建立一個新的Memtable,將原來的Memtable轉化為imm_,然後呼叫MaybeScheduleCompaction()來判斷是否需要進行合併操作。

2)當從資料庫中讀取資料時,如果在memtable或者immutable memtable中找到,則不會合並。但是如果沒有找到,而是要繼續在SSTable檔案中查詢時,就可能會導致有的SSTable檔案的查詢次數超過上限,此時就需要對該檔案進行合併了。

Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;

  bool have_stat_update = false;
  Version::GetStats stats;

  {
    mutex_.Unlock();
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {   //在Memtable中查詢
      // Done
    } else if (imm != NULL && imm->Get(lkey, value, &s)) {   //imm_還沒有合併時,若沒有找到,則繼續在imm_中查詢
      // Done
    } else {    //若還沒有找到,則繼續在SSTable中查詢,此時可能會觸發合併
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  if (have_stat_update && current->UpdateStats(stats)) {  //基於查詢的統計資訊stats,來判斷是否有檔案需要合併
    MaybeScheduleCompaction();//判斷是否有檔案需要進行合併
  }

  return s;
}
在查詢SSTable時,會通過變數stats記錄檔案的查詢資訊,當允許查詢次數不大於0時,就可能會導致合併。

2、Compaction操作的呼叫流程

1)首先在DBImpl::MaybeScheduleCompaction()中判斷是否需要進行合併

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) { // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {  // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {   // Already got an error; no more changes
  } else if (imm_ == NULL &&manual_compaction_ == NULL &&!versions_->NeedsCompaction()) {   // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);  //當需要合併時,後臺排程
  }
}

2)當需要合併時,在BGWork()中呼叫BackgroundCall()

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}
在BackgroundCall()中首先呼叫BackgroundCompaction()進行合併,合併完成後可能導致有的level的檔案數過多,因此會再次呼叫MaybeScheduleCompaction()判斷是否需要繼續進行合併。
void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  if (shutting_down_.Acquire_Load()) {  // No more background work when shutting down.
  } else if (!bg_error_.ok()) {  // No more background work after a background error.
  } else {
    BackgroundCompaction(); //進行合併
  }
  bg_compaction_scheduled_ = false;

  MaybeScheduleCompaction();//之前的合併可能導致有的level的檔案數過多,因此需要繼續排程判斷是否需要繼續進行合併
  bg_cv_.SignalAll();
}

3)在BackgroundCompaction()中分別完成三種不同的Compaction:

  1. 對Memtable進行合併
  2.  trivial Compaction,直接將檔案移動到下一層
  3.  一般的合併,呼叫DoCompactionWork()實現
void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != NULL) {
    CompactMemTable();//1、對Memtable進行合併
    return;
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != NULL); //manual_compaction預設為NULL,則is_manual預設為false
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == NULL);
    if (c != NULL) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
  } else {
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == NULL) {   // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {//2、trivial Compaction,直接移動到下一level
    // Move file to next level
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
  } else {   //3、一般的合併
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);  
 
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;

  if (is_manual) {  //false
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->done = true;
    }
    if (!m->done) {
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = NULL;
  }
}