1. 程式人生 > >Leveldb原始碼分析--20

Leveldb原始碼分析--20

12 DB的開啟

先分析LevelDB是如何開啟db的,萬物始於建立。在開啟流程中有幾個輔助函式:DBImpl(),DBImpl::Recover, DBImpl::DeleteObsoleteFiles, DBImpl::RecoverLogFile, DBImpl::MaybeScheduleCompaction。

12.1 DB::Open()

開啟一個db,進行PUT、GET操作,就是前面的靜態函式DB::Open的工作。如果操作成功,它就返回一個db指標。前面說過DB就是一個介面類,其具體實現在DBImp類中,這是一個DB的子類。
函式宣告為:
Status DB::Open(const Options& options, const std::string&dbname, DB** dbptr);
分解來看,Open()函式主要有以下5個執行步驟。
S1 建立DBImpl物件,其後進入DBImpl::Recover()函式執行S2和S3。
S2 從已存在的db檔案恢復db資料,根據CURRENT記錄的MANIFEST檔案讀取db元資訊;這通過呼叫VersionSet::Recover()完成。
S3 然後過濾出那些最近的更新log,前一個版本可能新加了這些log,但並沒有記錄在MANIFEST中。然後依次根據時間順序,呼叫DBImpl::RecoverLogFile()從舊到新回放這些操作log。回放log時可能會修改db元資訊,比如dump了新的level 0檔案,因此它將返回一個VersionEdit物件,記錄db元資訊的變動。
S4 如果DBImpl::Recover()返回成功,就執行VersionSet::LogAndApply()應用VersionEdit,並儲存當前的DB資訊到新的MANIFEST檔案中。
S5 最後刪除一些過期檔案,並檢查是否需要執行compaction,如果需要,就啟動後臺執行緒執行。
下面就來具體分析Open函式的程式碼,在Open函式中涉及到上面的3個流程。
S1 首先建立DBImpl物件,鎖定並試圖做Recover操作。Recover操作用來處理建立flag,比如存在就返回失敗等等,嘗試從已存在的sstable檔案恢復db。並返回db元資訊的變動資訊,一個VersionEdit物件。
  DBImpl* impl = newDBImpl(options, dbname);
  impl->mutex_.Lock(); // 鎖db
  VersionEdit edit;
  Status s =impl->Recover(&edit); // 處理flag&恢復:create_if_missing,error_if_exists
S2 如果Recover返回成功,則呼叫VersionSet取得新的log檔案編號——實際上是在當前基礎上+1,準備新的log檔案。如果log檔案建立成功,則根據log檔案建立log::Writer。然後執行VersionSet::LogAndApply,根據edit記錄的增量變動生成新的current version,並寫入MANIFEST檔案。
函式NewFileNumber(){returnnext_file_number_++;},直接返回next_file_number_。
    uint64_t new_log_number =impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s =options.env->NewWritableFile(LogFileName(dbname, new_log_number),&lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ =new_log_number;
      impl->log_ = newlog::Writer(lfile);
      s =impl->versions_->LogAndApply(&edit, &impl->mutex_);
    }

 S3 如果VersionSet::LogAndApply返回成功,則刪除過期檔案,檢查是否需要執行compaction,最終返回建立的DBImpl物件。

    if (s.ok()) {
     impl->DeleteObsoleteFiles();
     impl->MaybeScheduleCompaction();
    }
  impl->mutex_.Unlock();
  if (s.ok()) *dbptr = impl;
  return s;
以上就是DB::Open的主題邏輯。

12.2 DBImpl::DBImpl()

