1. 程式人生 > 其它 >Rocksdb iterator和snapshot 介面

Rocksdb iterator和snapshot 介面

Rocksdb提供迭代器來訪問整個db中的資料,就像STL中的迭代器一樣,用來訪問容器中具體的資料。

訪問形式以及訪問介面有如下幾種:

  • 遍歷所有的key-value
    //開啟db,並初始化一個迭代器指標
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        cout << it->key().ToString() << ": " << it->
    value().ToString() << endl; } assert(it->status().ok()); // Check for any errors found during the scan delete it;
  • 輸出一個範圍內的key-value,[small, big)
    for (it->Seek(small);
       it->Valid() && it->key().ToString() < big;
       it->Next()) {
    ...
    }
    assert(it->status().ok()); // Check for any errors found during the scan
  • 反向遍歷db中的元素
    for (it->SeekToLast(); it->Valid(); it->Prev()) {
    ...
    }
    assert(it->status().ok()); // Check for any errors found during the scan
    
  • 反向遍歷一個指定範圍的key,如(small, big]
    // Start at the last key <= big and walk backwards while the key is
    // still strictly greater than small, i.e. visit (small, big].
    for (it->SeekForPrev(big);
       it->Valid() && it->key().ToString() > small;
       it->Prev()) {
    ...
    }
    assert(it->status().ok()); // Check for any errors found during the scan

迭代器的介面可以算是 rocksdb針對客戶端的核心介面,主要是提供排序以及高效查詢的功能。

測試程式碼如下:

#include <iostream>
#include <string>
#include <rocksdb/db.h>
#include <rocksdb/iterator.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/env.h>

using namespace std;


/// Builds a pseudo-random decimal key in the range [0, key_range).
///
/// Four rand() results are multiplied together (unsigned arithmetic, so the
/// product simply wraps on overflow), reduced modulo key_range and rendered
/// as a base-10 string.
///
/// @param key_range Exclusive upper bound of the numeric key. 0 would be a
///                  modulo-by-zero, so it is handled explicitly and yields "0".
/// @return The key as an unpadded decimal string.
static std::string rand_key(unsigned long long key_range) {
    if (key_range == 0) {
        return "0";
    }

    unsigned long long n = 1;
    for (int i = 0; i < 4; ++i) {
        n *= static_cast<unsigned long long>(rand());
    }

    // std::to_string replaces the fixed-size buffer + sprintf pair.
    return std::to_string(n % key_range);
}

int main() {
    rocksdb::DB *db;
    rocksdb::Options option;

    option.create_if_missing = true;
    option.compression = rocksdb::CompressionType::kNoCompression;

    rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);
    if (!s.ok()) {
        cout << "Open failed with " << s.ToString() << endl;
        exit(1);
    }

    rocksdb::DestroyDB("./iterator_db", option);

	cout << "seek all keys : " << endl;
    for(int i = 0; i < 5; i ++) {
        rocksdb::Status s = db->Put(rocksdb::WriteOptions(), 
                                rand_key(9), string(10, 'a' + (i % 26)) );

        if (!s.ok()) {
            cout << "Put failed with " << s.ToString() << endl;
            exit(1);
        }
    }   
    
    /* traverse rocksdb key-value */
    rocksdb::Iterator *it = db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        cout << it->key().ToString() << ": " << it->value().ToString() << endl;
    }

    string limit="4";
    string start="2";
    cout << "seek from '2' to '4' : " << endl;
    for(it->Seek(start); it->Valid()&&it->key().ToString() < limit;
        it->Next()) {
            cout << it->key().ToString() << ": " << it->value().ToString() << endl;
        } 
    assert(it->status().ok());

    cout << "seek from last to start :" << endl;
    for (it->SeekToLast(); it->Valid(); it->Prev()) {
        cout << it->key().ToString() << ": " << it->value().ToString() << endl;
    }
    assert(it->status().ok());

    cout << "seek from '4' to '2' :" << endl;
    for(it->SeekForPrev(limit); it->Valid()&&it->key().ToString() > start;
        it->Prev()) {
            cout << it->key().ToString() << ": " << it->value().ToString() << endl;
        } 
    assert(it->status().ok());
        delete it;

    db->Close();
    delete db;

    return 0;
}

輸出如下:

seek all keys : 
3: cccccccccc
4: dddddddddd
7: bbbbbbbbbb
8: eeeeeeeeee
seek from '2' to '4' : 
3: cccccccccc
seek from last to start :
8: eeeeeeeeee
7: bbbbbbbbbb
4: dddddddddd
3: cccccccccc
seek from '4' to '2' :
4: dddddddddd
3: cccccccccc

