1. 程式人生 > 實用技巧 >【資料庫核心】RocksDB:事務鎖設計與實現

【資料庫核心】RocksDB:事務鎖設計與實現

本文主要介紹 RocksDB 鎖結構設計、加鎖解鎖過程,並與 InnoDB 鎖實現做一個簡單對比。

本文由作者授權釋出,未經許可,請勿轉載。

作者:王剛,網易杭研資料庫核心開發工程師

MyRocks 引擎目前是支援行鎖的,包括共享鎖和排它鎖,主要是在 RocksDB 層面實現的,與 InnoDB 引擎的鎖系統相比,簡單很多。

本文主要介紹 RocksDB 鎖結構設計、加鎖解鎖過程,並與 InnoDB 鎖實現做一個簡單對比。

事務鎖的實現類是:TransactionLockMgr ,它的主要資料成員包括:

private:
PessimisticTransactionDB* txn_db_impl_;
// 預設16個lock map 分片
const size_t default_num_stripes_;
// 每個column family 最大行鎖數
const int64_t max_num_locks_;
// lock map 互斥鎖
InstrumentedMutex lock_map_mutex_; // Map of ColumnFamilyId to locked key info
using LockMaps = std::unordered_map<uint32_t, std::shared_ptr<LockMap>>;
LockMaps lock_maps_;
std::unique_ptr<ThreadLocalPtr> lock_maps_cache_;
// Must be held when modifying wait_txn_map_ and rev_wait_txn_map_.
std::mutex wait_txn_map_mutex_;
// Maps from waitee -> number of waiters.
HashMap<TransactionID, int> rev_wait_txn_map_;
// Maps from waiter -> waitee.
HashMap<TransactionID, TrackedTrxInfo> wait_txn_map_;
DeadlockInfoBuffer dlock_buffer_;
// Used to allocate mutexes/condvars to use when locking keys
std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;

加鎖的入口函式是:TransactionLockMgr::TryLock

Status TransactionLockMgr::TryLock(PessimisticTransaction* txn, //加鎖的事務
uint32_t column_family_id, //所屬的CF
const std::string& key, //加鎖的健 Env* env,
bool exclusive //是否排它鎖) {
// Lookup lock map for this column family id
std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id); //1. 根據 cf id 查詢其 LockMap
LockMap* lock_map = lock_map_ptr.get();
if (lock_map == nullptr) {
char msg[];
snprintf(msg, sizeof(msg), "Column family id not found: %" PRIu32,
column_family_id); return Status::InvalidArgument(msg);
} // Need to lock the mutex for the stripe that this key hashes to
size_t stripe_num = lock_map->GetStripe(key);// 2. 根據key 的雜湊獲取 stripe_num,預設16個stripe
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num); LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
int64_t timeout = txn->GetLockTimeout(); return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
timeout, lock_info); // 實際加鎖函式
}

GetLockMap(column_family_id) 函式根據 cf id 查詢其 LockMap , 其邏輯包括兩步:

  1. 在 thread 本地快取 lock_maps_cache 中查詢;
  2. 第1步沒有查詢到,則去全域性的 lock_map_ 中查詢。
std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
uint32_t column_family_id) {
// First check thread-local cache
if (lock_maps_cache_->Get() == nullptr) {
lock_maps_cache_->Reset(new LockMaps());
}
auto lock_maps_cache = static_cast<LockMaps*>(lock_maps_cache_->Get());
auto lock_map_iter = lock_maps_cache->find(column_family_id);
if (lock_map_iter != lock_maps_cache->end()) {
// Found lock map for this column family.
return lock_map_iter->second;
}
// Not found in local cache, grab mutex and check shared LockMaps
InstrumentedMutexLock l(&lock_map_mutex_);
lock_map_iter = lock_maps_.find(column_family_id);
if (lock_map_iter == lock_maps_.end()) {
return std::shared_ptr<LockMap>(nullptr);
} else {
// Found lock map. Store in thread-local cache and return.
std::shared_ptr<LockMap>& lock_map = lock_map_iter->second;
lock_maps_cache->insert({column_family_id, lock_map});
return lock_map;
}
}

lock_maps_ 是全域性鎖結構:

 // Map of ColumnFamilyId to locked key info
using LockMaps = std::unordered_map<uint32_t, std::shared_ptr<LockMap>>;
LockMaps lock_maps_;

LockMap 是每個 CF 的鎖結構:

struct LockMap {

  // Number of sepearate LockMapStripes to create, each with their own Mutex
const size_t num_stripes_; // Count of keys that are currently locked in this column family.
// (Only maintained if TransactionLockMgr::max_num_locks_ is positive.)
std::atomic<int64_t> lock_cnt{}; std::vector<LockMapStripe*> lock_map_stripes_; };

為了減少加鎖時mutex 的爭用,LockMap 內部又進行了分片,num_stripes_ = 16(預設值),
LockMapStripe 是每個分片的鎖結構:

struct LockMapStripe {

  // Mutex must be held before modifying keys map
std::shared_ptr<TransactionDBMutex> stripe_mutex; // Condition Variable per stripe for waiting on a lock
std::shared_ptr<TransactionDBCondVar> stripe_cv; // Locked keys mapped to the info about the transactions that locked them.
// TODO(agiardullo): Explore performance of other data structures.
std::unordered_map<std::string, LockInfo> keys;
};

