1. 程式人生 > >leveldb學習:Env

leveldb學習:Env

考慮到移植以及靈活性,leveldb將系統相關的處理(檔案/程序/時間)抽象成Evn,使用者可以自己實現相應的介面,作為option傳入,預設使用自帶的實現。
解壓目錄/util/中放有env的宣告和實現程式碼。env.h中聲明瞭:

  1. 虛基類env,在env_posix.cc中,派生類PosixEnv繼承自env類,是leveldb的預設實現。
  2. 虛基類WritableFile、SequentialFile、RandomAccessFile,分別是檔案的寫抽象類,順序讀抽象類和隨機讀抽象類
  3. 類Logger,log檔案的寫入介面,log檔案是防止系統異常終止造成資料丟失,是memtable在磁碟的備份
  4. 類FileLock,為檔案上鎖
  5. WriteStringToFile、ReadFileToString、Log三個全域性函式,封裝了上述介面

下面來看看env_posix.cc中為我們寫好的預設實現

順序讀:

class PosixSequentialFile: public SequentialFile {
 private:
  std::string filename_;
  FILE* file_;
 public:
  PosixSequentialFile(const std::string& fname, FILE* f)
      : filename_(fname), file_
(f) { } virtual ~PosixSequentialFile() { fclose(file_); } virtual Status Read(size_t n, Slice* result, char* scratch) { Status s; size_t r = fread_unlocked(scratch, 1, n, file_); *result = Slice(scratch, r); if (r < n) { if (feof(file_)) { // We leave status as ok if we hit the end of the file
} else { // A partial read with an error: return a non-ok status s = IOError(filename_, errno); } } return s; } virtual Status Skip(uint64_t n) { if (fseek(file_, n, SEEK_CUR)) { return IOError(filename_, errno); } return Status::OK(); } };

這就是leveldb從磁碟讀取檔案的介面了,用的是C的流檔案操作和FILE結構體。

隨機讀:

class PosixRandomAccessFile: public RandomAccessFile {
 private:
  std::string filename_;
  int fd_;
 public:
  PosixRandomAccessFile(const std::string& fname, int fd)
      : filename_(fname), fd_(fd) { }
  virtual ~PosixRandomAccessFile() { close(fd_); }
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
    *result = Slice(scratch, (r < 0) ? 0 : r);
    if (r < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }
};

隨機讀取磁碟中檔案的資料,可以定位讀取檔案中offset偏移量的資料,核心函式pread()函式,可以帶偏移量地原子的從檔案中讀取資料,offset:讀取的
其實地址偏移量,讀取地址=檔案地址+offset。

寫入:

class PosixWritableFile : public WritableFile {
 private:
  std::string filename_;
  FILE* file_;
 public:
  PosixWritableFile(const std::string& fname, FILE* f)
      : filename_(fname), file_(f) { }
  ~PosixWritableFile() {
    if (file_ != NULL) {
      // Ignoring any potential errors
      fclose(file_);
    }
  }
  virtual Status Append(const Slice& data) {
    size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
    if (r != data.size()) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }
  virtual Status Close() {
    Status result;
    if (fclose(file_) != 0) {
      result = IOError(filename_, errno);
    }
    file_ = NULL;
    return result;
  }
  virtual Status Flush() {
    if (fflush_unlocked(file_) != 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }
  Status SyncDirIfManifest() {
    const char* f = filename_.c_str();
    const char* sep = strrchr(f, '/');
    Slice basename;
    std::string dir;
    if (sep == NULL) {
      dir = ".";
      basename = f;
    } else {
      dir = std::string(f, sep - f);
      basename = sep + 1;
    }
    Status s;
    if (basename.starts_with("MANIFEST")) {
      int fd = open(dir.c_str(), O_RDONLY);
      if (fd < 0) {
        s = IOError(dir, errno);
      } else {
        if (fsync(fd) < 0) {
          s = IOError(dir, errno);
        }
        close(fd);
      }
    }
    return s;
  }
  virtual Status Sync() {
    // Ensure new files referred to by the manifest are in the filesystem.
    Status s = SyncDirIfManifest();
    if (!s.ok()) {
      return s;
    }
    if (fflush_unlocked(file_) != 0 ||
        fdatasync(fileno(file_)) != 0) {
      s = Status::IOError(filename_, strerror(errno));
    }
    return s;
  }
};

sync( )函式是同步快取和磁碟中的資料,核心函式fflush(清空輸出緩衝區,並把緩衝區內容輸出),在比較關鍵的寫入操作時,立即同步可以防止系統掉電時資料丟失。
定義完這三個檔案的讀寫抽象類,把他們加入到PosixEnv類中,定義三個NewSequentialFile、NewRandomAccessFile、NewWritableFile函式,產生檔案讀寫的物件,在程式上層中呼叫env_->NewWritableFile即可建立一個檔案,並可寫入資料。

檔案的上鎖:

static int LockOrUnlock(int fd, bool lock) {
  errno = 0;
  struct flock f;
  memset(&f, 0, sizeof(f));
  f.l_type = (lock ? F_WRLCK : F_UNLCK);
  f.l_whence = SEEK_SET;
  f.l_start = 0;
  f.l_len = 0;        // Lock/unlock entire file
  return fcntl(fd, F_SETLK, &f);
}

fd是檔案的i節點(linux每個檔案和資料夾都對應一個唯一的i節點,用於管理檔案)。核心呼叫函式fcntl,F_WRLK和F_UNLK是命令的引數,上鎖和解鎖。在PosixEnv的成員函式LockFile和UnlockFile都呼叫了此函式,此外還將上鎖的檔案儲存在了PosixEnv::PosixLockTable locks_成員變數中,PosixLockTable是一個類,內有成員std::set locked_files_儲存上鎖的檔名,定義了insert和remove操作,然而並沒看出用處o(╯□╰)o?

PosixEnv還有一個很重要的功能,計劃任務,也就是後臺的compact程序,compact目的是維護資料庫的均衡性,保持資料庫查詢的高效率。PosixEnv中定義了一個任務佇列:

  struct BGItem { void* arg; void (*function)(void*); };
  //用的是deque雙向連結串列作為底層的資料結構,按時間順序執行任務,沒有設立優先順序佇列
  typedef std::deque<BGItem> BGQueue;
  BGQueue queue_;

主程序一旦判定需要進行compact操作,就把compact任務壓進佇列queue_中,BGItem是存有任務函式和db物件指標的結構。而後臺程序從一開始就不斷根據佇列中的函式指標執行compact任務。BGThread()函式就是不停的在queue_中取出函式指標,執行。

void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  PthreadCall("lock", pthread_mutex_lock(&mu_));
  // Start background thread if necessary
  //開啟後臺程序,如果沒有開啟
  if (!started_bgthread_) {
    started_bgthread_ = true;
    PthreadCall(
        "create thread",
        pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
  }
  // If the queue is currently empty, the background thread may currently be
  // waiting.
  if (queue_.empty()) {
    PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  }
  // Add to priority queue
  //將任務壓入佇列
  queue_.push_back(BGItem());
  queue_.back().function = function;
  queue_.back().arg = arg;
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void PosixEnv::BGThread() {
  while (true) {
    // Wait until there is an item that is ready to run
    PthreadCall("lock", pthread_mutex_lock(&mu_));
    while (queue_.empty()) {
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
    }
    //從佇列取出函式,執行
    void (*function)(void*) = queue_.front().function;
    void* arg = queue_.front().arg;
    queue_.pop_front();
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    (*function)(arg);
  }
}

後臺程序一直執行queue_中的任務,由於queue_是動態的,自然需要考慮queue_空了怎麼辦,leveldb採用的是條件量pthread_cond_t bgsignal_,佇列空了就進入等待,直至主程序有新的任務加入進來,而條件變數一般是要和pthread_mutex_t mu_搭配使用,防止某些邏輯錯誤。核心函式有:

  • pthread_mutex_init(),pthread_cond_init()條件變數、互斥鎖的初始化
  • pthread_mutex_lock(),pthread_mutex_unlock()關鎖解鎖
  • pthread_cond_signal(),pthread_cond_wait()條件變數成立,等待條件成立
  • pthread_create建立子執行緒

此外PosixEnv中還有FileExists、GetChildren、DeleteFile、CreateDir、DeleteDir、GetFileSize、RenameFile等等函式,它們的作用就如它們的名字所說一樣。

好了,從env給我的收穫就是:

  1. 利用虛基類的特性提供了預設的實現,也開放了使用者自定義操作的許可權
  2. 面向物件程式設計正規化的學習,把一切操作定義成類
  3. 檔案的加鎖解鎖,執行緒的同步
  4. C的檔案流操作,對檔名的字元提取操作,建立、刪除檔案和路徑,這些都可以直接用到將來自己的專案中