且上層使用rocksdb迭代器介面時一般會和snapshot介面一同使用,用來實現MVCC的版本控制功能。
關於snapshot的實現,我們在Rocksdb事務:隔離性的實現中有提到,感興趣的可以看看。

關於snapshot的客戶端介面主要有:

  • sp1 = db->GetSnapshot(); 在當前db狀態下建立一個snapshot,新增到內部維護的一個全域性的snapshotImpl的雙向連結串列中,並返回該snapshot的物件
  • read_option.snapshot = sp1; 將獲取到的snapshot 傳給read_option,進行Get操作
  • db->ReleaseSnapshot(sp1); 釋放snapshot相關的資源(從雙向連結串列中刪除該節點)

隔離性的測試程式碼如下:

#include <iostream>
#include <string>
#include <rocksdb/db.h>
#include <rocksdb/iterator.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/env.h>

using namespace std;

int main() {
    rocksdb::DB *db;
    rocksdb::Options option;

    option.create_if_missing = true;
    option.compression = rocksdb::CompressionType::kNoCompression;

    rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);
    if (!s.ok()) {
        cout << "Open failed with " << s.ToString() << endl;
        exit(1);
    }
    // set a snapshot before put
    const rocksdb::Snapshot *sp1 = db->GetSnapshot(); 

    s = db->Put(rocksdb::WriteOptions(), "sp2", "value_sp2");
    assert(s.ok());

	// set a snapshot after put
    const rocksdb::Snapshot *sp2 = db->GetSnapshot();

    rocksdb::ReadOptions read_option;
    read_option.snapshot = sp1;
    string value = "";
    //預期獲取不到sp2的value,因為這裡用的是sp1的快照
    s = db->Get(read_option, "sp2", &value); 
    if(value == "") {
        cout << "Can't get sp2 at sp1!" << endl;
    }

    read_option.snapshot = sp2;
    // 能夠獲取到,使用的是sp2的快照,其是在put之後設定的
    s = db->Get(read_option, "sp2", &value); 
    assert(s.ok());
    if(value != "") {
        cout << "Got sp2's value: " << value << endl;
    }

    db->ReleaseSnapshot(sp1);
    db->ReleaseSnapshot(sp2);

輸出如下:

Can't get sp2 at sp1!
Got sp2's value: value_sp2

當然rocksdb也提供了更為複雜的mvcc特性,來以事務的方式支援不同的隔離級別。

Rocksdb提供迭代器來訪問整個db中的資料,就像STL中的迭代器一樣,用來訪問容器中具體的資料。

訪問形式以及訪問介面有如下幾種:

  • 遍歷所有的key-value
    //開啟db,並初始化一個迭代器指標
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        cout << it->key().ToString() << ": " << it->value().ToString() << endl;
    }
    assert(it->status().ok()); // Check for any errors found during the scan
    delete it;
    
  • 輸出一個範圍內的key-value,[small, big)
    for (it->Seek(small);
       it->Valid() && it->key().ToString() < big;
       it->Next()) {
    ...
    }
    assert(it->status().ok()); // Check for any errors found during the scan
    
  • 反向遍歷db中的元素
    for (it->SeekToLast(); it->Valid(); it->Prev()) {
    ...
    }
    assert(it->status().ok()); // Check for any errors found during the scan
    
  • 反向遍歷一個指定範圍的key,如(small, big]
    // Start at the last key <= big and walk backwards while the key is
    // still strictly greater than small, i.e. visit (small, big].
    for (it->SeekForPrev(big);
       it->Valid() && it->key().ToString() > small;
       it->Prev()) {
    ...
    }
    assert(it->status().ok()); // Check for any errors found during the scan
    

迭代器的介面可以算是 rocksdb針對客戶端的核心介面,主要是提供排序以及高效查詢的功能。

測試程式碼如下:

#include <iostream>
#include <string>
#include <rocksdb/db.h>
#include <rocksdb/iterator.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/env.h>

using namespace std;


/// Builds a pseudo-random decimal key in the range [0, key_range).
///
/// Four rand() results are multiplied together (unsigned arithmetic, so the
/// product simply wraps on overflow), reduced modulo key_range and rendered
/// as a base-10 string.
///
/// @param key_range Exclusive upper bound of the numeric key. 0 would be a
///                  modulo-by-zero, so it is handled explicitly and yields "0".
/// @return The key as an unpadded decimal string.
static std::string rand_key(unsigned long long key_range) {
    if (key_range == 0) {
        return "0";
    }

    unsigned long long n = 1;
    for (int i = 0; i < 4; ++i) {
        n *= static_cast<unsigned long long>(rand());
    }

    // std::to_string replaces the fixed-size buffer + sprintf pair.
    return std::to_string(n % key_range);
}

