以太坊之LevelDB原始碼分析
最近研究以太坊的LevelDB使用,看了看程式碼,大致介紹下使用流程(網上介紹的leveldb大多是c++版本的,以太坊使用的是go語言版本的),我使用的是mac book開發環境。介紹中會忽略一些細節,如有重要遺漏或者錯誤歡迎指出。
讀此篇文章預設leveldb的基本知識都瞭解,可以參見我的另外一篇文章介紹
https://blog.csdn.net/csds319/article/details/80333187
初始化
在ethdb/database.go的NewLDBDataBase()函式中,
db, err := leveldb.OpenFile(file, &opt.Options{ OpenFilesCacheCapacity: handles, BlockCacheCapacity: cache / 2 * opt.MiB, WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally Filter: filter.NewBloomFilter(10), })
file就是leveldb的路徑,以太坊的預設路徑是/Users/$Owner/Library/Ethereum/geth/chaindata
OpenFilesCacheCapacity:以太坊設定的是1024,作用應該是可開啟的檔案數吧,後續程式碼中再確認一下
BlockCacheCapacity:設定的是cache的一半,是384M
WriteBuffer:設定的是cache的1/4,是192M,這個是memtable的size。為什麼是1/4呢,因為cache是設定的leveldb總共使用的大小,一半給了BlockCacheCapacity,另外一半是給memtable的。而leveldb寫資料的流程是先寫memtable,等寫滿了把這個memtable forzen,然後啟用minor compaction到level 0檔案,同時new一個memtable供新寫入。所以cache的一半是給memtable和frozon memtable用的,單個memory的大小就是1/4
Filter:bloom filter,每個level檔案會建filter,10的意思是每個key hash的次數。bloom的位數需要程式碼確認下
OpenFile就會直接呼叫到leveldb的db.go檔案中
經過一些列初始化,恢復log檔案等,建立了若干個goroutine,看程式碼
func openDB(s *session) (*DB, error) { .... // Doesn't need to be included in the wait group. go db.compactionError() go db.mpoolDrain() if readOnly { db.SetReadOnly() } else { db.closeW.Add(2) go db.tCompaction() go db.mCompaction() // go db.jWriter() } }
compactionError:看程式碼是監聽一些channel做處理,暫未深究,後續補充
mpoolDrain:啟動一個30s的ticker讀取mempool chan,具體作用暫未深究,後續補充
mCompaction: minor compaction,就是把memory的內容寫入到level 0的檔案
tCompaction:major compaction,就是合併不同層級的level檔案。比如level 0滿了(已經有大於等於4個檔案了),此goroutine監聽到了,就會將level 0的某個檔案和level 1的某些檔案合併成新的level 1檔案
到這裡leveldb的初始化就成功了,新建幾個goroutine監聽是否compaction,基本流程大值如此了
讀寫資料
leveldb提供了一些介面來寫資料,以太坊做了包裝,具體看ethdb/interface.go
// Putter wraps the database write operation supported by both batches and regular databases.
type Putter interface {
Put(key []byte, value []byte) error
}
// Database wraps all database operations. All methods are safe for concurrent use.
type Database interface {
Putter
Get(key []byte) ([]byte, error)
Has(key []byte) (bool, error)
Delete(key []byte) error
Close()
NewBatch() Batch
}
// Batch is a write-only database that commits changes to its host database
// when Write is called. Batch cannot be used concurrently.
type Batch interface {
Putter
ValueSize() int // amount of data in the batch
Write() error
// Reset resets the batch for reuse
Reset()
}
定義了三個interface,Putter,Database和Batch與LevelDB讀寫互動
寫資料
寫資料又分為寫新資料、更新資料和刪除資料
leveldb為了效率考慮(如果刪除資料和更新資料用傳統的方式做的話,需要查詢所有資料庫找到原始key,效率比較低),此三種情況統統使用插入資料的方式,刪除資料是寫一個刪除標誌,更新資料是寫一樣key帶不同的value
那麼問題來了,如果更新或刪除資料,整個資料庫中有兩個或更多個相同的key,什麼時候合併,查詢的時候怎麼確定哪個是正確的
答案:
(1)什麼時候合併
如果有兩個或多個相同的key(或者是刪除,key的v是刪除標誌),一直到major compaction的時候才會執行合併動作或者刪除動作,這樣可以提升效率
(2)如何查詢到正確的值
因為leveldb的分層概念,讀資料的時候先查memory,然後再從level 0到level N逐層查詢,查詢到了就不再查詢,這裡有個新鮮度的概念,層級越低,新鮮度越高,memory中新鮮度最高。所以對於更新操作來說,即便是某個時刻資料庫中有兩個或者更過個相同key的kv,會以新鮮度高的為準。如果查詢到了key為刪除標誌,那麼直接返回not found即可
寫新資料
為了減少leveldb的互動,寫資料的時候一般會以Batch進行,就是先往batch裡寫一堆資料,然後再統一把這個Batch寫到leveldb。
即便是單個kv的寫入,leveldb內部也是使用batch來寫入的,但是這個batch也會即時寫入memory和log
以太坊的core/blockchain.go中寫block的時候就是新建Batch,然後把Batch寫入leveldb
// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts ...) {
...
// Write other block data using a batch.
batch := bc.db.NewBatch()
if err := WriteBlock(batch, block); err != nil {
return NonStatTy, err
}
....
if err := batch.Write(); err != nil {
return NonStatTy, err
}
....
}
我們來看看batch.Write的實現,在leveldb的db_write.go程式碼裡:
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
…
// 這段程式碼的意思是當batch的內容長度大於memory table的長度(以太坊是192M),
// 一次性寫入memory(當寫滿的時候會觸發minor compaction,然後接著寫memory直到把內容全部寫完)
if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
tr, err := db.OpenTransaction()
if err != nil {
return err
}
if err := tr.Write(batch, wo); err != nil {
tr.Discard()
return err
}
return tr.Commit()
}
…
return db.writeLocked(batch, nil, merge, sync)
}
接著看writeLocked程式碼:
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
// flush的功能是看是否觸發minor compaction
mdb, mdbFree, err := db.flush(batch.internalLen)
…
// Write journal. 寫Log檔案
if err := db.writeJournal(batches, seq, sync); err != nil {
db.unlockWrite(overflow, merged, err)
return err
}
// Put batches. 寫batch資料到memory
for _, batch := range batches {
if err := batch.putMem(seq, mdb.DB); err != nil {
panic(err)
}
seq += uint64(batch.Len())
}
….
// Rotate memdb if it's reach the threshold.
// 如果memory不夠寫batch的內容,呼叫rotateMem,就是把memory frezon觸發minor compaction
if batch.internalLen >= mdbFree {
db.rotateMem(0, false)
}
db.unlockWrite(overflow, merged, nil)
return nil
}
有點沒看懂為什麼先batch.putMem然後判斷batch.internalLen與mdbFree比大小再rotateMem,理應是先判斷mdbFree...
還有個merge與一堆channel的互動沒看明白,後續接著看
再看rotateMem的實現
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
retryLimit := 3
retry:
// Wait for pending memdb compaction.
err = db.compTriggerWait(db.mcompCmdC)
if err != nil {
return
}
retryLimit--
// Create new memdb and journal.
// 新建log檔案和memory,同時把現在使用的memory指向為frozenMem,minor compaction的時候寫入frozenMem到level 0檔案
mem, err = db.newMem(n)
if err != nil {
if err == errHasFrozenMem {
if retryLimit <= 0 {
panic("BUG: still has frozen memdb")
}
goto retry
}
return
}
// Schedule memdb compaction.
// 觸發minor compaction
if wait {
err = db.compTriggerWait(db.mcompCmdC)
} else {
db.compTrigger(db.mcompCmdC)
}
return
}
至此資料寫完,如果memory空間夠,直接寫入memory
如果memory空間不夠,等待執行minor compaction(compTrigger內會等待compaction的結果)再寫入新建的memory db(是從mempool中拿的,應該是mempool中就兩塊兒memory,待寫入的memory和frozon memory)中
刪除資料/更新資料
先看插入新資料的介面,更新資料也是呼叫這個一樣的介面:
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
return db.putRec(keyTypeVal, key, value, wo)
}
插入資料是插入一個type為keyTypeVal,key/value的資料
再看刪除資料的介面
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
return db.putRec(keyTypeDel, key, nil, wo)
}
刪除資料的程式碼其實就是插入一個type為keyTypeDel,key/nil的資料,當做一個普通的資料插入到memory中
等後續做major compaction的時候找到原始的key再執行刪除動作(更新資料也是在major compaction的時候進行)
具體major compaction的程式碼還未看明白,後續看明白了再貼上來
讀資料
讀資料是依次從memtable和各個level檔案中查詢資料,db.go的介面:
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
err = db.ok()
if err != nil {
return
}
// 關於snapshot未做研究,後續有研究再貼一下
se := db.acquireSnapshot()
defer db.releaseSnapshot(se)
return db.get(nil, nil, key, se.seq, ro)
}
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
if auxm != nil {
if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
return append([]byte{}, mv...), me
}
}
// 拿到memdb和frozon memdb依次查詢
em, fm := db.getMems()
for _, m := range [...]*memDB{em, fm} {
if m == nil {
continue
}
defer m.decref()
if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
return append([]byte{}, mv...), me
}
}
// 拿到version後從version中各個level的檔案中依次查詢
v := db.s.version()
value, cSched, err := v.get(auxt, ikey, ro, false)
v.release()
if cSched {
// Trigger table compaction.
db.compTrigger(db.tcompCmdC)
}
return
}
Compaction
compaction是把資料一級一級的往下寫,leveldb實現了minor compaction和major compaction
minor compaction,leveldb裡面的mCompaction goroutine做的事情,就是把memory中的資料寫入到level 0檔案中
major compaction,leveldb裡面tCompaction goroutine做的事情,就是把低層的level檔案合併寫入高層的level檔案中
mCompaction
func (db *DB) mCompaction() {
var x cCmd
for {
select {
case x = <-db.mcompCmdC:
switch x.(type) {
case cAuto:
db.memCompaction()
x.ack(nil)
x = nil
default:
panic("leveldb: unknown command")
}
case <-db.closeC:
return
}
}
}
還記得寫資料的時候rotateMem中會寫channel mcompCmdC嗎,這個goroutine起來後一直在監聽該channel等待做compaction的事情,所以看memCompaction的實現
func (db *DB) memCompaction() {
// rotateMem的時候把當前使用的memory指向到frozonMem,這裡讀出來寫入level 0檔案
mdb := db.getFrozenMem()
// Pause table compaction.
// 這裡的作用是minor compaction的時候要先暫停major compaction
resumeC := make(chan struct{})
select {
case db.tcompPauseC <- (chan<- struct{})(resumeC):
case <-db.compPerErrC:
close(resumeC)
resumeC = nil
case <-db.closeC:
db.compactionExitTransact()
}
// Generate tables. 建立level 0檔案然後寫memory到檔案
// flushMemdb是把memory內容寫到新建的level 0檔案,然後把level 0檔案加入到addedTables record中
// 程式碼裡把level 0~N的檔案叫做table
db.compactionTransactFunc("[email protected]", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
stats.stopTimer()
return
}, func() error {
for _, r := range rec.addedTables {
db.logf("[email protected] revert @%d", r.num)
if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
return err
}
}
return nil
})
rec.setJournalNum(db.journalFd.Num)
rec.setSeqNum(db.frozenSeq)
// Commit.
// 就是最終儲存tables,寫入到version記錄。。。後續深入看下
stats.startTimer()
db.compactionCommit("memdb", rec)
stats.stopTimer()
db.logf("[email protected] committed F·%d T·%v", len(rec.addedTables), stats.duration)
for _, r := range rec.addedTables {
stats.write += r.size
}
db.compStats.addStat(flushLevel, stats)
// Drop frozen memdb.
// minor compaction之後把指向frozon的memory重新放回mempool中
db.dropFrozenMem()
// Resume table compaction.
// 恢復major compaction
if resumeC != nil {
select {
case <-resumeC:
close(resumeC)
case <-db.closeC:
db.compactionExitTransact()
}
}
// Trigger table compaction.
// tcompCmdC就是major compaction要監聽的channel,這裡寫資料到此channel
db.compTrigger(db.tcompCmdC)
}
後續需要繼續完善compactionCommit程式碼,實現都在這裡
tCompaction
func (db *DB) tCompaction() {
for {
if db.tableNeedCompaction() {
select {
case x = <-db.tcompCmdC:
case ch := <-db.tcompPauseC:
db.pauseCompaction(ch)
continue
case <-db.closeC:
return
default:
}
} else {
for i := range ackQ {
ackQ[i].ack(nil)
ackQ[i] = nil
}
ackQ = ackQ[:0]
select {
case x = <-db.tcompCmdC:
case ch := <-db.tcompPauseC:
db.pauseCompaction(ch)
continue
case <-db.closeC:
return
}
}
if x != nil {
switch cmd := x.(type) {
case cAuto:
ackQ = append(ackQ, x)
case cRange:
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
default:
panic("leveldb: unknown command")
}
x = nil
}
db.tableAutoCompaction()
}
}
計算是否要執行major compaction
func (v *version) computeCompaction() {
for level, tables := range v.levels {
var score float64
size := tables.size()
if level == 0 {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compaction.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger())
} else {
score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level))
}
if score > bestScore {
bestLevel = level
bestScore = score
}
statFiles[level] = len(tables)
statSizes[level] = shortenb(int(size))
statScore[level] = fmt.Sprintf("%.2f", score)
statTotSize += size
}
v.cLevel = bestLevel
v.cScore = bestScore
}
計算是否要compaction是邏輯是:計算一個分數,level 0是檔案個數/4,level 0以上就是檔案的總大小/預設的每個level的檔案大小總量;最後找出算出的值最大的一個賦值到v.cScore,level賦值到v.cLevel
最終使用的時候是判斷這個cScore是否>=1來決定是否要進行compaction
func (v *version) needCompaction() bool {
return v.cScore >= 1 || atomic.LoadPointer(&v.cSeek) != nil
}
還有一個判斷是v.cSeek是否為空,這個是讀資料那邊用到的,等看到讀那邊的邏輯再講一下
基本上程式碼擼了一遍,但是隻是粗略的過了一遍,很多細節尚未涉及,有機會再詳細擼一遍。