建構函式做的都是初始化操作,DBImpl::DBImpl(const Options& options, const std::string&dbname)
首先是初始化列表中,直接根據引數賦值,或者直接初始化。Comparator和filter policy都是引數傳入的。在傳遞option時會首先將option中的引數合法化,logfile_number_初始化為0,指標初始化為NULL。
建立MemTable,並增加引用計數,建立WriteBatch。
      mem_(newMemTable(internal_comparator_)),
      tmp_batch_(new WriteBatch),
      mem_->Ref();
  // 然後在函式體中,建立TableCache和VersionSet。
  // 為其他預留10個檔案,其餘的都給TableCache.
  const int table_cache_size =options.max_open_files - 10;
  table_cache_ = newTableCache(dbname_, &options_, table_cache_size);
  versions_ = newVersionSet(dbname_, &options_, table_cache_, &internal_comparator_);

12.3 DBImp::NewDB()

當外部在呼叫DB::Open()時設定了option指定如果db不存在就建立,如果db不存在leveldb就會呼叫函式建立新的db。判斷db是否存在的依據是<db name>/CURRENT檔案是否存在。其邏輯很簡單。

// S1首先生產DB元資訊,設定comparator名,以及log檔案編號、檔案編號,以及seq no。
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);
// S2 生產MANIFEST檔案,將db元資訊寫入MANIFEST檔案。
  const std::string manifest =DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s =env_->NewWritableFile(manifest, &file);
  if (!s.ok()) return s;
  {
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) s =file->Close();
  }
  delete file;
// S3 如果成功,就把MANIFEST檔名寫入到CURRENT檔案中
  if (s.ok()) s =SetCurrentFile(env_, dbname_, 1);
  elseenv_->DeleteFile(manifest);
  return s;
這就是建立新DB的邏輯,很簡單。

12.4 DBImpl::Recover()

函式宣告為:StatusDBImpl::Recover(VersionEdit* edit),如果呼叫成功則設定VersionEdit。Recover的基本功能是:首先是處理建立flag,比如存在就返回失敗等等;然後是嘗試從已存在的sstable檔案恢復db;最後如果發現有大於原資訊記錄的log編號的log檔案,則需要回放log,更新db資料。回放期間db可能會dump新的level 0檔案,因此需要把db元資訊的變動記錄到edit中返回。函式邏輯如下:
S1 建立目錄,目錄以db name命名,忽略任何建立錯誤,然後嘗試獲取db name/LOCK檔案鎖,失敗則返回。
    env_->CreateDir(dbname_);
    Status s =env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) return s;
S2 根據CURRENT檔案是否存在,以及option引數執行檢查。
如果檔案不存在&create_is_missing=true,則呼叫函式NewDB()建立;否則報錯。
如果檔案存在& error_if_exists=true,則報錯。
S3 呼叫VersionSet的Recover()函式,就是從檔案中恢復資料。如果出錯則開啟失敗,成功則向下執行S4。
    s = versions_->Recover();
S4嘗試從所有比manifest檔案中記錄的log要新的log檔案中恢復(前一個版本可能會新增新的log檔案,卻沒有記錄在manifest中)。另外,函式PrevLogNumber()已經不再用了,僅為了相容老版本。
//  S4.1 這裡先找出所有滿足條件的log檔案:比manifest檔案記錄的log編號更新。
  SequenceNumber max_sequence(0);
  const uint64_t min_log =versions_->LogNumber();
  const uint64_t prev_log =versions_->PrevLogNumber();
  std::vector<std::string>filenames;
  s =env_->GetChildren(dbname_, &filenames); // 列出目錄內的所有檔案
  uint64_t number;
  FileType type;
  std::vector<uint64_t>logs;
  for (size_t i = 0; i <filenames.size(); i++) { // 檢查log檔案是否比min log更新
    if(ParseFileName(filenames[i], &number, &type) && type ==kLogFile
        && ((number >=min_log) || (number == prev_log))) {
      logs.push_back(number);
    }
  }