int main() {
    rocksdb::DB *db;
    rocksdb::Options option;

    option.create_if_missing = true;
    option.compression = rocksdb::CompressionType::kNoCompression;

    rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);
    if (!s.ok()) {
        cout << "Open failed with " << s.ToString() << endl;
        exit(1);
    }

    rocksdb::DestroyDB("./iterator_db", option);

	cout << "seek all keys : " << endl;
    for(int i = 0; i < 5; i ++) {
        rocksdb::Status s = db->Put(rocksdb::WriteOptions(), 
                                rand_key(9), string(10, 'a' + (i % 26)) );

        if (!s.ok()) {
            cout << "Put failed with " << s.ToString() << endl;
            exit(1);
        }
    }   
    
    /* traverse rocksdb key-value */
    rocksdb::Iterator *it = db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        cout << it->key().ToString() << ": " << it->value().ToString() << endl;
    }

    string limit="4";
    string start="2";
    cout << "seek from '2' to '4' : " << endl;
    for(it->Seek(start); it->Valid()&&it->key().ToString() < limit;
        it->Next()) {
            cout << it->key().ToString() << ": " << it->value().ToString() << endl;
        } 
    assert(it->status().ok());

    cout << "seek from last to start :" << endl;
    for (it->SeekToLast(); it->Valid(); it->Prev()) {
        cout << it->key().ToString() << ": " << it->value().ToString() << endl;
    }
    assert(it->status().ok());

    cout << "seek from '4' to '2' :" << endl;
    for(it->SeekForPrev(limit); it->Valid()&&it->key().ToString() > start;
        it->Prev()) {
            cout << it->key().ToString() << ": " << it->value().ToString() << endl;
        } 
    assert(it->status().ok());
        delete it;

    db->Close();
    delete db;

    return 0;
}

輸出如下:

seek all keys : 
3: cccccccccc
4: dddddddddd
7: bbbbbbbbbb
8: eeeeeeeeee
seek from '2' to '4' : 
3: cccccccccc
seek from last to start :
8: eeeeeeeeee
7: bbbbbbbbbb
4: dddddddddd
3: cccccccccc
seek from '4' to '2' :
4: dddddddddd
3: cccccccccc

且上層使用rocksdb迭代器介面時一般會和snapshot介面一同使用,用來實現MVCC的版本控制功能。
關於snapshot的實現,我們在Rocksdb事務:隔離性的實現中有提到,感興趣的可以看看。

關於snapshot的客戶端介面主要有:

  • sp1 = db->GetSnapshot(); 在當前db狀態下建立一個snapshot,新增到內部維護的一個全域性的snapshotImpl的雙向連結串列中,並返回該snapshot的物件
  • read_option.snapshot = sp1; 將獲取到的snapshot 傳給read_option,進行Get操作
  • db->ReleaseSnapshot(sp1); 釋放snapshot相關的資源(從雙向連結串列中刪除該節點)

隔離性的測試程式碼如下:

#include <iostream>
#include <string>
#include <rocksdb/db.h>
#include <rocksdb/iterator.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/env.h>

using namespace std;

int main() {
    rocksdb::DB *db;
    rocksdb::Options option;

    option.create_if_missing = true;
    option.compression = rocksdb::CompressionType::kNoCompression;

    rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);
    if (!s.ok()) {
        cout << "Open failed with " << s.ToString() << endl;
        exit(1);
    }
    // set a snapshot before put
    const rocksdb::Snapshot *sp1 = db->GetSnapshot(); 

    s = db->Put(rocksdb::WriteOptions(), "sp2", "value_sp2");
    assert(s.ok());

	// set a snapshot after put
    const rocksdb::Snapshot *sp2 = db->GetSnapshot();

    rocksdb::ReadOptions read_option;
    read_option.snapshot = sp1;
    string value = "";
    //預期獲取不到sp2的value,因為這裡用的是sp1的快照
    s = db->Get(read_option, "sp2", &value); 
    if(value == "") {
        cout << "Can't get sp2 at sp1!" << endl;
    }

    read_option.snapshot = sp2;
    // 能夠獲取到,使用的是sp2的快照,其是在put之後設定的
    s = db->Get(read_option, "sp2", &value); 
    assert(s.ok());
    if(value != "") {
        cout << "Got sp2's value: " << value << endl;
    }

    db->ReleaseSnapshot(sp1);
    db->ReleaseSnapshot(sp2);

輸出如下:

Can't get sp2 at sp1!
Got sp2's value: value_sp2

當然rocksdb也提供了更為複雜的mvcc特性,來以事務的方式支援不同的隔離級別。