1. 程式人生 > >以太坊之LevelDB原始碼分析

以太坊之LevelDB原始碼分析

最近研究以太坊的LevelDB使用,看了看程式碼,大致介紹下使用流程(網上介紹的leveldb大多是c++版本的,以太坊使用的是go語言版本的),我使用的是mac book開發環境。介紹中會忽略一些細節,如有重要遺漏或者錯誤歡迎指出。

讀此篇文章預設leveldb的基本知識都瞭解,可以參見我的另外一篇文章介紹

https://blog.csdn.net/csds319/article/details/80333187

初始化

在ethdb/database.go的NewLDBDataBase()函式中,

db, err := leveldb.OpenFile(file, &opt.Options{
		OpenFilesCacheCapacity: handles,
		BlockCacheCapacity:     cache / 2 * opt.MiB,
		WriteBuffer:            cache / 4 * opt.MiB, // Two of these are used internally
		Filter:                 filter.NewBloomFilter(10),
	})

file就是leveldb的路徑,以太坊的預設路徑是/Users/$Owner/Library/Ethereum/geth/chaindata

OpenFilesCacheCapacity:以太坊設定的是1024,作用應該是可開啟的檔案數吧,後續程式碼中再確認一下

BlockCacheCapacity:設定的是cache的一半,是384M

WriteBuffer:設定的是cache的1/4,是192M,這個是memtable的size。為什麼是1/4呢,因為cache是設定的leveldb總共使用的大小,一半給了BlockCacheCapacity,另外一半是給memtable的。而leveldb寫資料的流程是先寫memtable,等寫滿了把這個memtable forzen,然後啟用minor compaction到level 0檔案,同時new一個memtable供新寫入。所以cache的一半是給memtable和frozon memtable用的,單個memory的大小就是1/4

Filter:bloom filter,每個level檔案會建filter,10的意思是每個key hash的次數。bloom的位數需要程式碼確認下

OpenFile就會直接呼叫到leveldb的db.go檔案中

經過一些列初始化,恢復log檔案等,建立了若干個goroutine,看程式碼

func openDB(s *session) (*DB, error) {
    ....
    // Doesn't need to be included in the wait group.
    go db.compactionError()  
    go db.mpoolDrain()

    if readOnly {
	db.SetReadOnly()
    } else {
	db.closeW.Add(2)
	go db.tCompaction()
	go db.mCompaction()
	// go db.jWriter()
    }
}

compactionError:看程式碼是監聽一些channel做處理,暫未深究,後續補充

mpoolDrain:啟動一個30s的ticker讀取mempool chan,具體作用暫未深究,後續補充

mCompaction: minor compaction,就是把memory的內容寫入到level 0的檔案

tCompaction:major compaction,就是合併不同層級的level檔案。比如level 0滿了(已經有大於等於4個檔案了),此goroutine監聽到了,就會將level 0的某個檔案和level 1的某些檔案合併成新的level 1檔案

到這裡leveldb的初始化就成功了,新建幾個goroutine監聽是否compaction,基本流程大值如此了

讀寫資料

leveldb提供了一些介面來寫資料,以太坊做了包裝,具體看ethdb/interface.go

// Putter wraps the database write operation supported by both batches and regular databases.
type Putter interface {
	Put(key []byte, value []byte) error
}

// Database wraps all database operations. All methods are safe for concurrent use.
type Database interface {
	Putter
	Get(key []byte) ([]byte, error)
	Has(key []byte) (bool, error)
	Delete(key []byte) error
	Close()
	NewBatch() Batch
}

// Batch is a write-only database that commits changes to its host database
// when Write is called. Batch cannot be used concurrently.
type Batch interface {
	Putter
	ValueSize() int // amount of data in the batch
	Write() error
	// Reset resets the batch for reuse
	Reset()
}

定義了三個interface,Putter,Database和Batch與LevelDB讀寫互動

寫資料

寫資料又分為寫新資料、更新資料和刪除資料

leveldb為了效率考慮(如果刪除資料和更新資料用傳統的方式做的話,需要查詢所有資料庫找到原始key,效率比較低),此三種情況統統使用插入資料的方式,刪除資料是寫一個刪除標誌,更新資料是寫一樣key帶不同的value

那麼問題來了,如果更新或刪除資料,整個資料庫中有兩個或更多個相同的key,什麼時候合併,查詢的時候怎麼確定哪個是正確的

答案:

(1)什麼時候合併

如果有兩個或多個相同的key(或者是刪除,key的v是刪除標誌),一直到major compaction的時候才會執行合併動作或者刪除動作,這樣可以提升效率

(2)如何查詢到正確的值

因為leveldb的分層概念,讀資料的時候先查memory,然後再從level 0到level N逐層查詢,查詢到了就不再查詢,這裡有個新鮮度的概念,層級越低,新鮮度越高,memory中新鮮度最高。所以對於更新操作來說,即便是某個時刻資料庫中有兩個或者更過個相同key的kv,會以新鮮度高的為準。如果查詢到了key為刪除標誌,那麼直接返回not found即可

寫新資料

