leveldb學習:Env
考慮到移植以及靈活性,leveldb將系統相關的處理(檔案/程序/時間)抽象成Evn,使用者可以自己實現相應的介面,作為option傳入,預設使用自帶的實現。
解壓目錄/util/中放有env的宣告和實現程式碼。env.h中聲明瞭:
- 虛基類env,在env_posix.cc中,派生類PosixEnv繼承自env類,是leveldb的預設實現。
- 虛基類WritableFile、SequentialFile、RandomAccessFile,分別是檔案的寫抽象類,順序讀抽象類和隨機讀抽象類
- 類Logger,log檔案的寫入介面,log檔案是防止系統異常終止造成資料丟失,是memtable在磁碟的備份
- 類FileLock,為檔案上鎖
- WriteStringToFile、ReadFileToString、Log三個全域性函式,封裝了上述介面
下面來看看env_posix.cc中為我們寫好的預設實現
順序讀:
class PosixSequentialFile: public SequentialFile {
private:
std::string filename_;
FILE* file_;
public:
PosixSequentialFile(const std::string& fname, FILE* f)
: filename_(fname), file_ (f) { }
virtual ~PosixSequentialFile() { fclose(file_); }
virtual Status Read(size_t n, Slice* result, char* scratch) {
Status s;
size_t r = fread_unlocked(scratch, 1, n, file_);
*result = Slice(scratch, r);
if (r < n) {
if (feof(file_)) {
// We leave status as ok if we hit the end of the file
} else {
// A partial read with an error: return a non-ok status
s = IOError(filename_, errno);
}
}
return s;
}
virtual Status Skip(uint64_t n) {
if (fseek(file_, n, SEEK_CUR)) {
return IOError(filename_, errno);
}
return Status::OK();
}
};
這就是leveldb從磁碟讀取檔案的介面了,用的是C的流檔案操作和FILE結構體。
隨機讀:
class PosixRandomAccessFile: public RandomAccessFile {
private:
std::string filename_;
int fd_;
public:
PosixRandomAccessFile(const std::string& fname, int fd)
: filename_(fname), fd_(fd) { }
virtual ~PosixRandomAccessFile() { close(fd_); }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
*result = Slice(scratch, (r < 0) ? 0 : r);
if (r < 0) {
// An error: return a non-ok status
s = IOError(filename_, errno);
}
return s;
}
};
隨機讀取磁碟中檔案的資料,可以定位讀取檔案中offset偏移量的資料,核心函式pread()函式,可以帶偏移量地原子的從檔案中讀取資料,offset:讀取的
其實地址偏移量,讀取地址=檔案地址+offset。
寫入:
class PosixWritableFile : public WritableFile {
private:
std::string filename_;
FILE* file_;
public:
PosixWritableFile(const std::string& fname, FILE* f)
: filename_(fname), file_(f) { }
~PosixWritableFile() {
if (file_ != NULL) {
// Ignoring any potential errors
fclose(file_);
}
}
virtual Status Append(const Slice& data) {
size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
if (r != data.size()) {
return IOError(filename_, errno);
}
return Status::OK();
}
virtual Status Close() {
Status result;
if (fclose(file_) != 0) {
result = IOError(filename_, errno);
}
file_ = NULL;
return result;
}
virtual Status Flush() {
if (fflush_unlocked(file_) != 0) {
return IOError(filename_, errno);
}
return Status::OK();
}
Status SyncDirIfManifest() {
const char* f = filename_.c_str();
const char* sep = strrchr(f, '/');
Slice basename;
std::string dir;
if (sep == NULL) {
dir = ".";
basename = f;
} else {
dir = std::string(f, sep - f);
basename = sep + 1;
}
Status s;
if (basename.starts_with("MANIFEST")) {
int fd = open(dir.c_str(), O_RDONLY);
if (fd < 0) {
s = IOError(dir, errno);
} else {
if (fsync(fd) < 0) {
s = IOError(dir, errno);
}
close(fd);
}
}
return s;
}
virtual Status Sync() {
// Ensure new files referred to by the manifest are in the filesystem.
Status s = SyncDirIfManifest();
if (!s.ok()) {
return s;
}
if (fflush_unlocked(file_) != 0 ||
fdatasync(fileno(file_)) != 0) {
s = Status::IOError(filename_, strerror(errno));
}
return s;
}
};
sync( )函式是同步快取和磁碟中的資料,核心函式fflush(清空輸出緩衝區,並把緩衝區內容輸出),在比較關鍵的寫入操作時,立即同步可以防止系統掉電時資料丟失。
定義完這三個檔案的讀寫抽象類,把他們加入到PosixEnv類中,定義三個NewSequentialFile、NewRandomAccessFile、NewWritableFile函式,產生檔案讀寫的物件,在程式上層中呼叫env_->NewWritableFile即可建立一個檔案,並可寫入資料。
檔案的上鎖:
static int LockOrUnlock(int fd, bool lock) {
errno = 0;
struct flock f;
memset(&f, 0, sizeof(f));
f.l_type = (lock ? F_WRLCK : F_UNLCK);
f.l_whence = SEEK_SET;
f.l_start = 0;
f.l_len = 0; // Lock/unlock entire file
return fcntl(fd, F_SETLK, &f);
}
fd是檔案的i節點(linux每個檔案和資料夾都對應一個唯一的i節點,用於管理檔案)。核心呼叫函式fcntl,F_WRLK和F_UNLK是命令的引數,上鎖和解鎖。在PosixEnv的成員函式LockFile和UnlockFile都呼叫了此函式,此外還將上鎖的檔案儲存在了PosixEnv::PosixLockTable locks_成員變數中,PosixLockTable是一個類,內有成員std::set locked_files_儲存上鎖的檔名,定義了insert和remove操作,然而並沒看出用處o(╯□╰)o?
PosixEnv還有一個很重要的功能,計劃任務,也就是後臺的compact程序,compact目的是維護資料庫的均衡性,保持資料庫查詢的高效率。PosixEnv中定義了一個任務佇列:
struct BGItem { void* arg; void (*function)(void*); };
//用的是deque雙向連結串列作為底層的資料結構,按時間順序執行任務,沒有設立優先順序佇列
typedef std::deque<BGItem> BGQueue;
BGQueue queue_;
主程序一旦判定需要進行compact操作,就把compact任務壓進佇列queue_中,BGItem是存有任務函式和db物件指標的結構。而後臺程序從一開始就不斷根據佇列中的函式指標執行compact任務。BGThread()函式就是不停的在queue_中取出函式指標,執行。
void PosixEnv::Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
// Start background thread if necessary
//開啟後臺程序,如果沒有開啟
if (!started_bgthread_) {
started_bgthread_ = true;
PthreadCall(
"create thread",
pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
}
// If the queue is currently empty, the background thread may currently be
// waiting.
if (queue_.empty()) {
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
}
// Add to priority queue
//將任務壓入佇列
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void PosixEnv::BGThread() {
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty()) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
//從佇列取出函式,執行
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
(*function)(arg);
}
}
後臺程序一直執行queue_中的任務,由於queue_是動態的,自然需要考慮queue_空了怎麼辦,leveldb採用的是條件量pthread_cond_t bgsignal_,佇列空了就進入等待,直至主程序有新的任務加入進來,而條件變數一般是要和pthread_mutex_t mu_搭配使用,防止某些邏輯錯誤。核心函式有:
- pthread_mutex_init(),pthread_cond_init()條件變數、互斥鎖的初始化
- pthread_mutex_lock(),pthread_mutex_unlock()關鎖解鎖
- pthread_cond_signal(),pthread_cond_wait()條件變數成立,等待條件成立
- pthread_create建立子執行緒
此外PosixEnv中還有FileExists、GetChildren、DeleteFile、CreateDir、DeleteDir、GetFileSize、RenameFile等等函式,它們的作用就如它們的名字所說一樣。
好了,從env給我的收穫就是:
- 利用虛基類的特性提供了預設的實現,也開放了使用者自定義操作的許可權
- 面向物件程式設計正規化的學習,把一切操作定義成類
- 檔案的加鎖解鎖,執行緒的同步
- C的檔案流操作,對檔名的字元提取操作,建立、刪除檔案和路徑,這些都可以直接用到將來自己的專案中