//  S4.2 找到log檔案後,首先排序,保證按照生成順序,依次回放log。並把DB元資訊的變動(sstable檔案的變動)追加到edit中返回。
    std::sort(logs.begin(),logs.end());
    for (size_t i = 0; i <logs.size(); i++) {
      s = RecoverLogFile(logs[i],edit, &max_sequence);
      // 前一版可能在生成該log編號後沒有記錄在MANIFEST中,
     //所以這裡我們手動更新VersionSet中的檔案編號計數器
     versions_->MarkFileNumberUsed(logs[i]);
}
//  S4.3 更新VersionSet的sequence
    if (s.ok()) {
      if(versions_->LastSequence() < max_sequence)
         versions_->SetLastSequence(max_sequence);
}
上面就是Recover的執行流程。

12.5 DBImpl::DeleteObsoleteFiles()

這個是垃圾回收函式,如前所述,每次compaction和recovery之後都會有檔案被廢棄。DeleteObsoleteFiles就是刪除這些垃圾檔案的,它在每次compaction和recovery完成之後被呼叫。
其呼叫點包括:DBImpl::CompactMemTable,DBImpl::BackgroundCompaction, 以及DB::Open的recovery步驟之後。
它會刪除所有過期的log檔案,沒有被任何level引用到、或不是正在執行的compaction的output的sstable檔案。
該函式沒有引數,其程式碼邏輯也很直觀,就是列出db的所有檔案,對不同型別的檔案分別判斷,如果是過期檔案,就刪除之,如下:
// S1 首先,確保不會刪除pending檔案,將versionset正在使用的所有檔案加入到live中。
  std::set<uint64_t> live =pending_outputs_;
  versions_->AddLiveFiles(&live); //該函式其後分析
// S2 列舉db的所有檔案
  std::vector<std::string>filenames;
  env_->GetChildren(dbname_,&filenames);
// S3 遍歷所有列舉的檔案,根據檔案型別,分別處理;
  uint64_t number;
  FileType type;
  for (size_t i = 0; i <filenames.size(); i++) {
     if (ParseFileName(filenames[i], &number,&type)) {
         bool keep = true; //false表明是過期檔案
         // S3.1 kLogFile,log檔案,根據log編號判斷是否過期
              keep = ((number >=versions_->LogNumber()) ||
                  (number ==versions_->PrevLogNumber()));
         // S3.2 kDescriptorFile,MANIFEST檔案,根據versionset記錄的編號判斷
              keep = (number >=versions_->ManifestFileNumber());
         // S3.3 kTableFile,sstable檔案,只要在live中就不能刪除
         // S3.4 kTempFile,如果是正在寫的檔案,只要在live中就不能刪除
              keep = (live.find(number) != live.end());
         // S3.5 kCurrentFile,kDBLockFile, kInfoLogFile,不能刪除
              keep = true;
     // S3.6 如果keep為false,表明需要刪除檔案,如果是table還要從cache中刪除
          if (!keep) {
             if(type == kTableFile) table_cache_->Evict(number);
             Log(options_.info_log, "Delete type=%d #%lld\n",type, number);
             env_->DeleteFile(dbname_ + "/" +filenames[i]);
          }
     }
  }

這就是刪除過期檔案的邏輯,其中呼叫到了VersionSet::AddLiveFiles函式,保證不會刪除active的檔案。

函式DbImpl::MaybeScheduleCompaction()放在Compaction一節分析,基本邏輯就是如果需要compaction,就啟動後臺執行緒執行compaction操作。

12.6 DBImpl::RecoverLogFile()

函式宣告:StatusRecoverLogFile(uint64_t log_number, VersionEdit* edit,SequenceNumber* max_sequence)
引數說明:
@log_number是指定的log檔案編號
@edit記錄db元資訊的變化——sstable檔案變動
@max_sequence 返回max{log記錄的最大序號, *max_sequence}
該函式開啟指定的log檔案,回放日誌。期間可能會執行compaction,生產新的level 0sstable檔案,記錄檔案變動到edit中。
它聲明瞭一個區域性類LogReporter以列印錯誤日誌,沒什麼好說的,下面來看程式碼邏輯。
// S1 開啟log檔案返回SequentialFile*file,出錯就返回,否則向下執行S2。
// S2 根據log檔案控制代碼file建立log::Reader,準備讀取log。
  log::Reader reader(file,&reporter, true/*checksum*/,0/*initial_offset*/);