為了減少leveldb的互動,寫資料的時候一般會以Batch進行,就是先往batch裡寫一堆資料,然後再統一把這個Batch寫到leveldb。

即便是單個kv的寫入,leveldb內部也是使用batch來寫入的,但是這個batch也會即時寫入memory和log

以太坊的core/blockchain.go中寫block的時候就是新建Batch,然後把Batch寫入leveldb

// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts ...) {
    ...
    // Write other block data using a batch.
    batch := bc.db.NewBatch()
    if err := WriteBlock(batch, block); err != nil {
	return NonStatTy, err
    }
    ....
    if err := batch.Write(); err != nil {
	return NonStatTy, err
    }
    ....
}

我們來看看batch.Write的實現,在leveldb的db_write.go程式碼裡:

func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
	…
        // 這段程式碼的意思是當batch的內容長度大於memory table的長度(以太坊是192M),
        // 一次性寫入memory(當寫滿的時候會觸發minor compaction,然後接著寫memory直到把內容全部寫完)
	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
    		tr, err := db.OpenTransaction()
    		if err != nil {
    			return err
    		}
    		if err := tr.Write(batch, wo); err != nil {
			tr.Discard()
            		return err
    		}
    		return tr.Commit()
    	}
	…
	return db.writeLocked(batch, nil, merge, sync)

}

接著看writeLocked程式碼:

func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
    // flush的功能是看是否觸發minor compaction
    mdb, mdbFree, err := db.flush(batch.internalLen)
    …
    // Write journal. 寫Log檔案
    if err := db.writeJournal(batches, seq, sync); err != nil {
        db.unlockWrite(overflow, merged, err)
        return err
    }
    
    // Put batches.  寫batch資料到memory
    for _, batch := range batches {
        if err := batch.putMem(seq, mdb.DB); err != nil {
            panic(err)
        }
        seq += uint64(batch.Len())
    }
    ….
    // Rotate memdb if it's reach the threshold.   
    // 如果memory不夠寫batch的內容,呼叫rotateMem,就是把memory frezon觸發minor compaction
    if batch.internalLen >= mdbFree {
        db.rotateMem(0, false)
    }
    db.unlockWrite(overflow, merged, nil)
    return nil
}

有點沒看懂為什麼先batch.putMem然後判斷batch.internalLen與mdbFree比大小再rotateMem,理應是先判斷mdbFree...

還有個merge與一堆channel的互動沒看明白,後續接著看

再看rotateMem的實現

func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
	retryLimit := 3
retry:
	// Wait for pending memdb compaction.
	err = db.compTriggerWait(db.mcompCmdC)
	if err != nil {
		return
	}
	retryLimit--

	// Create new memdb and journal. 
        // 新建log檔案和memory,同時把現在使用的memory指向為frozenMem,minor compaction的時候寫入frozenMem到level 0檔案
	mem, err = db.newMem(n)
	if err != nil {
		if err == errHasFrozenMem {
			if retryLimit <= 0 {
				panic("BUG: still has frozen memdb")
			}
			goto retry
		}
		return
	}

	// Schedule memdb compaction.
        // 觸發minor compaction
	if wait {
		err = db.compTriggerWait(db.mcompCmdC)
	} else {
		db.compTrigger(db.mcompCmdC)
	}
	return
}

至此資料寫完,如果memory空間夠,直接寫入memory

如果memory空間不夠,等待執行minor compaction(compTrigger內會等待compaction的結果)再寫入新建的memory db(是從mempool中拿的,應該是mempool中就兩塊兒memory,待寫入的memory和frozon memory)中

刪除資料/更新資料

先看插入新資料的介面,更新資料也是呼叫這個一樣的介面:

func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeVal, key, value, wo)
}

插入資料是插入一個type為keyTypeVal,key/value的資料

再看刪除資料的介面

func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeDel, key, nil, wo)
}

刪除資料的程式碼其實就是插入一個type為keyTypeDel,key/nil的資料,當做一個普通的資料插入到memory中

等後續做major compaction的時候找到原始的key再執行刪除動作(更新資料也是在major compaction的時候進行)

具體major compaction的程式碼還未看明白,後續看明白了再貼上來

讀資料

讀資料是依次從memtable和各個level檔案中查詢資料,db.go的介面:

func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
	err = db.ok()
	if err != nil {
		return
	}
        // 關於snapshot未做研究,後續有研究再貼一下
	se := db.acquireSnapshot()
	defer db.releaseSnapshot(se)
	return db.get(nil, nil, key, se.seq, ro)
}
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
			return append([]byte{}, mv...), me
		}
	}
        // 拿到memdb和frozon memdb依次查詢
	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		defer m.decref()

		if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return append([]byte{}, mv...), me
		}
	}
        // 拿到version後從version中各個level的檔案中依次查詢
	v := db.s.version()
	value, cSched, err := v.get(auxt, ikey, ro, false)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	return
}

Compaction

compaction是把資料一級一級的往下寫,leveldb實現了minor compaction和major compaction

minor compaction,leveldb裡面的mCompaction goroutine做的事情,就是把memory中的資料寫入到level 0檔案中

