以太坊原始碼分析(30)eth-bloombits和filter原始碼分析
阿新 • • 發佈:2019-01-29
## 以太坊的布隆過濾器
以太坊的區塊頭中包含了一個叫做logsBloom的區域。 這個區域儲存了當前區塊中所有的收據的日誌的布隆過濾器,一共是2048個bit。也就是256個位元組。
而我們的一個交易的收據包含了很多的日誌記錄。 每個日誌記錄包含了 合約的地址, 多個Topic。 而在我們的收據中也存在一個布隆過濾器,這個布隆過濾器記錄了所有的日誌記錄的資訊。如果我們看黃皮書裡面對日誌記錄的形式化定義。
O代表我們的日誌記錄,Oa代表logger的地址,Oto,Ot1代表日誌的Topics, Od代表時間。
Oa是20個位元組,Ot是32個位元組,Od是很多位元組
我們定義了一個布隆過濾器函式M,用來把一個日誌物件轉換成256位元組的hash
M3:2045是一個特別的函式,用來設定2048個bit位中的三位為1。
對於任意的輸入值,首先求他的KEC輸出, 然後通過取KEC輸出的 [0,1] [2,3],[4,5] 這幾位的值 對2048取模, 得到三個值, 這三個值就是輸出的2048中需要置位的下標。 也就是說對於任何一個輸入,如果它對應的三個下標的值不都為1,那麼它肯定不在這個區塊中。 當如如果對應的三位都為1,也不能說明一定在這個區塊中。 這就是布隆過濾器的特性。
收據中的布隆過濾器就是所有的日誌的布隆過濾器輸出的並集。
同時區塊頭中的logBloom,就是所有的收據的布隆過濾器的並集。
## ChainIndexer 和 BloomIndexer 最開始看到ChainIndexer,不是很明白是什麼功能。 其實從名字中可以看到,是Chain的索引。 在 eth中我們有看到BloomIndexer,這個就是布隆過濾器的索引。
在我們的協議中提供了查詢指定Log的功能。
使用者可以通過傳遞下面的引數來查詢指定的Log,開始的區塊號,結束的區塊號, 根據合約 Addresses指定的地址過濾,根據指定的Topics來過濾。
// FilterCriteria represents a request to create a new filter. type FilterCriteria struct { FromBlock *big.Int ToBlock *big.Int Addresses []common.Address Topics [][]common.Hash }
如果開始和結束之間間隔很大,那麼如果直接依次檢索每個區塊頭的logBloom區域是比較低效的。 因為每個區塊頭都是分開儲存的, 可能需要非常多的磁碟隨機訪問。
所以以太坊協議在本地維護了一套索引,用來加速這個過程。
大致原理是。 每4096個區塊稱為一個Section,一個Section裡面的logBloom會儲存在一起。對於每個Section, 用一個二維資料,A[2048 ][4096]來儲存。 第一維2048代表了bloom過濾器的長度2048個位元組。 第二維4096代表了一個Section裡面的所有區塊,每一個位置按照順序代表了其中的一個區塊。
- A[0][0]=blockchain[section*4096+0].logBloom[0],- A[0][1]=blockchain[section*4096+1].logBloom[0],- A[0][4096]=blockchain[section*4096+1].logBloom[0],- A[1][0]=blockchain[section*4096+0].logBloom[1],- A[1][1024]=blockchain[section*4096+1024].logBloom[1],- A[2047][1]=blockchain[section*4096+1].logBloom[2047],
如果Section填充完畢,那麼會寫成2048個KV。 ![image](picture/bloom_6.png)
## bloombit.go 程式碼分析
這個程式碼相對不是很獨立,如果單獨看這個程式碼,有點摸不著頭腦的感覺, 因為它只是實現了一些介面,具體的處理邏輯並不在這裡,而是在core裡面。 不過這裡我先結合之前講到的資訊分析一下。 後續更詳細的邏輯在分析core的程式碼的時候再詳細分析。
服務執行緒startBloomHandlers,這個方法是為了響應具體的查詢請求, 給定指定的Section和bit來從levelDB裡面查詢然後返回出去。 單獨看這裡有點摸不著頭腦。 這個方法的呼叫比較複雜。 涉及到core裡面的很多邏輯。 這裡先不細說了。 直到有這個方法就行了。
type Retrieval struct { Bit uint //Bit的取值 0-2047 代表了想要獲取哪一位的值 Sections []uint64 // 那些Section Bitsets [][]byte // 返回值 查詢出來的結果。 } // startBloomHandlers starts a batch of goroutines to accept bloom bit database // retrievals from possibly a range of filters and serving the data to satisfy. func (eth *Ethereum) startBloomHandlers() { for i := 0; i < bloomServiceThreads; i++ { go func() { for { select { case <-eth.shutdownChan: return case request := <-eth.bloomRequests: // request是一個通道 task := <-request //從通道里面獲取一個task task.Bitsets = make([][]byte, len(task.Sections)) for i, section := range task.Sections { head := core.GetCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1) blob, err := bitutil.DecompressBytes(core.GetBloomBits(eth.chainDb, task.Bit, section, head), int(params.BloomBitsBlocks)/8) if err != nil { panic(err) } task.Bitsets[i] = blob } request <- task //通過request通道返回結果 } } }() } }
### 資料結構BloomIndexer物件主要使用者構建索引的過程,是core.ChainIndexer的一個介面實現,所以只實現了一些必須的介面。對於建立索引的邏輯還在core.ChainIndexer裡面。
// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index // for the Ethereum header bloom filters, permitting blazing fast filtering. type BloomIndexer struct { size uint64 // section size to generate bloombits for db ethdb.Database // database instance to write index data and metadata into gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index section uint64 // Section is the section number being processed currently 當前的section head common.Hash // Head is the hash of the last header processed }
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the // canonical chain for fast logs filtering. func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer { backend := &BloomIndexer{ db: db, size: size, } table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix)) return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits") }
Reset實現了ChainIndexerBackend的方法,啟動一個新的section
// Reset implements core.ChainIndexerBackend, starting a new bloombits index // section. func (b *BloomIndexer) Reset(section uint64) { gen, err := bloombits.NewGenerator(uint(b.size)) if err != nil { panic(err) } b.gen, b.section, b.head = gen, section, common.Hash{} }
Process實現了ChainIndexerBackend, 增加一個新的區塊頭到index // Process implements core.ChainIndexerBackend, adding a new header's bloom into // the index. func (b *BloomIndexer) Process(header *types.Header) { b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) b.head = header.Hash() }
Commit方法實現了ChainIndexerBackend,持久化並寫入資料庫。 // Commit implements core.ChainIndexerBackend, finalizing the bloom section and // writing it out into the database. func (b *BloomIndexer) Commit() error { batch := b.db.NewBatch() for i := 0; i < types.BloomBitLength; i++ { bits, err := b.gen.Bitset(uint(i)) if err != nil { return err } core.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits)) } return batch.Write() }
## filter/api.go 原始碼分析
eth/filter 包 包含了給使用者提供過濾的功能,使用者可以通過呼叫對交易或者區塊進行過濾,然後持續的獲取結果,如果5分鐘沒有操作,這個過濾器會被刪除。
過濾器的結構。
var ( deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline ) // filter is a helper struct that holds meta information over the filter type // and associated subscription in the event system. type filter struct { typ Type // 過濾器的型別, 過濾什麼型別的資料 deadline *time.Timer // filter is inactiv when deadline triggers 當計時器響起的時候,會觸發定時器。 hashes []common.Hash //過濾出來的hash結果 crit FilterCriteria //過濾條件 logs []*types.Log //過濾出來的Log資訊 s *Subscription // associated subscription in event system 事件系統中的訂閱器。 }
構造方法
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such als blocks, transactions and logs. // PublicFilterAPI用來建立和管理過濾器。 允許外部的客戶端獲取以太坊協議的一些資訊,比如區塊資訊,交易資訊和日誌資訊。 type PublicFilterAPI struct { backend Backend mux *event.TypeMux quit chan struct{} chainDb ethdb.Database events *EventSystem filtersMu sync.Mutex filters map[rpc.ID]*filter } // NewPublicFilterAPI returns a new PublicFilterAPI instance. func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ backend: backend, mux: backend.EventMux(), chainDb: backend.ChainDb(), events: NewEventSystem(backend.EventMux(), backend, lightMode), filters: make(map[rpc.ID]*filter), } go api.timeoutLoop() return api }
### 超時檢查 // timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. // Tt is started when the api is created. // 每隔5分鐘檢查一下。 如果過期的過濾器,刪除。 func (api *PublicFilterAPI) timeoutLoop() { ticker := time.NewTicker(5 * time.Minute) for { <-ticker.C api.filtersMu.Lock() for id, f := range api.filters { select { case <-f.deadline.C: f.s.Unsubscribe() delete(api.filters, id) default: continue } } api.filtersMu.Unlock() } }
NewPendingTransactionFilter,用來建立一個PendingTransactionFilter。 這種方式是用來給那種無法建立長連線的通道使用的(比如HTTP), 如果對於可以建立長連結的通道(比如WebSocket)可以使用rpc提供的傳送訂閱模式來處理,就不用持續的輪詢了 // NewPendingTransactionFilter creates a filter that fetches pending transaction hashes // as transactions enter the pending state. // // It is part of the filter package because this filter can be used throug the // `eth_getFilterChanges` polling method that is also used for log filters. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { var ( pendingTxs = make(chan common.Hash) // 在事件系統訂閱這種訊息 pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs) ) api.filtersMu.Lock() api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filtersMu.Unlock() go func() { for { select { case ph := <-pendingTxs: // 接收到pendingTxs,儲存在過濾器的hashes容器裡面。 api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { f.hashes = append(f.hashes, ph) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): api.filtersMu.Lock() delete(api.filters, pendingTxSub.ID) api.filtersMu.Unlock() return } } }() return pendingTxSub.ID }
輪詢: GetFilterChanges // GetFilterChanges returns the logs for the filter with the given id since // last time it was called. This can be used for polling. // GetFilterChanges 用來返回從上次呼叫到現在的所有的指定id的所有過濾資訊。這個可以用來輪詢。 // For pending transaction and block filters the result is []common.Hash. // (pending)Log filters return []Log. // 對於pending transaction和block的過濾器,返回結果型別是[]common.Hash. 對於pending Log 過濾器,返回的是 []Log // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { api.filtersMu.Lock() defer api.filtersMu.Unlock() if f, found := api.filters[id]; found { if !f.deadline.Stop() { // 如果定時器已經觸發,但是filter還沒有移除,那麼我們先接收定時器的值,然後重置定時器 // timer expired but filter is not yet removed in timeout loop // receive timer value and reset timer <-f.deadline.C } f.deadline.Reset(deadline) switch f.typ { case PendingTransactionsSubscription, BlocksSubscription: hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil case LogsSubscription: logs := f.logs f.logs = nil return returnLogs(logs), nil } } return []interface{}{}, fmt.Errorf("filter not found") }
對於可以建立長連線的通道,可以直接使用rpc的傳送訂閱模式, 這樣客戶端就可以直接接收到過濾資訊,不用呼叫輪詢的方式了。 可以看到這種模式下面並沒有新增到filters這個容器,也沒有超時管理了。也就是說支援兩種模式。
// NewPendingTransactions creates a subscription that is triggered each time a transaction // enters the transaction pool and was signed from one of the transactions this nodes manages. func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } rpcSub := notifier.CreateSubscription() go func() { txHashes := make(chan common.Hash) pendingTxSub := api.events.SubscribePendingTxEvents(txHashes) for { select { case h := <-txHashes: notifier.Notify(rpcSub.ID, h) case <-rpcSub.Err(): pendingTxSub.Unsubscribe() return case <-notifier.Closed(): pendingTxSub.Unsubscribe() return } } }() return rpcSub, nil }
日誌過濾功能,根據FilterCriteria指定的引數,來對日誌進行過濾,開始區塊,結束區塊,地址和Topics,這裡面引入了一個新的物件filter // FilterCriteria represents a request to create a new filter. type FilterCriteria struct { FromBlock *big.Int ToBlock *big.Int Addresses []common.Address Topics [][]common.Hash } // GetLogs returns logs matching the given argument that are stored within the state. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { // Convert the RPC block numbers into internal representations if crit.FromBlock == nil { crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } if crit.ToBlock == nil { crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } // Create and run the filter to get all the logs // 建立了一個Filter物件 然後呼叫filter.Logs filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics) logs, err := filter.Logs(ctx) if err != nil { return nil, err } return returnLogs(logs), err }
## filter.gofiter.go裡面定義了一個Filter物件。這個物件主要用來根據 區塊的BloomIndexer和布隆過濾器等來執行日誌的過濾功能。
### 資料結構 // 後端, 這個後端其實是在core裡面實現的。 布隆過濾器的主要演算法在core裡面實現了。 type Backend interface { ChainDb() ethdb.Database EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) } // Filter can be used to retrieve and filter logs. type Filter struct { backend Backend // 後端 db ethdb.Database // 資料庫 begin, end int64 // 開始結束區塊 addresses []common.Address // 篩選地址 topics [][]common.Hash // 篩選主題 matcher *bloombits.Matcher // 布隆過濾器的匹配器 }
建構函式把address和topic都加入到filters容器。然後構建了一個bloombits.NewMatcher(size, filters)。這個函式在core裡面實現, 暫時不會講解。
// New creates a new filter which uses a bloom filter on blocks to figure out whether // a particular block is interesting or not. func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { // Flatten the address and topic filter clauses into a single bloombits filter // system. Since the bloombits are not positional, nil topics are permitted, // which get flattened into a nil byte slice. var filters [][][]byte if len(addresses) > 0 { filter := make([][]byte, len(addresses)) for i, address := range addresses { filter[i] = address.Bytes() } filters = append(filters, filter) } for _, topicList := range topics { filter := make([][]byte, len(topicList)) for i, topic := range topicList { filter[i] = topic.Bytes() } filters = append(filters, filter) } // Assemble and return the filter size, _ := backend.BloomStatus() return &Filter{ backend: backend, begin: begin, end: end, addresses: addresses, topics: topics, db: backend.ChainDb(), matcher: bloombits.NewMatcher(size, filters), } }
Logs 執行過濾
// Logs searches the blockchain for matching log entries, returning all from the // first block that contains matches, updating the start of the filter accordingly. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // Figure out the limits of the filter range header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if header == nil { return nil, nil } head := header.Number.Uint64() if f.begin == -1 { f.begin = int64(head) } end := uint64(f.end) if f.end == -1 { end = head } // Gather all indexed logs, and finish with non indexed ones var ( logs []*types.Log err error ) size, sections := f.backend.BloomStatus() // indexed 是指建立了索引的區塊的最大值。 如果過濾的範圍落在了建立了索引的部分。 // 那麼執行索引搜尋。 if indexed := sections * size; indexed > uint64(f.begin) { if indexed > end { logs, err = f.indexedLogs(ctx, end) } else { logs, err = f.indexedLogs(ctx, indexed-1) } if err != nil { return logs, err } } // 對於剩下的部分執行非索引的搜尋。 rest, err := f.unindexedLogs(ctx, end) logs = append(logs, rest...) return logs, err }
索引搜尋
// indexedLogs returns the logs matching the filter criteria based on the bloom // bits indexed available locally or via the network. func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { // Create a matcher session and request servicing from the backend matches := make(chan uint64, 64) // 啟動matcher session, err := f.matcher.Start(uint64(f.begin), end, matches) if err != nil { return nil, err } defer session.Close(time.Second) // 進行過濾服務。 這些都在core裡面。後續分析core的程式碼會進行分析。 f.backend.ServiceFilter(ctx, session) // Iterate over the matches until exhausted or context closed var logs []*types.Log for { select { case number, ok := <-matches: // Abort if all matches have been fulfilled if !ok { // 沒有接收到值並且channel已經被關閉 f.begin = int64(end) + 1 //更新begin。以便於下面的非索引搜尋 return logs, nil } // Retrieve the suggested block and pull any truly matching logs header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) if header == nil || err != nil { return logs, err } found, err := f.checkMatches(ctx, header) //查詢匹配的值 if err != nil { return logs, err } logs = append(logs, found...) case <-ctx.Done(): return logs, ctx.Err() } } }
checkMatches,拿到所有的收據,並從收據中拿到所有的日誌。 執行filterLogs方法。 // checkMatches checks if the receipts belonging to the given header contain any log events that // match the filter criteria. This function is called when the bloom filter signals a potential match. func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { // Get the logs of the block receipts, err := f.backend.GetReceipts(ctx, header.Hash()) if err != nil { return nil, err } var unfiltered []*types.Log for _, receipt := range receipts { unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...) } logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) if len(logs) > 0 { return logs, nil } return nil, nil }
filterLogs,這個方法從給定的logs裡面找到能夠匹配上的。並返回。
// filterLogs creates a slice of logs matching the given criteria. func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log { var ret []*types.Log Logs: for _, log := range logs { if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber { continue } if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber { continue } if len(addresses) > 0 && !includes(addresses, log.Address) { continue } // If the to filtered topics is greater than the amount of topics in logs, skip. if len(topics) > len(log.Topics) { continue Logs } for i, topics := range topics { match := len(topics) == 0 // empty rule set == wildcard for _, topic := range topics { if log.Topics[i] == topic { match = true break } } if !match { continue Logs } } ret = append(ret, log) } return ret }
unindexedLogs,非索引查詢,迴圈遍歷所有的區塊。 首先用區塊裡面的header.Bloom來看是否有可能存在,如果有可能存在, 再使用checkMatches來檢索所有的匹配。 // indexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { var logs []*types.Log for ; f.begin <= int64(end); f.begin++ { header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) if header == nil || err != nil { return logs, err } if bloomFilter(header.Bloom, f.addresses, f.topics) { found, err := f.checkMatches(ctx, header) if err != nil { return logs, err } logs = append(logs, found...) } } return logs, nil }
## 總結filter原始碼包主要實現了兩個功能, - 提供了 釋出訂閱模式的filter RPC。用來給rpc客戶端提供實時的交易,區塊,日誌等的過濾- 提供了 基於bloomIndexer的日誌過濾模式,這種模式下,可以快速的對大量區塊執行布隆過濾操作。 還提供了歷史的日誌的過濾操作。
以太坊的區塊頭中包含了一個叫做logsBloom的區域。 這個區域儲存了當前區塊中所有的收據的日誌的布隆過濾器,一共是2048個bit。也就是256個位元組。
而我們的一個交易的收據包含了很多的日誌記錄。 每個日誌記錄包含了 合約的地址, 多個Topic。 而在我們的收據中也存在一個布隆過濾器,這個布隆過濾器記錄了所有的日誌記錄的資訊。如果我們看黃皮書裡面對日誌記錄的形式化定義。
O代表我們的日誌記錄,Oa代表logger的地址,Oto,Ot1代表日誌的Topics, Od代表時間。
Oa是20個位元組,Ot是32個位元組,Od是很多位元組
我們定義了一個布隆過濾器函式M,用來把一個日誌物件轉換成256位元組的hash
M3:2045是一個特別的函式,用來設定2048個bit位中的三位為1。
對於任意的輸入值,首先求他的KEC輸出, 然後通過取KEC輸出的 [0,1] [2,3],[4,5] 這幾位的值 對2048取模, 得到三個值, 這三個值就是輸出的2048中需要置位的下標。 也就是說對於任何一個輸入,如果它對應的三個下標的值不都為1,那麼它肯定不在這個區塊中。 當如如果對應的三位都為1,也不能說明一定在這個區塊中。 這就是布隆過濾器的特性。
收據中的布隆過濾器就是所有的日誌的布隆過濾器輸出的並集。
同時區塊頭中的logBloom,就是所有的收據的布隆過濾器的並集。
## ChainIndexer 和 BloomIndexer
在我們的協議中提供了查詢指定Log的功能。
使用者可以通過傳遞下面的引數來查詢指定的Log,開始的區塊號,結束的區塊號, 根據合約 Addresses指定的地址過濾,根據指定的Topics來過濾。
// FilterCriteria represents a request to create a new filter. type FilterCriteria struct { FromBlock *big.Int ToBlock *big.Int Addresses []common.Address Topics [][]common.Hash }
如果開始和結束之間間隔很大,那麼如果直接依次檢索每個區塊頭的logBloom區域是比較低效的。 因為每個區塊頭都是分開儲存的, 可能需要非常多的磁碟隨機訪問。
所以以太坊協議在本地維護了一套索引,用來加速這個過程。
大致原理是。 每4096個區塊稱為一個Section,一個Section裡面的logBloom會儲存在一起。對於每個Section, 用一個二維資料,A[2048
- A[0][0]=blockchain[section*4096+0].logBloom[0],- A[0][1]=blockchain[section*4096+1].logBloom[0],- A[0][4096]=blockchain[section*4096+1].logBloom[0],- A[1][0]=blockchain[section*4096+0].logBloom[1],- A[1][1024]=blockchain[section*4096+1024].logBloom[1],-
如果Section填充完畢,那麼會寫成2048個KV。 ![image](picture/bloom_6.png)
## bloombit.go 程式碼分析
這個程式碼相對不是很獨立,如果單獨看這個程式碼,有點摸不著頭腦的感覺, 因為它只是實現了一些介面,具體的處理邏輯並不在這裡,而是在core裡面。 不過這裡我先結合之前講到的資訊分析一下。 後續更詳細的邏輯在分析core的程式碼的時候再詳細分析。
服務執行緒startBloomHandlers,這個方法是為了響應具體的查詢請求, 給定指定的Section和bit來從levelDB裡面查詢然後返回出去。 單獨看這裡有點摸不著頭腦。 這個方法的呼叫比較複雜。 涉及到core裡面的很多邏輯。 這裡先不細說了。 直到有這個方法就行了。
type Retrieval struct { Bit uint //Bit的取值 0-2047 代表了想要獲取哪一位的值 Sections []uint64 // 那些Section Bitsets [][]byte // 返回值 查詢出來的結果。 } // startBloomHandlers starts a batch of goroutines to accept bloom bit database // retrievals from possibly a range of filters and serving the data to satisfy. func (eth *Ethereum) startBloomHandlers() { for i := 0; i < bloomServiceThreads; i++ { go func() { for { select { case <-eth.shutdownChan: return case request := <-eth.bloomRequests: // request是一個通道 task := <-request //從通道里面獲取一個task task.Bitsets = make([][]byte, len(task.Sections)) for i, section := range task.Sections { head := core.GetCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1) blob, err := bitutil.DecompressBytes(core.GetBloomBits(eth.chainDb, task.Bit, section, head), int(params.BloomBitsBlocks)/8) if err != nil { panic(err) } task.Bitsets[i] = blob } request <- task //通過request通道返回結果 } } }() } }
### 資料結構BloomIndexer物件主要使用者構建索引的過程,是core.ChainIndexer的一個介面實現,所以只實現了一些必須的介面。對於建立索引的邏輯還在core.ChainIndexer裡面。
// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index // for the Ethereum header bloom filters, permitting blazing fast filtering. type BloomIndexer struct { size uint64 // section size to generate bloombits for db ethdb.Database // database instance to write index data and metadata into gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index section uint64 // Section is the section number being processed currently 當前的section head common.Hash // Head is the hash of the last header processed }
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the // canonical chain for fast logs filtering. func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer { backend := &BloomIndexer{ db: db, size: size, } table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix)) return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits") }
Reset實現了ChainIndexerBackend的方法,啟動一個新的section
// Reset implements core.ChainIndexerBackend, starting a new bloombits index // section. func (b *BloomIndexer) Reset(section uint64) { gen, err := bloombits.NewGenerator(uint(b.size)) if err != nil { panic(err) } b.gen, b.section, b.head = gen, section, common.Hash{} }
Process實現了ChainIndexerBackend, 增加一個新的區塊頭到index // Process implements core.ChainIndexerBackend, adding a new header's bloom into // the index. func (b *BloomIndexer) Process(header *types.Header) { b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) b.head = header.Hash() }
Commit方法實現了ChainIndexerBackend,持久化並寫入資料庫。 // Commit implements core.ChainIndexerBackend, finalizing the bloom section and // writing it out into the database. func (b *BloomIndexer) Commit() error { batch := b.db.NewBatch() for i := 0; i < types.BloomBitLength; i++ { bits, err := b.gen.Bitset(uint(i)) if err != nil { return err } core.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits)) } return batch.Write() }
## filter/api.go 原始碼分析
eth/filter 包 包含了給使用者提供過濾的功能,使用者可以通過呼叫對交易或者區塊進行過濾,然後持續的獲取結果,如果5分鐘沒有操作,這個過濾器會被刪除。
過濾器的結構。
var ( deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline ) // filter is a helper struct that holds meta information over the filter type // and associated subscription in the event system. type filter struct { typ Type // 過濾器的型別, 過濾什麼型別的資料 deadline *time.Timer // filter is inactiv when deadline triggers 當計時器響起的時候,會觸發定時器。 hashes []common.Hash //過濾出來的hash結果 crit FilterCriteria //過濾條件 logs []*types.Log //過濾出來的Log資訊 s *Subscription // associated subscription in event system 事件系統中的訂閱器。 }
構造方法
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such als blocks, transactions and logs. // PublicFilterAPI用來建立和管理過濾器。 允許外部的客戶端獲取以太坊協議的一些資訊,比如區塊資訊,交易資訊和日誌資訊。 type PublicFilterAPI struct { backend Backend mux *event.TypeMux quit chan struct{} chainDb ethdb.Database events *EventSystem filtersMu sync.Mutex filters map[rpc.ID]*filter } // NewPublicFilterAPI returns a new PublicFilterAPI instance. func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ backend: backend, mux: backend.EventMux(), chainDb: backend.ChainDb(), events: NewEventSystem(backend.EventMux(), backend, lightMode), filters: make(map[rpc.ID]*filter), } go api.timeoutLoop() return api }
### 超時檢查 // timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. // Tt is started when the api is created. // 每隔5分鐘檢查一下。 如果過期的過濾器,刪除。 func (api *PublicFilterAPI) timeoutLoop() { ticker := time.NewTicker(5 * time.Minute) for { <-ticker.C api.filtersMu.Lock() for id, f := range api.filters { select { case <-f.deadline.C: f.s.Unsubscribe() delete(api.filters, id) default: continue } } api.filtersMu.Unlock() } }
NewPendingTransactionFilter,用來建立一個PendingTransactionFilter。 這種方式是用來給那種無法建立長連線的通道使用的(比如HTTP), 如果對於可以建立長連結的通道(比如WebSocket)可以使用rpc提供的傳送訂閱模式來處理,就不用持續的輪詢了 // NewPendingTransactionFilter creates a filter that fetches pending transaction hashes // as transactions enter the pending state. // // It is part of the filter package because this filter can be used throug the // `eth_getFilterChanges` polling method that is also used for log filters. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { var ( pendingTxs = make(chan common.Hash) // 在事件系統訂閱這種訊息 pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs) ) api.filtersMu.Lock() api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filtersMu.Unlock() go func() { for { select { case ph := <-pendingTxs: // 接收到pendingTxs,儲存在過濾器的hashes容器裡面。 api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { f.hashes = append(f.hashes, ph) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): api.filtersMu.Lock() delete(api.filters, pendingTxSub.ID) api.filtersMu.Unlock() return } } }() return pendingTxSub.ID }
輪詢: GetFilterChanges // GetFilterChanges returns the logs for the filter with the given id since // last time it was called. This can be used for polling. // GetFilterChanges 用來返回從上次呼叫到現在的所有的指定id的所有過濾資訊。這個可以用來輪詢。 // For pending transaction and block filters the result is []common.Hash. // (pending)Log filters return []Log. // 對於pending transaction和block的過濾器,返回結果型別是[]common.Hash. 對於pending Log 過濾器,返回的是 []Log // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { api.filtersMu.Lock() defer api.filtersMu.Unlock() if f, found := api.filters[id]; found { if !f.deadline.Stop() { // 如果定時器已經觸發,但是filter還沒有移除,那麼我們先接收定時器的值,然後重置定時器 // timer expired but filter is not yet removed in timeout loop // receive timer value and reset timer <-f.deadline.C } f.deadline.Reset(deadline) switch f.typ { case PendingTransactionsSubscription, BlocksSubscription: hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil case LogsSubscription: logs := f.logs f.logs = nil return returnLogs(logs), nil } } return []interface{}{}, fmt.Errorf("filter not found") }
對於可以建立長連線的通道,可以直接使用rpc的傳送訂閱模式, 這樣客戶端就可以直接接收到過濾資訊,不用呼叫輪詢的方式了。 可以看到這種模式下面並沒有新增到filters這個容器,也沒有超時管理了。也就是說支援兩種模式。
// NewPendingTransactions creates a subscription that is triggered each time a transaction // enters the transaction pool and was signed from one of the transactions this nodes manages. func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } rpcSub := notifier.CreateSubscription() go func() { txHashes := make(chan common.Hash) pendingTxSub := api.events.SubscribePendingTxEvents(txHashes) for { select { case h := <-txHashes: notifier.Notify(rpcSub.ID, h) case <-rpcSub.Err(): pendingTxSub.Unsubscribe() return case <-notifier.Closed(): pendingTxSub.Unsubscribe() return } } }() return rpcSub, nil }
日誌過濾功能,根據FilterCriteria指定的引數,來對日誌進行過濾,開始區塊,結束區塊,地址和Topics,這裡面引入了一個新的物件filter // FilterCriteria represents a request to create a new filter. type FilterCriteria struct { FromBlock *big.Int ToBlock *big.Int Addresses []common.Address Topics [][]common.Hash } // GetLogs returns logs matching the given argument that are stored within the state. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { // Convert the RPC block numbers into internal representations if crit.FromBlock == nil { crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } if crit.ToBlock == nil { crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } // Create and run the filter to get all the logs // 建立了一個Filter物件 然後呼叫filter.Logs filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics) logs, err := filter.Logs(ctx) if err != nil { return nil, err } return returnLogs(logs), err }
## filter.gofiter.go裡面定義了一個Filter物件。這個物件主要用來根據 區塊的BloomIndexer和布隆過濾器等來執行日誌的過濾功能。
### 資料結構 // 後端, 這個後端其實是在core裡面實現的。 布隆過濾器的主要演算法在core裡面實現了。 type Backend interface { ChainDb() ethdb.Database EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) } // Filter can be used to retrieve and filter logs. type Filter struct { backend Backend // 後端 db ethdb.Database // 資料庫 begin, end int64 // 開始結束區塊 addresses []common.Address // 篩選地址 topics [][]common.Hash // 篩選主題 matcher *bloombits.Matcher // 布隆過濾器的匹配器 }
建構函式把address和topic都加入到filters容器。然後構建了一個bloombits.NewMatcher(size, filters)。這個函式在core裡面實現, 暫時不會講解。
// New creates a new filter which uses a bloom filter on blocks to figure out whether // a particular block is interesting or not. func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { // Flatten the address and topic filter clauses into a single bloombits filter // system. Since the bloombits are not positional, nil topics are permitted, // which get flattened into a nil byte slice. var filters [][][]byte if len(addresses) > 0 { filter := make([][]byte, len(addresses)) for i, address := range addresses { filter[i] = address.Bytes() } filters = append(filters, filter) } for _, topicList := range topics { filter := make([][]byte, len(topicList)) for i, topic := range topicList { filter[i] = topic.Bytes() } filters = append(filters, filter) } // Assemble and return the filter size, _ := backend.BloomStatus() return &Filter{ backend: backend, begin: begin, end: end, addresses: addresses, topics: topics, db: backend.ChainDb(), matcher: bloombits.NewMatcher(size, filters), } }
Logs 執行過濾
// Logs searches the blockchain for matching log entries, returning all from the // first block that contains matches, updating the start of the filter accordingly. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // Figure out the limits of the filter range header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if header == nil { return nil, nil } head := header.Number.Uint64() if f.begin == -1 { f.begin = int64(head) } end := uint64(f.end) if f.end == -1 { end = head } // Gather all indexed logs, and finish with non indexed ones var ( logs []*types.Log err error ) size, sections := f.backend.BloomStatus() // indexed 是指建立了索引的區塊的最大值。 如果過濾的範圍落在了建立了索引的部分。 // 那麼執行索引搜尋。 if indexed := sections * size; indexed > uint64(f.begin) { if indexed > end { logs, err = f.indexedLogs(ctx, end) } else { logs, err = f.indexedLogs(ctx, indexed-1) } if err != nil { return logs, err } } // 對於剩下的部分執行非索引的搜尋。 rest, err := f.unindexedLogs(ctx, end) logs = append(logs, rest...) return logs, err }
索引搜尋
// indexedLogs returns the logs matching the filter criteria based on the bloom // bits indexed available locally or via the network. func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { // Create a matcher session and request servicing from the backend matches := make(chan uint64, 64) // 啟動matcher session, err := f.matcher.Start(uint64(f.begin), end, matches) if err != nil { return nil, err } defer session.Close(time.Second) // 進行過濾服務。 這些都在core裡面。後續分析core的程式碼會進行分析。 f.backend.ServiceFilter(ctx, session) // Iterate over the matches until exhausted or context closed var logs []*types.Log for { select { case number, ok := <-matches: // Abort if all matches have been fulfilled if !ok { // 沒有接收到值並且channel已經被關閉 f.begin = int64(end) + 1 //更新begin。以便於下面的非索引搜尋 return logs, nil } // Retrieve the suggested block and pull any truly matching logs header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) if header == nil || err != nil { return logs, err } found, err := f.checkMatches(ctx, header) //查詢匹配的值 if err != nil { return logs, err } logs = append(logs, found...) case <-ctx.Done(): return logs, ctx.Err() } } }
checkMatches,拿到所有的收據,並從收據中拿到所有的日誌。 執行filterLogs方法。 // checkMatches checks if the receipts belonging to the given header contain any log events that // match the filter criteria. This function is called when the bloom filter signals a potential match. func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { // Get the logs of the block receipts, err := f.backend.GetReceipts(ctx, header.Hash()) if err != nil { return nil, err } var unfiltered []*types.Log for _, receipt := range receipts { unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...) } logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) if len(logs) > 0 { return logs, nil } return nil, nil }
filterLogs,這個方法從給定的logs裡面找到能夠匹配上的。並返回。
// filterLogs creates a slice of logs matching the given criteria. func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log { var ret []*types.Log Logs: for _, log := range logs { if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber { continue } if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber { continue } if len(addresses) > 0 && !includes(addresses, log.Address) { continue } // If the to filtered topics is greater than the amount of topics in logs, skip. if len(topics) > len(log.Topics) { continue Logs } for i, topics := range topics { match := len(topics) == 0 // empty rule set == wildcard for _, topic := range topics { if log.Topics[i] == topic { match = true break } } if !match { continue Logs } } ret = append(ret, log) } return ret }
unindexedLogs,非索引查詢,迴圈遍歷所有的區塊。 首先用區塊裡面的header.Bloom來看是否有可能存在,如果有可能存在, 再使用checkMatches來檢索所有的匹配。 // indexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { var logs []*types.Log for ; f.begin <= int64(end); f.begin++ { header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) if header == nil || err != nil { return logs, err } if bloomFilter(header.Bloom, f.addresses, f.topics) { found, err := f.checkMatches(ctx, header) if err != nil { return logs, err } logs = append(logs, found...) } } return logs, nil }
## 總結filter原始碼包主要實現了兩個功能, - 提供了 釋出訂閱模式的filter RPC。用來給rpc客戶端提供實時的交易,區塊,日誌等的過濾- 提供了 基於bloomIndexer的日誌過濾模式,這種模式下,可以快速的對大量區塊執行布隆過濾操作。 還提供了歷史的日誌的過濾操作。
網址:http://www.qukuailianxueyuan.io/
欲領取造幣技術與全套虛擬機器資料
區塊鏈技術交流QQ群:756146052 備註:CSDN
尹成學院微信:備註:CSDN