
[LevelDB] A Detailed Look at the Write Batching Process


At first glance LevelDB's write code looks like a tangle; read closely, it turns out to be short and elegant.

```cpp
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  // -----A begin-------
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;
  // -----A end  --------

  // -----B begin-------
  MutexLock l(&mutex_);
  writers_.push_back(&w);
  // Sleep until an earlier leader has written our batch (done) or we reach the front.
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }
  // -----B end  -------

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      // -----C begin-------
      mutex_.Unlock();
      // -----C end  -------
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      // -----D begin-------
      mutex_.Lock();
      // -----D end  -------
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  // -----E begin-------
  // Pop every writer in the group; the leader (&w) does not signal itself.
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }
  // -----E end -------

  // -----F begin-------
  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }
  // -----F end-------

  return status;
}
```

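As shown above, section A defines a Writer w. Its members carry the batch information (batch, sync, done), and it also initializes a condition variable member (port::CondVar) bound to mutex_.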

Suppose w1, w2, w3, w4, w5 and w6 all request writes concurrently.

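In section B, w1 wins the race for the mutex and takes the lock. It appends itself to writers_; since the queue then holds only w1, w1 is at the front and goes straight on to BuildBatchGroup. When w1 reaches section C, it releases the mutex. Now w2, w3, w4, w5 and w6 compete for the lock, and each one that gets it appends itself to the queue. None of them is at the front, so each waits on its condition variable, which releases the lock again. The queue might then look like (w1, w3, w5, w2, w4), with w1 still at the front.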
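The enqueue-and-wait logic of sections A and B can be sketched with standard C++ primitives. This is a minimal sketch under stated assumptions, not LevelDB's code: `std::condition_variable` stands in for `port::CondVar`, `std::string` for `Status`, and both this `Writer` struct and the `EnqueueAndWait` helper are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical simplified Writer: std::condition_variable plays the role of
// port::CondVar and std::string the role of leveldb::Status.
struct Writer {
  bool done = false;
  std::string status;
  std::condition_variable cv;
};

// Hypothetical helper sketching section B. The caller must already hold
// `lock` on the mutex that guards `writers`. Returns true if `w` reached the
// front and must do the write itself (it is the leader), or false if an
// earlier leader already wrote w's batch.
bool EnqueueAndWait(Writer& w, std::deque<Writer*>& writers,
                    std::unique_lock<std::mutex>& lock) {
  assert(lock.owns_lock());
  writers.push_back(&w);
  // wait() releases the mutex while sleeping and re-acquires it before
  // re-checking the predicate, like the while/Wait() loop in section B.
  w.cv.wait(lock, [&] { return w.done || &w == writers.front(); });
  return !w.done;
}
```

A caller that gets `false` back would return `w.status`, matching the `if (w.done)` branch; one that gets `true` continues to the write phase.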

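w1 then writes to the log and the memtable. Doing this without holding the lock is safe because no other writer is at the front of the queue, so none of them can reach the log and memtable stage. Once w1 has finished both writes, it enters section D and locks the mutex again; from then on no other thread can modify the queue in section B, because none of them can acquire the lock.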

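In section E, w1 is popped from the queue. Because ready == &w, w1 does not signal itself, and because ready == last_writer (w1's group contained only w1), the loop exits immediately. Section F then wakes w3, which is now at the front of the queue.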

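When w3 wakes up, it finds that it is at the front of the queue and proceeds to BuildBatchGroup. That function walks the writers currently in the queue and merges their batches into a single updates batch, here combining w3, w5, w2 and w4, and sets last_writer to the last writer included, w4, which is at the tail of the queue at that moment. (In LevelDB the scan can also stop early: it will not fold a sync write into a group led by a non-sync write, and it caps the group at 1 MB, or at the first batch's size plus 128 KB when that first batch is small.) After section C releases the lock, the queue can change as more calls to DBImpl::Write arrive, for example to (w3, w5, w2, w4, w6, w9, w8).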
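The grouping rules can be sketched as a standalone function. This is a simplified sketch, assuming hypothetical `Batch`/`Writer` types in which a batch is a list of strings and its size is their total length; `BuildGroup` is a made-up name. The limits mirror LevelDB's BuildBatchGroup, but unlike LevelDB, which switches to `tmp_batch_` only once a second batch is appended, this version always copies the first batch into the output.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical simplified batch: a list of encoded records.
struct Batch {
  std::vector<std::string> records;
  std::size_t ByteSize() const {
    std::size_t n = 0;
    for (const auto& r : records) n += r.size();
    return n;
  }
};

// Hypothetical simplified Writer with only the fields grouping looks at.
struct Writer {
  Batch* batch = nullptr;  // nullptr would mean a compaction-only call
  bool sync = false;
};

// Sketch of BuildBatchGroup's rules: starting from the front writer, append
// the following writers' records into *group until a sync writer follows a
// non-sync leader or the group would grow past max_size.
// *last_writer receives the last writer included in the group.
void BuildGroup(const std::deque<Writer*>& writers, Batch* group,
                Writer** last_writer) {
  assert(!writers.empty() && writers.front()->batch != nullptr);
  Writer* first = writers.front();
  std::size_t size = first->batch->ByteSize();

  // Up to 1 MB, but a small leading write only waits for ~128 KB more.
  const std::size_t kSmall = std::size_t{128} << 10;
  std::size_t max_size = std::size_t{1} << 20;
  if (size <= kSmall) max_size = size + kSmall;

  *group = *first->batch;  // copy, so the caller's batch is left untouched
  *last_writer = first;
  for (std::size_t i = 1; i < writers.size(); ++i) {
    Writer* w = writers[i];
    if (w->sync && !first->sync) break;  // don't fold a sync write into a non-sync group
    if (w->batch != nullptr) {
      size += w->batch->ByteSize();
      if (size > max_size) break;  // keep the group bounded
      group->records.insert(group->records.end(), w->batch->records.begin(),
                            w->batch->records.end());
    }
    *last_writer = w;
  }
}
```

With the queue (w3, w5, w2, w4) and no sync flags, the group holds all four records in queue order and last_writer is w4; marking w2 as sync makes the group stop after w5.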

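The code between sections C and D writes the whole merged batch (w3, w5, w2, w4) to the log and the memtable. In section E, w3 is popped without a signal, since it is the current thread; w5, w2 and w4 are then popped one by one, each receiving the shared status, being marked done, and being signaled. Once the popped writer is w4 (== last_writer), the loop exits. Section F then wakes w6, now at the front of the queue, and the same steps repeat. The woken w5, w2 and w4 see w.done == true in section B and simply return w.status.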
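Sections E and F can be isolated into a small helper to make the pop-and-notify order explicit. This is again a hedged sketch: `FinishGroup` and this `Writer` struct are hypothetical, with `std::condition_variable::notify_one` standing in for `port::CondVar::Signal`.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <string>

// Hypothetical simplified Writer.
struct Writer {
  bool done = false;
  std::string status;          // stand-in for leveldb::Status
  std::condition_variable cv;  // stand-in for port::CondVar
};

// Hypothetical helper sketching sections E and F, run by the leader `self`
// while it holds the mutex that guards `writers`.
void FinishGroup(std::deque<Writer*>& writers, Writer* self,
                 Writer* last_writer, const std::string& status) {
  // Section E: pop every writer in the group, up to and including last_writer.
  while (true) {
    assert(!writers.empty());
    Writer* ready = writers.front();
    writers.pop_front();
    if (ready != self) {  // the leader does not signal itself
      ready->status = status;
      ready->done = true;
      ready->cv.notify_one();
    }
    if (ready == last_writer) break;
  }
  // Section F: wake the writer now at the head of the queue, if any.
  if (!writers.empty()) {
    writers.front()->cv.notify_one();
  }
}
```

For the scenario above, calling it with the queue (w3, w5, w2, w4, w6), `self` = w3 and `last_writer` = w4 leaves only w6 in the queue, marks w5, w2 and w4 as done, and wakes w6.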

This is how multiple concurrent writes are merged into a single batch that is written to the log and the memtable in one pass.
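Putting the pieces together, the toy class below reproduces the whole mechanism with `std::thread`. It is an illustrative model under loud assumptions, not LevelDB: `GroupCommitLog` is a hypothetical name, each write is a single string, the "log" is an in-memory vector, and there is no memtable, sync flag, size cap or error path. It does keep the key property of sections B to D: the shared log is modified outside the mutex, which is safe only because just the writer at the front of the queue ever gets that far.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical toy model of the group-commit scheme in DBImpl::Write:
// one string per write, a vector<string> as the "log", and no memtable,
// sync flag, size cap or error handling.
class GroupCommitLog {
 public:
  void Write(const std::string& record) {
    Writer w;  // section A
    w.record = &record;

    std::unique_lock<std::mutex> lock(mu_);  // section B
    writers_.push_back(&w);
    w.cv.wait(lock, [&] { return w.done || &w == writers_.front(); });
    if (w.done) return;  // an earlier leader already logged our record

    // We are the leader: group every queued writer (like BuildBatchGroup,
    // but without the sync and size limits).
    std::vector<std::string> group;
    Writer* last_writer = nullptr;
    for (Writer* q : writers_) {
      group.push_back(*q->record);
      last_writer = q;
    }
    assert(last_writer != nullptr);

    lock.unlock();  // section C
    // Only the writer at the front of writers_ reaches this point, so log_
    // is touched by one thread at a time even without the mutex.
    log_.insert(log_.end(), group.begin(), group.end());
    lock.lock();  // section D
    ++groups_;

    while (true) {  // section E
      Writer* ready = writers_.front();
      writers_.pop_front();
      if (ready != &w) {
        ready->done = true;
        ready->cv.notify_one();
      }
      if (ready == last_writer) break;
    }
    if (!writers_.empty()) writers_.front()->cv.notify_one();  // section F
  }

  // Call only after every Write() has returned.
  const std::vector<std::string>& log() const { return log_; }
  std::size_t groups() const { return groups_; }

 private:
  struct Writer {
    const std::string* record = nullptr;
    bool done = false;
    std::condition_variable cv;
  };

  std::mutex mu_;
  std::deque<Writer*> writers_;
  std::vector<std::string> log_;
  std::size_t groups_ = 0;
};
```

When several threads call `Write` at once, each record ends up in the log exactly once, while the number of groups (anywhere from one up to the number of writes) depends on how the threads interleave.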