// S3 依次讀取所有的log記錄,並插入到新生成的memtable中。這裡使用到了批量更新介面WriteBatch,具體後面再分析。
  std::string scratch;
  Slice record;
  WriteBatch batch;
  MemTable* mem = NULL;
  while(reader.ReadRecord(&record, &scratch) && status.ok()) { // 讀取全部log
    if (record.size() < 12) { // log資料錯誤,不滿足最小長度12
     reporter.Corruption(record.size(), Status::Corruption("log recordtoo small"));
      continue;
    }
   WriteBatchInternal::SetContents(&batch, record); // log內容設定到WriteBatch中
   if (mem == NULL) { // 建立memtable
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status =WriteBatchInternal::InsertInto(&batch, mem); // 插入到memtable中
    MaybeIgnoreError(&status);
    if (!status.ok()) break;
    const SequenceNumber last_seq=
       WriteBatchInternal::Sequence(&batch) + WriteBatchInternal::Count(&batch)- 1;
    if (last_seq >*max_sequence) *max_sequence = last_seq; // 更新max sequence
    // 如果mem的記憶體超過設定值,則執行compaction,如果compaction出錯,
    // 立刻返回錯誤,DB::Open失敗
    if(mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      status =WriteLevel0Table(mem, edit, NULL);
      if (!status.ok()) break;
      mem->Unref(); // 釋放當前memtable
      mem = NULL;
    }
  }
// S4 掃尾工作,如果mem != NULL,說明還需要dump到新的sstable檔案中。
  if (status.ok() && mem!= NULL) {// 如果compaction出錯,立刻返回錯誤
    status = WriteLevel0Table(mem,edit, NULL);
  }
  if (mem != NULL)mem->Unref();
  delete file;
  return status;
把MemTabledump到sstable是函式WriteLevel0Table的工作,其實這是compaction的一部分,準備放在compaction一節來分析。

12.7 小結

如上DB開啟的邏輯就已經分析完了,開啟邏輯參見DB::Open()中描述的5個步驟。此外還有兩個東東:把Memtable dump到sstable的WriteLevel0Table()函式,以及批量修改WriteBatch。第一個放在後面的compaction一節,第二個放在DB更新操作中。接下來就是db的關閉。

13 DB的關閉&銷燬

13.1 DB關閉

外部呼叫者通過DB::Open()獲取一個DB*物件,如果要關閉開啟的DB*db物件,則直接delete db即可,這會呼叫到DBImpl的解構函式。
析構依次執行如下的5個邏輯:
S1 等待後臺compaction任務結束
S2 釋放db檔案鎖,<dbname>/lock檔案
S3 刪除VersionSet物件,並釋放MemTable物件
S4 刪除log相關以及TableCache物件
S5 刪除options的block_cache以及info_log物件

13.2 DB銷燬

函式宣告:StatusDestroyDB(const std::string& dbname, const Options& options)
該函式會刪除掉db的資料內容,要謹慎使用。函式邏輯為:
S1 獲取dbname目錄的檔案列表到filenames中,如果為空則直接返回,否則進入S2。
S2 鎖檔案<dbname>/lock,如果鎖成功就執行S3
S3 遍歷filenames檔案列表,過濾掉lock檔案,依次呼叫DeleteFile刪除。
S4 釋放lock檔案,並刪除之,然後刪除資料夾。
Destory就執行完了,如果刪除檔案出現錯誤,記錄之,依然繼續刪除下一個。最後返回錯誤程式碼。
看來這一章很短小。DB的開啟關閉分析完畢。