major compaction,leveldb裡面tCompaction goroutine做的事情,就是把低層的level檔案合併寫入高層的level檔案中

mCompaction

func (db *DB) mCompaction() {
	var x cCmd

	for {
		select {
		case x = <-db.mcompCmdC:
			switch x.(type) {
			case cAuto:
				db.memCompaction()
				x.ack(nil)
				x = nil
			default:
				panic("leveldb: unknown command")
			}
		case <-db.closeC:
			return
		}
	}
}

還記得寫資料的時候rotateMem中會寫channel mcompCmdC嗎,這個goroutine起來後一直在監聽該channel等待做compaction的事情,所以看memCompaction的實現

func (db *DB) memCompaction() {
        // rotateMem的時候把當前使用的memory指向到frozonMem,這裡讀出來寫入level 0檔案
	mdb := db.getFrozenMem()
	
	// Pause table compaction. 
        // 這裡的作用是minor compaction的時候要先暫停major compaction
	resumeC := make(chan struct{})
	select {
	case db.tcompPauseC <- (chan<- struct{})(resumeC):
	case <-db.compPerErrC:
		close(resumeC)
		resumeC = nil
	case <-db.closeC:
		db.compactionExitTransact()
	}

	// Generate tables. 建立level 0檔案然後寫memory到檔案
        // flushMemdb是把memory內容寫到新建的level 0檔案,然後把level 0檔案加入到addedTables record中
        // 程式碼裡把level 0~N的檔案叫做table
	db.compactionTransactFunc("[email protected]", func(cnt *compactionTransactCounter) (err error) {
		stats.startTimer()
		flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
		stats.stopTimer()
		return
	}, func() error {
		for _, r := range rec.addedTables {
			db.logf("[email protected] revert @%d", r.num)
			if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
				return err
			}
		}
		return nil
	})

	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.frozenSeq)

	// Commit. 
        // 就是最終儲存tables,寫入到version記錄。。。後續深入看下
	stats.startTimer()
	db.compactionCommit("memdb", rec)
	stats.stopTimer()

	db.logf("[email protected] committed F·%d T·%v", len(rec.addedTables), stats.duration)

	for _, r := range rec.addedTables {
		stats.write += r.size
	}
	db.compStats.addStat(flushLevel, stats)

	// Drop frozen memdb.     
        // minor compaction之後把指向frozon的memory重新放回mempool中
	db.dropFrozenMem()

	// Resume table compaction.
        // 恢復major compaction
	if resumeC != nil {
		select {
		case <-resumeC:
			close(resumeC)
		case <-db.closeC:
			db.compactionExitTransact()
		}
	}

	// Trigger table compaction. 
        // tcompCmdC就是major compaction要監聽的channel,這裡寫資料到此channel
	db.compTrigger(db.tcompCmdC)
}

後續需要繼續完善compactionCommit程式碼,實現都在這裡

tCompaction

func (db *DB) tCompaction() {
	for {
		if db.tableNeedCompaction() {
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			default:
			}
		} else {
			for i := range ackQ {
				ackQ[i].ack(nil)
				ackQ[i] = nil
			}
			ackQ = ackQ[:0]
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			}
		}
		if x != nil {
			switch cmd := x.(type) {
			case cAuto:
				ackQ = append(ackQ, x)
			case cRange:
				x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
			default:
				panic("leveldb: unknown command")
			}
			x = nil
		}
		db.tableAutoCompaction()
	}
}

計算是否要執行major compaction

func (v *version) computeCompaction() {
	for level, tables := range v.levels {
		var score float64
		size := tables.size()
		if level == 0 {
			// We treat level-0 specially by bounding the number of files
			// instead of number of bytes for two reasons:
			//
			// (1) With larger write-buffer sizes, it is nice not to do too
			// many level-0 compaction.
			//
			// (2) The files in level-0 are merged on every read and
			// therefore we wish to avoid too many files when the individual
			// file size is small (perhaps because of a small write-buffer
			// setting, or very high compression ratios, or lots of
			// overwrites/deletions).
			score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger())
		} else {
			score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level))
		}

		if score > bestScore {
			bestLevel = level
			bestScore = score
		}

		statFiles[level] = len(tables)
		statSizes[level] = shortenb(int(size))
		statScore[level] = fmt.Sprintf("%.2f", score)
		statTotSize += size
	}

	v.cLevel = bestLevel
	v.cScore = bestScore
}

計算是否要compaction是邏輯是:計算一個分數,level 0是檔案個數/4,level 0以上就是檔案的總大小/預設的每個level的檔案大小總量;最後找出算出的值最大的一個賦值到v.cScore,level賦值到v.cLevel

最終使用的時候是判斷這個cScore是否>=1來決定是否要進行compaction

func (v *version) needCompaction() bool {
	return v.cScore >= 1 || atomic.LoadPointer(&v.cSeek) != nil
}

還有一個判斷是v.cSeek是否為空,這個是讀資料那邊用到的,等看到讀那邊的邏輯再講一下

基本上程式碼擼了一遍,但是隻是粗略的過了一遍,很多細節尚未涉及,有機會再詳細擼一遍。