LockMapStripe 內部還是一個 unordered_map, 還包括 stripe_mutex、stripe_cv 。這樣設計避免了一把大鎖的尷尬,減小鎖的粒度常用的方法,LockInfo 包含事務id:

struct LockInfo {
bool exclusive;
autovector<TransactionID> txn_ids;
// Transaction locks are not valid after this time in us
uint64_t expiration_time;
};

LockMaps 、LockMap、LockMapStripe 、LockInfo就是RocksDB 事務鎖用到的資料結構了,可以看到並不複雜,程式碼實現起來簡單,代價當然也有,後文在介紹再介紹。

AcquireWithTimeout 函式內部先獲取 stripe mutex ,獲取到了在進入AcquireLocked 函式:

if (timeout < ) {
// If timeout is negative, we wait indefinitely to acquire the lock
result = stripe->stripe_mutex->Lock();
} else {
result = stripe->stripe_mutex->TryLockFor(timeout);
}

獲取了 stripe_mutex之後,準備獲取鎖:

// Acquire lock if we are able to
uint64_t expire_time_hint = ;
autovector<TransactionID> wait_ids;
result = AcquireLocked(lock_map, stripe, key, env, lock_info,
&expire_time_hint, &wait_ids);

AcquireLocked 函式實現獲取鎖邏輯,它的實現邏輯是:

  1. 在 stripe 的 map 中查詢該key 是否已經被鎖住。
  2. 如果key沒有被鎖住,判斷是否超過了max_num_locks_,沒超過則在 stripe的map 中插入{key, txn_lock_info},超過了max_num_locks_,加鎖失敗,返回狀態資訊。
  3. 如果key已經被鎖住了,要判斷加在key上的鎖是排它鎖還是共享鎖,如果是共享鎖,那事務的加鎖請求可以滿足;
  4. 如果是排它鎖,如果是同一事務,加鎖請求可以滿足,如果不是同一事務,如果鎖沒有超時,則加鎖請求失敗,否則搶佔過來。
  5. 以上是加鎖過程,解鎖過程類似,也是需要根據cf_id 和 key 計算出落到哪個stripe上,然後就是從map中把資料清理掉,同時還要喚醒該stripe上的等待執行緒,這個喚醒的粒度有點大。
void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
uint32_t column_family_id,
const std::string& key, Env* env) {
std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);//通過cf_id 獲取Lock_Map
LockMap* lock_map = lock_map_ptr.get();
if (lock_map == nullptr) {
// Column Family must have been dropped.
return;
}
// Lock the mutex for the stripe that this key hashes to
size_t stripe_num = lock_map->GetStripe(key);//根據key 計算落到哪個stripe上
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
stripe->stripe_mutex->Lock();
UnLockKey(txn, key, stripe, lock_map, env); //從鎖的map中清理掉該key
stripe->stripe_mutex->UnLock();
// Signal waiting threads to retry locking
stripe->stripe_cv->NotifyAll();
}

ocksdb lock整體結構如下:

如果一個事務需要鎖住大量記錄,rocksdb 鎖的實現方式可能要比innodb 消耗更多的記憶體,innodb 的鎖結構如下圖所示:

由於鎖資訊是常駐記憶體,我們簡單分析下RocksDB鎖佔用的記憶體。每個鎖實際上是unordered_map中的一個元素,則鎖佔用的記憶體為key_length+8+8+1,假設key為bigint,佔8個位元組,則100w行記錄,需要消耗大約22M記憶體。但是由於記憶體與key_length正相關,導致RocksDB的記憶體消耗不可控。我們可以簡單算算RocksDB作為MySQL儲存引擎時,key_length的範圍。對於單列索引,最大值為2048個位元組,具體可以參考max_supported_key_part_length實現;對於複合索引,索引最大長度為3072個位元組,具體可以參考max_supported_key_length實現。

假設最壞的情況,key_length=3072,則100w行記錄,需要消耗3G記憶體,如果是鎖1億行記錄,則需要消耗300G記憶體,這種情況下記憶體會有撐爆的風險。因此RocksDB提供引數配置rocksdb_max_row_locks,確保記憶體可控,預設rocksdb_max_row_locks設定為1048576,對於大部分key為bigint場景,極端情況下,也需要消耗22G記憶體。而在這方面,InnoDB則比較友好,hash表的key是(space_id, page_no),所以無論key有多大,key部分的記憶體消耗都是恆定的。InnoDB在一個事務需要鎖大量記錄場景下是有優化的,多個記錄可以公用一把鎖,這樣也間接可以減少記憶體。

總結

RocksDB 事務鎖的實現整體來說不復雜,只支援行鎖,還不支援gap lock ,鎖佔的資源也比較大,可通過rocksdb_max_row_locks 限制事務施加行鎖的數量。

利益相關:

網易輕舟微服務,提供分散式事務框架GTXS,支援跨服務事務、跨資料來源事務、混合事務、事務狀態監控、異常事務處理等能力,應用案例工商銀行