超級賬本 Fabric交易提交過程詳解
Peer 啟動後會在後臺執行 gossip 服務,包括若干 goroutine,實現位於 gossip/state/state.go#NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources) GossipStateProvider
方法。
其中一個協程專門負責處理收到的區塊資訊。
- // Deliver in order messages into the incoming channel
- go s.deliverPayloads()
deliverPayloads() 方法實現位於同一個檔案的 GossipStateProviderImpl 結構下,其主要過程為迴圈從收到的 Gossip 訊息載荷緩衝區按序拿到封裝訊息,解析後進行處理。核心程式碼邏輯如下:
- // gossip/state/state.go#GossipStateProviderImpl.deliverPayloads()
- for {
- select {
- case <-s.payloads.Ready(): // 等待訊息
- // 依次處理收到的訊息
- for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
- rawBlock := &common.Block{}
- // 從載荷資料中嘗試解析區塊結構,失敗則嘗試下個訊息
- if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {
- logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
- continue
- }
- // 檢查區塊結構是否完整,失敗則嘗試下個訊息
- if rawBlock.Data == nil
|| rawBlock.Header == nil { - logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
- payload.SeqNum, rawBlock.Header, rawBlock.Data)
- continue
- }
- // 從載荷中解析私密資料,失敗則嘗試下個訊息
- var p util.PvtDataCollections
- if payload.PrivateData != nil {
- err := p.Unmarshal(payload.PrivateData)
- if err != nil {
- logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
- continue
- }
- }
- // 核心部分:提交區塊到本地賬本
- if err := s.commitBlock(rawBlock, p); err != nil {
- if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
- logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
- return
- }
- logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
- }
- }
- case <-s.stopCh: // 停止處理訊息
- s.stopCh <- struct{}{}
- logger.Debug("State provider has been stopped, finishing to push new blocks.")
- return
- }
- }
整體邏輯
s.commitBlock(rawBlock, p) 是對區塊進行處理和提交的核心邏輯,主要包括提交前準備、提交過程和提交後處理三部分,如下圖所示。
下面分別進行介紹三個階段的實現過程。
提交前準備
主要完成對區塊中交易格式的檢查和獲取關聯該區塊但缺失的私密資料,最後構建 blockAndPvtData 結構。
格式檢查
對區塊格式的檢查主要在 core/committer/txvalidator/validator.go#TxValidator.Validate(block *common.Block) error
方法中完成,包括檢查交易格式、對應賬本是否存在、是否雙花、滿足 VSCC 和 Policy 等。核心邏輯如下。
- // core/committer/txvalidator/validator.go#TxValidator.Validate(block *common.Block) error
- // 併發驗證交易有效性
- go func() {
- for tIdx, d := range block.Data.Data {
- // ensure that we don't have too many concurrent validation workers
- v.Support.Acquire(context.Background(), 1)
- go func(index int, data []byte) {
- defer v.Support.Release(1)
- v.validateTx(&blockValidationRequest{
- d: data,
- block: block,
- tIdx: index,
- }, results)
- }(tIdx, d)
- }
- }()
- // 處理檢查結果
- for i := 0; i < len(block.Data.Data); i++ {
- res := <-results
- if res.err != nil {
- if err == nil || res.tIdx < errPos {
- err = res.err
- errPos = res.tIdx
- }
- } else { // 設定有效標記,記錄鏈碼名稱,更新鏈碼資訊
- txsfltr.SetFlag(res.tIdx, res.validationCode)
- if res.validationCode == peer.TxValidationCode_VALID {
- if res.txsChaincodeName != nil {
- txsChaincodeNames[res.tIdx] = res.txsChaincodeName
- }
- if res.txsUpgradedChaincode != nil {
- txsUpgradedChaincodes[res.tIdx] = res.txsUpgradedChaincode
- }
- txidArray[res.tIdx] = res.txid
- }
- }
- }
- // 標記雙花交易
- if v.Support.Capabilities().ForbidDuplicateTXIdInBlock() {
- markTXIdDuplicates(txidArray, txsfltr)
- }
- // 防止多次升級操作
- v.invalidTXsForUpgradeCC(txsChaincodeNames, txsUpgradedChaincodes, txsfltr)
- // 確認所有交易都完成檢查
- err = v.allValidated(txsfltr, block)
- if err != nil {
- return err
- }
- // 更新交易有效標籤到元資料
- utils.InitBlockMetadata(block)
- block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsflt
獲取缺失的私密資料
首先根據已有的私密資料計算區塊中交易關聯的讀寫集資訊。如果仍有缺失,則嘗試從其它節點獲取。
- // gossip/privdata/coordinator.go#coordinator.StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error
- // 利用已有的私密資料計算讀寫集
- ownedRWsets, err := computeOwnedRWsets(block, privateDataSets)
- privateInfo, err := c.listMissingPrivateData(block, ownedRWsets)
- // 獲取缺失私密資料
- for len(privateInfo.missingKeys) > 0 && time.Now().Before(limit) {
- c.fetchFromPeers(block.Header.Number, ownedRWsets, privateInfo)
- if len(privateInfo.missingKeys) == 0 {
- break
- }
- time.Sleep(pullRetrySleepInterval)
- }
構建 blockAndPvtData 結構
blockAndPvtData 結構用於後續的提交工作,因此,需要包括相關的區塊和私密資料。
主要實現邏輯如下:
- // gossip/privdata/coordinator.go#coordinator.StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error
- // 填充私密讀寫集資訊
- for seqInBlock, nsRWS := range ownedRWsets.bySeqsInBlock() {
- rwsets := nsRWS.toRWSet()
- blockAndPvtData.BlockPvtData[seqInBlock] = &ledger.TxPvtData{
- SeqInBlock: seqInBlock,
- WriteSet: rwsets,
- }
- }
- // 填充缺失私密資料資訊
- for missingRWS := range privateInfo.missingKeys {
- blockAndPvtData.Missing = append(blockAndPvtData.Missing, ledger.MissingPrivateData{
- TxId: missingRWS.txID,
- Namespace: missingRWS.namespace,
- Collection: missingRWS.collection,
- SeqInBlock: int(missingRWS.seqInBlock),
- })
- }
提交過程
提交過程是核心過程,主要包括預處理、驗證交易、更新本地區塊鏈結構、更新本地資料庫結構四個步驟。
預處理
預處理階段負責構造一個有效的內部區塊結構。包括:
- 處理 Endorser 交易:只保留有效的 Endorser 交易;
- 處理配置交易:獲取配置更新的模擬結果,放入讀寫集;
- 校驗寫集合:如果狀態資料庫採用 CouchDB,要按照 CouchDB 約束檢查鍵值的格式:Key 必須為非下劃線開頭的 UTF-8 字串,Value 必須為合法的字典結構,且不包括下劃線開頭的鍵名。
核心實現程式碼位於 core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#preprocessProtoBlock(txmgr txmgr.TxMgr, validateKVFunc func(key string, value []byte) error, block *common.Block, doMVCCValidation bool) (*valinternal.Block, error),如下所示:
- // core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#preprocessProtoBlock(txmgr txmgr.TxMgr, validateKVFunc func(key string, value []byte) error, block *common.Block, doMVCCValidation bool) (*valinternal.Block, error)
- // 處理 endorser 交易
- if txType == common.HeaderType_ENDORSER_TRANSACTION {
- // extract actions from the envelope message
- respPayload, err := utils.GetActionFromEnvelope(envBytes)
- if err != nil {
- txsFilter.SetFlag(txIndex, peer.TxValidationCode_NIL_TXACTION)
- continue
- }
- txRWSet = &rwsetutil.TxRwSet{}
- if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
- txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_OTHER_REASON)
- continue
- }
- } else { // 處理配置更新交易
- rwsetProto, err := processNonEndorserTx(env, chdr.TxId, txType, txmgr, !doMVCCValidation)
- if _, ok := err.(*customtx.InvalidTxError); ok {
- txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_OTHER_REASON)
- continue
- }
- if err != nil {
- return nil, err
- }
- if rwsetProto != nil {
- if txRWSet, err = rwsetutil.TxRwSetFromProtoMsg(rwsetProto); err != nil {
- return nil, err
- }
- }
- }
- // 檢查讀寫集是否符合資料庫要求格式
- if txRWSet != nil {
- if err := validateWriteset(txRWSet, validateKVFunc); err != nil {
- txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_WRITESET)
- continue
- }
- b.Txs = append(b.Txs, &valinternal.Transaction{IndexInBlock: txIndex, ID: chdr.TxId, RWSet: txRWSet})
- }
驗證交易
接下來,對區塊中交易進行 MVCC 檢查,並校驗私密讀寫集,更新區塊元資料中的交易有效標記列表。
MVCC 檢查需要逐個驗證塊中的 Endorser 交易,滿足下列條件者才認為有效:
- 公共讀集合中 key 版本在該交易前未變;
- RangeQuery 的結果未變;
- 私密讀集合中 key 版本未變。
實現在 core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go#Validator.ValidateAndPrepareBatch(block *valinternal.Block, doMVCCValidation bool) (*valinternal.PubAndHashUpdates, error) 方法中,主要邏輯如下:
- // core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go#Validator.ValidateAndPrepareBatch(block *valinternal.Block, doMVCCValidation bool) (*valinternal.PubAndHashUpdates, error)
- // 依次順序檢查每個交易
- for _, tx := range block.Txs {
- var validationCode peer.TxValidationCode
- var err error
- // 檢查 Endorser 交易
- if validationCode, err = v.validateEndorserTX(tx.RWSet, doMVCCValidation, updates); err != nil {
- return nil, err
- }
- tx.ValidationCode = validationCode
- // 有效交易則將其讀寫集放到更新集合中
- if validationCode == peer.TxValidationCode_VALID {
- committingTxHeight := version.NewHeight(block.Num, uint64(tx.IndexInBlock))
- updates.ApplyWriteSet(tx.RWSet, committingTxHeight)
- } else {
- logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%s]",
- block.Num, tx.IndexInBlock, tx.ID, validationCode.String())
- }
- }
對私密讀寫集的校驗主要是再次檢查 Hash 值是否匹配,實現在 core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#validatePvtdata(tx *valinternal.Transaction, pvtdata *ledger.TxPvtData) error
方法中。
- // core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#validatePvtdata(tx *valinternal.Transaction, pvtdata *ledger.TxPvtData) error
- for _, nsPvtdata := range pvtdata.WriteSet.NsPvtRwset {
- for _, collPvtdata := range nsPvtdata.CollectionPvtRwset {
- collPvtdataHash := util.ComputeHash(collPvtdata.Rwset)
- hashInPubdata := tx.RetrieveHash(nsPvtdata.Namespace, collPvtdata.CollectionName)
- // 重新計算私密資料 Hash 值,對比公共資料中的記錄
- if !bytes.Equal(collPvtdataHash, hashInPubdata) {
- return &validator.ErrPvtdataHashMissmatch{
- Msg: fmt.Sprintf(`Hash of pvt data for collection [%s:%s] does not match with the corresponding hash in the public data.
- public hash = [%#v], pvt data hash = [%#v]`, nsPvtdata.Namespace, collPvtdata.CollectionName, hashInPubdata, collPvtdataHash),
- }
- }
- }
- }
最後,更新區塊元資料中的交易有效標記列表,實現位於 core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#postprocessProtoBlock(block *common.Block, validatedBlock *valinternal.Block)
方法,程式碼如下所示。
- // core/ledger/kvledger/txmgmt/validator/valimpl/helper.go#postprocessProtoBlock(block *common.Block, validatedBlock *valinternal.Block)
- func postprocessProtoBlock(block *common.Block, validatedBlock *valinternal.Block) {
- txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
- for _, tx := range validatedBlock.Txs {
- txsFilter.SetFlag(tx.IndexInBlock, tx.ValidationCode)
- }
- block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
- }
接下來,需要更新本地的賬本結構,包括區塊鏈結構和相關的本地資料庫。
更新本地區塊鏈結構
入口在 core/ledger/ledgerstorage/store.go#Store.CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
方法中,主要包括如下步驟:
- 將區塊寫入本地 Chunk 檔案;
- 更新索引資料庫(區塊號、Hash值、檔案指標、交易偏移、區塊元資料);
- 更新所提交的區塊號到私密資料庫;
區塊寫入 Chunk 檔案主要實現在 common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go#blockfileMgr.addBlock(block *common.Block) error
方法中,主要邏輯如下所示:
- // common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go#blockfileMgr.addBlock(block *common.Block) error
- // 計算長度資訊
- blockBytesLen := len(blockBytes)
- blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
- totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)
- // 新增長度資訊到 Chunk 檔案
- err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
- // 更新 checkpoint 資訊
- newCPInfo := &checkpointInfo{
- latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
- latestFileChunksize: currentCPInfo.latestFileChunksize + totalBytesToAppend,
- isChainEmpty: false,
- lastBlockNumber: block.Header.Number}
- mgr.saveCurrentInfo(newCPInfo, false);
- // 更新區塊在檔案中索引位置和交易偏移量
- blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
- blockFLP.offset = currentOffset
- for _, txOffset := range txOffsets {
- txOffset.loc.offset += len(blockBytesEncodedLen)
- }
- // 更新索引資料庫
- mgr.index.indexBlock(&blockIdxInfo{
- blockNum: block.Header.Number, blockHash: blockHash,
- flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata})
- // 更新 checkpoint 資訊和區塊鏈資訊
- mgr.updateCheckpoint(newCPInfo)
- mgr.updateBlockchainInfo(blockHash, block)
更新本地資料庫結構
更新資料庫是提交交易的最後一步,主要包括如下步驟:
- 刪除過期私密資料;
- 更新私密資料生命週期記錄資料庫;
- 更新本地公共狀態資料庫和私密狀態資料庫;
- 如果啟用了歷史資料庫,更新資料。
實現程式碼在 core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go#LockBasedTxMgr.Commit() error
方法中,主要邏輯如下。
- // core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go#LockBasedTxMgr.Commit() error
- // 準備過期的私密鍵值清理
- txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(txmgr.current.blockNum())
- // 更新私密資料生命週期記錄資料庫,這裡記錄了每個私密鍵值的存活期限
- if err := txmgr.pvtdataPurgeMgr.DeleteExpiredAndUpdateBookkeeping(
- txmgr.current.batch.PvtUpdates, txmgr.current.batch.HashUpdates); err != nil{
- return err
- }
- // 更新本地公共狀態資料庫和私密狀態資料庫
- if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.current.batch, commitHeight); err != nil {
- return err
- }
- // 如果啟用了歷史資料庫,更新資料
- if ledgerconfig.IsHistoryDBEnabled() {
- if err := l.historyDB.Commit(block); err != nil {
- panic(errors.WithMessage(err, "Error during commit to history db"))
- }
- }
提交後處理
提交後的處理比較簡單,包括清理本地的臨時狀態資料庫和更新賬本高度資訊。
清理工作包括區塊關聯的臨時私密資料和舊區塊關聯的臨時私密資料。
======關於本文 =======
更多區塊鏈深度技術可參考 區塊鏈技術指南 開源專案。
轉載請註明原文連結。
Peer 啟動後會在後臺執行 gossip 服務,包括若干 goroutine,實現位於 gossip/state/state.go#NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources) GossipStateProvider
方法。
其中一個協程專門負責處理收到的區塊資訊。
- // Deliver in order messages into the incoming channel
- go s.deliverPayloads()
deliverPayloads() 方法實現位於同一個檔案的 GossipStateProviderImpl 結構下,其主要過程為迴圈從收到的 Gossip 訊息載荷緩衝區按序拿到封裝訊息,解析後進行處理。核心程式碼邏輯如下:
- // gossip/state/state.go#GossipStateProviderImpl.deliverPayloads()
- for {
- select {
- case <-s.payloads.Ready(): // 等待訊息
- // 依次處理收到的訊息
- for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
- rawBlock := &common.Block{}
- // 從載荷資料中嘗試解析區塊結構,失敗則嘗試下個訊息
- if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {
- logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
- continue
- }
- // 檢查區塊結構是否完整,失敗則嘗試下個訊息
- if rawBlock.Data == nil || rawBlock.Header == nil {
- logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
- payload.SeqNum, rawBlock.Header, rawBlock.Data)
- continue
- }
- // 從載荷中解析私密資料,失敗則嘗試下個訊息
- var p util.PvtDataCollections
- if payload.PrivateData != nil {
- err := p.Unmarshal(payload.PrivateData)
- if err != nil {
- logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
- continue
- }
- }
- // 核心部分:提交區塊到本地賬本
- if err := s.commitBlock(rawBlock, p); err != nil {
- if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
- logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
- return
- }
- logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
- }
- }
- case <-s.stopCh: // 停止處理訊息
- s.stopCh <- struct{}{}
- logger.Debug("State provider has been stopped, finishing to push new blocks.")
- return
- }
- }
整體邏輯
s.commitBlock(rawBlock, p) 是對區塊進行處理和提交的核心邏輯,主要包括提交前準備、提交過程和提交後處理三部分,如下圖所示。
下面分別進行介紹三個階段的實現過程。
提交前準備
主要完成對區塊中交易格式的檢查和獲取關聯該區塊但缺失的私密資料,最後構建 blockAndPvtData 結構。
格式檢查
對區塊格式的檢查主要在 core/committer/txvalidator/validator.go#TxValidator.Validate(block *common.Block) error
方法中完成,包括檢查交易格式、對應賬本是否存在、是否雙花、滿足 VSCC 和 Policy 等。核心邏輯如下。
- // core/committer/txvalidator/validator.go#TxValidator.Validate(block *common.Block) error
- // 併發驗證交易有效性
- go func() {
- for tIdx, d := range block.Data.Data {
- // ensure that we don't have too many concurrent validation workers
- v.Support.Acquire(context.Background(), 1)
- go func(index int, data []byte) {
- defer v.Support.Release(1)
- v.validateTx(&blockValidationRequest{
- d: data,
- block: block,
- tIdx: index,
- }, results)
- }(tIdx, d)
- }
- }()
- // 處理檢查結果
- for i := 0; i < len(block.Data.Data); i++ {
- res := <-results
- if res.err != nil {
- if err == nil || res.tIdx < errPos {
- err = res.err
- errPos = res.tIdx
- }
- } else { // 設定有效標記,記錄鏈碼名稱,更新鏈碼資訊
- txsfltr.SetFlag(res.tIdx, res.validationCode)
- if res.validationCode == peer.TxValidationCode_VALID {
- if res.txsChaincodeName != nil {
- txsChaincodeNames[res.tIdx] = res.txsChaincodeName
- }
- if res.txsUpgradedChaincode != nil {
- txsUpgradedChaincodes[res.tIdx] = res.txsUpgradedChaincode
- }
- txidArray[res.tIdx] = res.txid
- }
- }
- }
- // 標記雙花交易
- if v.Support.Capabilities().ForbidDuplicateTXIdInBlock() {
- markTXIdDuplicates(txidArray, txsfltr)
- }
- // 防止多次升級操作
- v.invalidTXsForUpgradeCC(txsChaincodeNames, txsUpgradedChaincodes, txsfltr)
- // 確認所有交易都完成檢查
- err = v.allValidated(txsfltr, block)
- if err != nil {
- return err<