【RocksDB】TransactionDB源碼分析
RocksDB版本:v5.13.4
- 概述
得益於LSM-Tree結構,RocksDB所有的寫入並非是update in-place,所以他支持起來事務的難度也相對較小,主要原理就是利用WriteBatch將事務所有寫操作在內存緩存打包,然後在commit時一次性將WriteBatch寫入,保證了原子,另外通過Sequence和Key鎖來解決沖突實現隔離。
RocksDB的Transaction分為兩類:Pessimistic和Optimistic,類似悲觀鎖和樂觀鎖的區別,PessimisticTransaction的沖突檢測和加鎖是在事務中每次寫操作之前做的(commit後釋放),如果失敗則該操作失敗;OptimisticTransaction不加鎖,沖突檢測是在commit階段做的,commit時發現沖突則失敗。
具體使用時需要結合實際場景來選擇,如果並發事務寫入操作的Key重疊度不高,那麽用Optimistic更合適一些(省掉Pessimistic中額外的鎖操作)
- 用法
介紹實現原理前,先來看一下用法:
【1. 基本用法】
Options options; TransactionDBOptions txn_db_options; options.create_if_missing = true; TransactionDB* txn_db; // 打開DB(默認Pessimistic) Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db); assert(s.ok()); // 創建一個事務 Transaction* txn = txn_db->BeginTransaction(write_options); assert(txn); // 事務txn讀取一個key s = txn->Get(read_options, "abc", &value); assert(s.IsNotFound()); // 事務txn寫一個key s = txn->Put("abc", "def"); assert(s.ok()); // 通過TransactionDB::Get在事務外讀取一個key s = txn_db->Get(read_options, "abc", &value); // 通過TrasactionDB::Put在事務外寫一個key // 這裏並不會有影響,因為寫的不是"abc",不沖突 // 如果是"abc"的話 // 則Put會一直卡住直到超時或等待事務Commit(本例中會超時) s = txn_db->Put(write_options, "xyz", "zzz"); s = txn->Commit(); assert(s.ok()); // 析構事務 delete txn; delete txn_db;
通過BeginTransaction打開一個事務,然後調用Put、Get等接口進行事務操作,最後調用Commit進行提交。
【2. 回滾】
... // 事務txn寫入abc s = txn->Put("abc", "def"); assert(s.ok()); // 設置回滾點 txn->SetSavePoint(); // 事務txn寫入cba s = txn->Put("cba", "fed"); assert(s.ok()); // 回滾至回滾點 s = txn->RollbackToSavePoint(); // 提交,此時事務中不包含對cba的寫入 s = txn->Commit(); assert(s.ok()); ...
【3. GetForUpdate】
...
// 事務txn讀取abc並獨占該key,確保不被外部事務再修改
s = txn->GetForUpdate(read_options, “abc”, &value);
assert(s.ok());
// 通過TransactionDB::Put接口在事務外寫abc
// 不會成功
s = txn_db->Put(write_options, “abc”, “value0”);
s = txn->Commit();
assert(s.ok());
...
有時候在事務中需要對某一個key進行先讀後寫,此時則不能在寫時才進行該key的獨占及沖突檢測操作,所以使用GetForUpdate接口讀取該key並進行獨占
【4. SetSnapshot】
txn = txn_db->BeginTransaction(write_options);
// 設置事務txn使用的snapshot為當前全局Sequence Number
txn->SetSnapshot();
// 使用TransactionDB::Put接口在事務外部寫abc
// 此時全局Sequence Number會加1
db->Put(write_options, “key1”, “value0”);
assert(s.ok());
// 事務txn寫入abc
s = txn->Put(“abc”, “value1”);
s = txn->Commit();
// 這裏會失敗,因為在事務設置了snapshot之後,事務後來寫的key
// 在事務外部有過其他寫操作,所以這裏不會成功
// Pessimistic會在Put時失敗,Optimistic會在Commit時失敗
前面說過,TransactionDB在事務中需要寫入某個key時才對其進行獨占或沖突檢測,有時希望在事務一開始就對其之後所有要寫入的所有key進行獨占,此時可以通過SetSnapshot來實現,設置了Snapshot後,外部一旦對事務中將要進行寫操作key做過修改,則該事務最終會失敗(失敗點取決於是Pessimistic還是Optimistic,Pessimistic因為在Put時就進行沖突檢測,所以Put時就失敗,而Optimistic則會在Commit是檢測到沖突,失敗)
- 實現
3.1 WriteBatch & WriteBatchWithIndex
WriteBatch就不展開說了,事務會將所有的寫操作追加進同一個WriteBatch,直到Commit時才向DB原子寫入。
WriteBatchWithIndex在WriteBatch之外,額外搞一個Skiplist來記錄每一個操作在WriteBatch中的offset等信息。在事務沒有commit之前,數據還不在Memtable中,而是存在WriteBatch裏,如果有需要,這時候可以通過WriteBatchWithIndex來拿到自己剛剛寫入的但還沒有提交的數據。
事務的SetSavePoint和RollbackToSavePoint也是通過WriteBatch來實現的,SetSavePoint記錄當前WriteBatch的大小及統計信息,若幹操作之後,若想回滾,則只需要將WriteBatch truncate到之前記錄的大小並恢復統計信息即可。
3.2 PessimisticTransaction
PessimisticTransactionDB通過TransactionLockMgr進行行鎖管理。事務中的每次寫入操作之前都需要TryLock進Key鎖的獨占及沖突檢測,以Put為例:
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
// 調用TryLock搶鎖及沖突檢測
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) {
s = GetBatchForWrite()->Put(column_family, key, value);
if (s.ok()) {
num_puts_++;
}
}
return s;
}
可以看到Put接口定義在TransactionBase中,無論Pessimistic還是Optimistic的Put都是這段邏輯,二者的區別是在對TryLock的重載。先看Pessimistic的,TransactionBaseImpl::TryLock通過TransactionBaseImpl::TryLock -> PessimisticTransaction::TryLock -> PessimisticTransactionDB::TryLock -> TransactionLockMgr::TryLock一路調用到TransactionLockMgr的TryLock,在裏面完成對key加鎖,加鎖成功便實現了對key的獨占,此時直到事務commit之前,其他事務是無法修改這個key的。
鎖是加成功了,但這也只能說明從此刻起到事務結束前這個key不會再被外部修改,但如果事務在最開始執行SetSnapshot設置了快照,如果在打快照和Put之間的過程中外部對相同key進行了修改(並commit),此時已經打破了snapshot的保證,所以事務之後的Put也不能成功,這個沖突檢測也是在PessimisticTransaction::TryLock中做的,如下:
Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only,
bool exclusive, bool skip_validate) {
...
// 加鎖
if (!previously_locked || lock_upgrade) {
s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
}
SetSnapshotIfNeeded();
...
// 使用事務一開始拿到的snapshot的sequence1與這個key在DB中最新
// 的sequence2進行比較,如果sequence2 > sequence1則代表在snapshot
// 之後,外部有對key進行過寫入,有沖突!
s = ValidateSnapshot(column_family, key, &tracked_at_seq);
if (!s.ok()) {
// 檢測到沖突,解鎖
// Failed to validate key
if (!previously_locked) {
// Unlock key we just locked
if (lock_upgrade) {
s = txn_db_impl_->TryLock(this, cfh_id, key_str,
false /* exclusive */);
assert(s.ok());
} else {
txn_db_impl_->UnLock(this, cfh_id, key.ToString());
}
}
}
if (s.ok()) {
// 如果加鎖及沖突檢測通過,記錄這個key以便事務結束時釋放掉鎖
// We must track all the locked keys so that we can unlock them later. If
// the key is already locked, this func will update some stats on the
// tracked key. It could also update the tracked_at_seq if it is lower than
// the existing trackey seq.
TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
}
}
其中ValidateSnapshot就是進行沖突檢測,通過將事務設置的snapshot與key最新的sequence進行比較,如果小於key最新的sequence,則代表設置snapshot後,外部事務修改過這個key,有沖突!獲取key最新的sequence也是簡單粗暴,遍歷memtable,immutable memtable,memtable list history及SST文件來拿。總結如下圖:
GetForUpdate的邏輯和Put差不多,無非就是以Get之名行Put之事(加鎖及沖突檢測),如下圖:
接著介紹下TransactionLockMgr,如下圖:
最外層先是一個std::unordered_map,將每個ColumnFamily映射到一個LockMap,每個LockMap默認有16個LockMapStripe,然後每個LockMapStripe裏包含一個std::unordered_map keys,這就是存放每個key對應的鎖信息的。所以每次加鎖過程大致如下:
首先通過ThreadLocal拿到lock_maps指針
通過column family ID 拿到對應的LockMap
對key hash映射到某個LockMapStripe,對該LockMapStripe加鎖(同一LockMapStripe下的所有key會搶同一把鎖,粒度略大)
操作LockMapStripe裏的std::unordered_map完成加鎖
3.3 OptimisticTransaction
OptimisticTransactionDB不使用鎖進行key的獨占,只在commit是進行沖突檢測。所以OptimisticTransaction::TryLock如下:
Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only,
bool exclusive, bool untracked) {
if (untracked) {
return Status::OK();
}
uint32_t cfh_id = GetColumnFamilyID(column_family);
SetSnapshotIfNeeded();
// 如果設置了之前事務snapshot,這裏使用它作為key的seq
// 如果沒有設置snapshot,則以當前全局的sequence作為key的seq
SequenceNumber seq;
if (snapshot_) {
seq = snapshot_->GetSequenceNumber();
} else {
seq = db_->GetLatestSequenceNumber();
}
std::string key_str = key.ToString();
// 記錄這個key及其對應的seq,後期在commit時通過使用這個seq和
// key當前的最新sequence比較來做沖突檢測
TrackKey(cfh_id, key_str, seq, read_only, exclusive);
// Always return OK. Confilct checking will happen at commit time.
return Status::OK();
}
這裏TryLock實際上就是給key標記一個sequence並記錄,用作commit時的沖突檢測,commit實現如下:
Status OptimisticTransaction::Commit() {
// Set up callback which will call CheckTransactionForConflicts() to
// check whether this transaction is safe to be committed.
OptimisticTransactionCallback callback(this);
DBImpl* db_impl = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
// 調用WriteWithCallback進行沖突檢測,如果沒有沖突就寫入DB
Status s = db_impl->WriteWithCallback(
write_options_, GetWriteBatch()->GetWriteBatch(), &callback);
if (s.ok()) {
Clear();
}
return s;
}
沖突檢測的實現在OptimisticTransactionCallback裏,和設置了snapshot的PessimisticTransaction一樣,最終還是會調用TransactionUtil::CheckKeysForConflicts來檢測,也就是比較sequence。整體如下圖:
3.4 兩階段提交(Two Phase Commit)
在分布式場景下使用PessimisticTransaction時,我們可能需要使用兩階段提交(2PC)來確保一個事務在多個節點上執行成功,所以PessimisticTransaction也支持2PC。具體做法也不難,就是將之前commit拆分為prepare和commit,prepare階段進行WAL的寫入,commit階段進行Memtable的寫入(寫入後其他事務方可見),所以現在一個事務的操作流程如下:
BeginTransaction
GetForUpdate
Put
...
Prepare
Commit
使用2PC,我們首先要通過SetName為一個事務設置唯一的標識並註冊到全局映射表裏,這裏記錄著所有未完成的2PC事務,當Commit後再從映射表裏刪除。
接下來具體2PC實現無非就是在WriteBatch上做文章,通過特殊的標記來控制寫WAL和Memtable,簡單說一下:
正常的WriteBatch結構如下:
Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);Delete(a);
2PC一開始的WriteBatch如下:
Sequence(0);NumRecords(0);Noop;
先使用一個Noop占位,至於為什麽,後面再說。緊接著就是一些操作,操作後,WriteBatch如下:
Sequence(0);NumRecords(3);Noop;Put(a,1);Merge(a,1);Delete(a);
然後執行Prepare,寫WAL,在寫WAL之前,先會隊WriteBatch做一些改動,插入Prepare和EndPrepare記錄,如下:
Sequence(0);NumRecords(3);Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid)
可以看到這裏將之前的Noop占位換成Prepare,然後在結尾插入EndPrepare(xid),構造好WriteBatch後就直接調用WriteImpl寫WAL了。註意,此時往WAL裏寫的這條日誌的sequence雖然比VersionSet的last_sequence大,但寫入WAL之後並不會調用SetLastSequence來更新VersionSet的last_sequence,它只有在最後寫入Memtable之後才更新,具體做法就是給VersionSet除了last_sequence_之外,再加一個last_allocatedsequence,初始相等,寫WAL是加後者,後者對外不可見,commit後再加前者。所以一旦PessimisticTransactionDB使用了2PC,就要求所有都是2PC,不然last_sequence_可能會錯亂(更正:如果使用two_writequeues,不管是Prepare -> Commit還是直接Commit,sequence的增長都是以last_allocated_sequence_為準,最後用它來調整lastsequence;如果不使用two_write_queues_則直接以last_sequence_為準,總之不會出現sequence混錯,所以可以Prepare -> Commit和Commit混用)。
WAL寫完之後,即使沒有commit就宕機也沒事,重啟後Recovery會將事務從WAL恢復記錄到全局recovered_transaction中,等待Commit
最後就是Commit,Commit階段會使用一個新的CommitTime WriteBatch,和之前的WriteBatch合並整理後最終使用CommitTime WriteBatch寫Memtable
整理後的CommitTime WriteBatch如下:
Sequence(0);NumRecords(3);Commit(xid);
Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid);
將CommitTime WriteBatch的WALTerminalPoint設置到Commit(xid)處,告訴Writer寫WAL時寫到這裏就可以停了,其實就是只將Commit記錄寫進WAL(因為其後的記錄在Prepare階段就已經寫到WAL了);
在最後就是MemTableInserter遍歷這個CommitTime WriteBatch向memtable寫入,具體就不說了。寫入成功後,更新VersionSet的lastsequence,至此,事務成功提交。
- WritePrepared & WriteUnprepared
我們可以看到無論是Pessimistic還是Optimistic,都有一個共同缺點,那就是在事務最終Commit之前,所以數據都是緩存在內存(WriteBatch)裏,對於很大的事務來說,這非常耗費內存並且將所有實際寫入壓力都扔給Commit階段來搞,性能有瓶頸,所以RocksDB正在支持WritePolicy為WritePrepared和WriteUnprepared的PessimisticTransaction,主要思想就是將對Memtable的寫入提前,
如果放到Prepare階段那就是WritePrepared
如果再往前,每次操作直接寫Memtable那就是WriteUnprepared
可以看到WriteUnprepared無論內存占用還是寫入壓力點的分散都做的最好,WritePrepared稍遜。
支持這倆新的WritePolicy的難點在於如何保證寫入到Memtable但還未Commit的數據不被其他事物看到,這裏就需要在Sequence上大做文章了,目前Rocksdb支持了WritePrepare、而WriteUnprepared還未支持,期待後續...
- 隔離級別
看了前面的介紹,這裏就不用展開說了
TransactionDB支持ReadCommitted和RepeatableReads級別的隔離
原文鏈接請添加鏈接描述
本文為雲棲社區原創內容,未經允許不得轉載
【RocksDB】TransactionDB源碼分析