[LevelDB] 寫批處理過程詳解
leveldb的write代碼初看瞎搞一堆,細看則實為短小精悍。
1 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { 2 // -----A begin------- 3 Writer w(&mutex_); 4 w.batch = my_batch; 5 w.sync = options.sync; 6 w.done = false; 7 // -----A end -------- 8 9 10 // -----B begin------- 11 MutexLock l(&mutex_); 12 writers_.push_back(&w); 13 while (!w.done && &w != writers_.front()) { 14 w.cv.Wait(); 15 } 16 if (w.done) { 17 return w.status; 18 } 19 // -----B end ------- 20 21 // May temporarily unlock and wait. 22 Status status = MakeRoomForWrite(my_batch == NULL); 23 uint64_t last_sequence = versions_->LastSequence(); 24 Writer* last_writer = &w; 25 if (status.ok() && my_batch != NULL) { // NULL batch is for compactions 26 WriteBatch* updates = BuildBatchGroup(&last_writer); 27 WriteBatchInternal::SetSequence(updates, last_sequence + 1); 28 last_sequence += WriteBatchInternal::Count(updates); 29 30 // Add to log and apply to memtable. We can release the lock 31 // during this phase since &w is currently responsible for logging 32 // and protects against concurrent loggers and concurrent writes 33 // into mem_. 34 { 35 // -----C begin------- 36 mutex_.Unlock(); 37 // -----C end ------- 38 status = log_->AddRecord(WriteBatchInternal::Contents(updates)); 39 if (status.ok() && options.sync) { 40 status = logfile_->Sync(); 41 } 42 if (status.ok()) { 43 status = WriteBatchInternal::InsertInto(updates, mem_); 44 } 45 // -----D begin------- 46 mutex_.Lock(); 47 // -----D end ------- 48 } 49 if (updates == tmp_batch_) tmp_batch_->Clear(); 50 51 versions_->SetLastSequence(last_sequence); 52 } 53 54 // -----E begin------- 55 while (true) { 56 Writer* ready = writers_.front(); 57 writers_.pop_front(); 58 if (ready != &w) { 59 ready->status = status; 60 ready->done = true; 61 ready->cv.Signal(); 62 } 63 if (ready == last_writer) break; 64 } 65 // -----E end ------- 66 67 68 // -----F begin------- 69 // Notify new head of write queue 70 if (!writers_.empty()) { 71 writers_.front()->cv.Signal(); 72 } 73 // -----F end------- 74 75 return status; 76 }
如上,A段代碼定義一個Writer w, w的成員包括了batch信息,同時初始化了一個條件變量成員(port::CondVar
)
假設同時有w1, w2, w3, w4, w5, w6 並發請求寫入。
B段代碼讓競爭到mutex資源的w1獲取了鎖。添加到writers隊列裏去,此時隊列只有一個w1, 從而其順利的進行BuildBatchGroup
。當運行到c段代碼時,mutex互斥鎖釋放,這時(w2, w3, w4, w5, w6)會競爭鎖,由於B段代碼中不滿足隊首條件,均等待並釋放鎖了。從而隊列可能會如(w3, w5, w2, w4).
繼而w1進行log寫入和memtable寫入,之所以這裏在無鎖狀況下時安全的,因為其它的寫操作都不滿足隊首條件,進而不會進入log和memtable寫入階段。 當w1完成log和memtable寫入後,進入d段代碼,則mutex又鎖住,這時B段代碼中隊列因為獲取不到鎖則隊列不會修改。
進入E段代碼後,w1被pop出來,由於reader==w, 並且ready==last_writer,所以直接到F段代碼,喚醒了此時處於隊首的w3.
w3喚醒時,發現自己是隊首,可以順利的進行進入BuildBatchGroup
,在該函數中,遍歷了目前所有的隊列元素,形成一個update的batch,即將w3, w5, w2, w4合並為一個batch. 並將last_writer置為此時處於隊尾的最後一個元素w4,c段代碼運行後,因為釋放了鎖資源,隊列可能隨著DBImpl::Write的調用而更改,如隊列狀況可能為(w3, w5, w2, w4, w6, w9, w8).
C段和D段間的代碼將w3, w5, w2, w4整個的batch寫入log和memtable. 到E段時,分別對w5, w2, w4進行了一次cond signal.當判斷到完w4 == lastwriter時,則退出E段代碼。F段則對隊首的w6喚醒,從而按上述步驟依次進行下去。
這樣就形成了多個並發write 合並為一個batch寫入log和memtable的機制。
[LevelDB] 寫批處理過程詳解