1. 程式人生 > >菜鳥學習Fabric原始碼學習 — Endorser背書節點

菜鳥學習Fabric原始碼學習 — Endorser背書節點

Fabric 1.4 原始碼分析 Endorser背書節點

本文件主要介紹fabric背書節點的主要功能及其實現。

1. 簡介

Endorser節點是peer節點所扮演的一種角色,在peer啟動時會建立Endorser背書伺服器,並註冊到本地gRPC伺服器(7051埠)上對外提供服務,對請求的簽名提案訊息執行啟動鏈碼容器、模擬執行鏈碼、背書籤名等流程。所有客戶端提交到賬本的呼叫交易都需要背書節點背書,當客戶端收集到足夠的背書資訊之後,再將簽名提案訊息、模擬執行的結果以及背書資訊打包成交易資訊發給orderer節點排序出塊。

背書者Endorser在一個交易流中充當的作用如下:

  • 客戶端傳送一個背書申請(SignedProposal)到Endorser。
  • Endorser對申請進行背書,傳送一個申請應答(ProposalResponse)到客戶端。
  • 客戶端將申請應答中的背書組裝到一個交易請求(SignedTransaction)中。

2. 背書伺服器初始化

當peer節點啟動時,會註冊背書伺服器。

serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider)
auth := authHandler.ChainFilters(serverEndorser, authFilters...)
// Register the Endorser server
pb.RegisterEndorserServer(peerServer.Server(), auth)

其中,背書服務最重要的介面為

// EndorserServer is the server API for Endorser service.
type EndorserServer interface {
    ProcessProposal(context.Context, *SignedProposal) (*ProposalResponse, error)
}

ProcessProposal()服務介面主要功能為接收和處理簽名提案訊息(SignedProposal)、啟動鏈碼容器、執行呼叫鏈碼以及進行簽名背書。

3. 背書服務

在ProcessProposal()服務中,主要存在以下流程:

  1. 呼叫preProcess()方法檢查和校驗簽名提案的合法性
  2. 呼叫SimulateProposal()方法呼叫鏈碼容器並模擬執行提案
  3. 呼叫endorseProposal()方法對模擬執行結果簽名背書,並返回提案響應訊息

原始碼如下所示:

func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
    ...
    // 0 -- check and validate
    vr, err := e.preProcess(signedProp)
    if err != nil {
        resp := vr.resp
        return resp, err
    }

    prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid
    txParams := &ccprovider.TransactionParams{
        ChannelID:            chainID,
        TxID:                 txid,
        SignedProp:           signedProp,
        Proposal:             prop,
        TXSimulator:          txsim,
        HistoryQueryExecutor: historyQueryExecutor,
    }
    // 1 -- simulate
    cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)
    if err != nil {
        return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
    }
    ...
    // 2 -- endorse and get a marshalled ProposalResponse message
    var pResp *pb.ProposalResponse
    if chainID == "" {
        pResp = &pb.ProposalResponse{Response: res}
    } else {
        pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)
        ...
    }
    pResp.Response = res
    return pResp, nil
}

3.1 檢查和校驗簽名提案的合法性

preProcess()方法對簽名提案訊息進行預處理,主要包括驗證訊息格式和簽名的合法性、驗證提案訊息對應鏈碼檢查是否是系統鏈碼並且不為外部呼叫、交易的唯一性、驗證是否滿足對應通道的訪問控制策略。

func (e *Endorser) preProcess(signedProp *pb.SignedProposal) (*validateResult, error) {
    vr := &validateResult{}
    // 1. 驗證訊息格式和簽名合法性
    prop, hdr, hdrExt, err := validation.ValidateProposalMessage(signedProp)
    chdr, err := putils.UnmarshalChannelHeader(hdr.ChannelHeader)
    shdr, err := putils.GetSignatureHeader(hdr.SignatureHeader)
    // block invocations to security-sensitive system chaincodes
    // 2. 驗證鏈碼
    if e.s.IsSysCCAndNotInvokableExternal(hdrExt.ChaincodeId.Name) {
        vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
        return vr, err
    }

    chainID := chdr.ChannelId
    txid := chdr.TxId

    if chainID != "" {
        // 3. 驗證交易唯一性
        if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
            err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
            vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
            return vr, err
        }
        if !e.s.IsSysCC(hdrExt.ChaincodeId.Name) {
            // check that the proposal complies with the Channel's writers
            // 4. 驗證acl
            if err = e.s.CheckACL(signedProp, chdr, shdr, hdrExt); err != nil {
                e.Metrics.ProposalACLCheckFailed.With(meterLabels...).Add(1)
                return vr, err
            }
        }
    } else {

    }
    vr.prop, vr.hdrExt, vr.chainID, vr.txid = prop, hdrExt, chainID, txid
    return vr, nil
}

3.1.1 驗證訊息格式和簽名合法性

preProcess()呼叫ValidateProposalMessage()對訊息進行驗證。主要針對訊息的格式、簽名、交易id進行驗證。

首先呼叫validateCommonHeader()校驗Proposal.Header的合法性。

// checks for a valid Header
func validateCommonHeader(hdr *common.Header) (*common.ChannelHeader, *common.SignatureHeader, error) {
    if hdr == nil {
        return nil, nil, errors.New("nil header")
    }
    chdr, err := utils.UnmarshalChannelHeader(hdr.ChannelHeader)
    shdr, err := utils.GetSignatureHeader(hdr.SignatureHeader)
    // 校驗訊息型別是否屬於HeaderType_ENDORSER_TRANSACTION、HeaderType_CONFIG_UPDATE、HeaderType_CONFIG、HeaderType_TOKEN_TRANSACTION,並且校驗Epoch是否為0
    err = validateChannelHeader(chdr)
    // 校驗shdr shdr.Nonce  shdr.Creator是否為nil,或長度是否為0
    err = validateSignatureHeader(shdr)
    return chdr, shdr, nil
}

接著呼叫checkSignatureFromCreator()對簽名進行校驗。其中,首先校驗傳入引數是否為nil,接著creator.Validate()對建立者creator進行驗證。

err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId)

然後對交易id進行驗證,驗證交易id是否與計算的交易id一致

err = utils.CheckTxID(
        chdr.TxId,
        shdr.Nonce,
        shdr.Creator)

// 計算交易id
func ComputeTxID(nonce, creator []byte) (string, error) {
    digest, err := factory.GetDefault().Hash(
        append(nonce, creator...),
        &bccsp.SHA256Opts{})
    if err != nil {
        return "", err
    }
    return hex.EncodeToString(digest), nil
}

最後根據訊息型別進行分類處理:

switch common.HeaderType(chdr.Type) {
case common.HeaderType_CONFIG:
    fallthrough
case common.HeaderType_ENDORSER_TRANSACTION:
    chaincodeHdrExt, err := validateChaincodeProposalMessage(prop, hdr)
    if err != nil {
        return nil, nil, nil, err
    }
    return prop, hdr, chaincodeHdrExt, err
default:
    return nil, nil, nil, errors.Errorf("unsupported proposal type %d", common.HeaderType(chdr.Type))
}

其中,validateChaincodeProposalMessage()方法驗證輸入引數不為nil,呼叫GetChaincodeHeaderExtension()方法獲取chaincodeHdrExt,並校驗chaincodeHdrExt.ChaincodeId是否為nil以及chaincodeHdrExt.PayloadVisibility是否不為nil(當前為nil)

3.1.2 檢查是否是系統鏈碼並且不為外部呼叫

preProcess()呼叫IsSysCCAndNotInvokableExternal()方法驗證提案訊息頭部hdrExt.ChaincodeId.Name鏈碼名對應的鏈碼是否為允許外部呼叫的系統鏈碼。遍歷所有系統鏈碼(lscc,vscc,escc,qscc,cscc),其中vscc、escc不為外部呼叫

func (p *Provider) IsSysCCAndNotInvokableExternal(name string) bool {
    for _, sysCC := range p.SysCCs {
        if sysCC.Name() == name {
            return !sysCC.InvokableExternal()
        }
    }

    if isDeprecatedSysCC(name) {
        return true
    }

    return false
}

func isDeprecatedSysCC(name string) bool {
    return name == "vscc" || name == "escc"
}

3.1.3 檢查簽名提案訊息交易id的唯一性

首先檢視是否存在該賬本,然後檢視賬本是否存在該交易id。

func (s *SupportImpl) GetTransactionByID(chid, txID string) (*pb.ProcessedTransaction, error) {
    lgr := s.Peer.GetLedger(chid)
    if lgr == nil {
        return nil, errors.Errorf("failed to look up the ledger for Channel %s", chid)
    }
    tx, err := lgr.GetTransactionByID(txID)
    if err != nil {
        return nil, errors.WithMessage(err, "GetTransactionByID failed")
    }
    return tx, nil
}

3.1.4 驗證是否滿足對應通道的訪問控制策略

背書節點在背書過程中會檢查是否滿足應用通道的 Writers 策略。CheckACL()方法最後會呼叫core/endorser/support.go CheckACL()

func (s *SupportImpl) CheckACL(signedProp *pb.SignedProposal, chdr *common.ChannelHeader, shdr *common.SignatureHeader, hdrext *pb.ChaincodeHeaderExtension) error {
    return s.ACLProvider.CheckACL(resources.Peer_Propose, chdr.ChannelId, signedProp)
}

其中:

Peer_Propose = "peer/Propose"
d.cResourcePolicyMap[resources.Peer_Propose] = CHANNELWRITERS

會根據簽名提案訊息型別呼叫
core/aclmgmt/defaultaclprovider.go CheckACL()方法

case *pb.SignedProposal:
        return d.policyChecker.CheckPolicy(channelID, policy, typedData)

最終會呼叫

func (p *policyChecker) CheckPolicyBySignedData(channelID, policyName string, sd []*common.SignedData) error {
    if channelID == "" {
        return errors.New("Invalid channel ID name during check policy on signed data. Name must be different from nil.")
    }

    if policyName == "" {
        return fmt.Errorf("Invalid policy name during check policy on signed data on channel [%s]. Name must be different from nil.", channelID)
    }

    if sd == nil {
        return fmt.Errorf("Invalid signed data during check policy on channel [%s] with policy [%s]", channelID, policyName)
    }

    // Get Policy
    policyManager, _ := p.channelPolicyManagerGetter.Manager(channelID)
    if policyManager == nil {
        return fmt.Errorf("Failed to get policy manager for channel [%s]", channelID)
    }

    // Recall that get policy always returns a policy object
    policy, _ := policyManager.GetPolicy(policyName)

    // Evaluate the policy
    err := policy.Evaluate(sd)
    if err != nil {
        return fmt.Errorf("Failed evaluating policy on signed data during check policy on channel [%s] with policy [%s]: [%s]", channelID, policyName, err)
    }
    return nil
}

其中:

sd := []*common.SignedData{{
    Data:      signedProp.ProposalBytes,
    Identity:  shdr.Creator,
    Signature: signedProp.Signature,
}}

3.2 呼叫鏈碼並模擬執行提案

首先,ProcessProposal()方法呼叫方法acquireTxSimulator()根據鏈碼判斷是否需要建立交易模擬器TxSimulator,如果需要則建立交易模擬器TxSimulator(無法查詢歷史記錄)以及歷史記錄查詢器HistoryQueryExecutor,接著再呼叫SimulateProposal()模擬執行交易提案訊息,並返回模擬執行結果。
其中,鏈碼qscc、cscc不需要交易模擬器。

unc acquireTxSimulator(chainID string, ccid *pb.ChaincodeID) bool {
    if chainID == "" {
        return false
    }

    // ¯\_(ツ)_/¯ locking.
    // Don't get a simulator for the query and config system chaincode.
    // These don't need the simulator and its read lock results in deadlocks.
    switch ccid.Name {
    case "qscc", "cscc":
        return false
    default:
        return true
    }
}

在SimulateProposal()方法中,首先判斷呼叫鏈碼是否為系統鏈碼:

  • 是 獲取鏈碼版本GetSysCCVersion()
  • 否 檢查例項化策略以及獲取版本CheckInstantiationPolicy()

然後呼叫callChaincode()呼叫鏈碼。接著從交易模擬器中獲取模擬執行結果。其中,如果私密資料模擬執行結果不為nil,則分發私密資料。最後獲取模擬執行結果的公有資料。

if simResult, err = txParams.TXSimulator.GetTxSimulationResults(); err != nil {
    txParams.TXSimulator.Done()
    return nil, nil, nil, nil, err
}

if err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil {
    return nil, nil, nil, nil, err
}
            
if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
    return nil, nil, nil, nil, err
}       

3.2.1 檢查例項化策略

CheckInstantiationPolicy()會呼叫GetChaincodeData()嘗試從快取或者本地檔案系統獲取已安裝的鏈碼包CCPackage,再解析成ChaincodeData物件ccdata。再與賬本中儲存的對應鏈碼的例項化策略進行比較。

func CheckInstantiationPolicy(name, version string, cdLedger *ChaincodeData) error {
    ccdata, err := GetChaincodeData(name, version)
    if err != nil {
        return err
    }
    if ccdata.InstantiationPolicy != nil {
        if !bytes.Equal(ccdata.InstantiationPolicy, cdLedger.InstantiationPolicy) {
            return fmt.Errorf("Instantiation policy mismatch for cc %s/%s", name, version)
        }
    }

    return nil
}

例項化策略在安裝鏈碼時指定

3.2.2 呼叫鏈碼

在SimulateProposal()方法中,會呼叫callChaincode()方法呼叫鏈碼。首先執行Execute()方法呼叫鏈碼,然後在針對“deploy”和“upgrade”操作進行處理。

3.2.2.1 Execute()操作

SimulateProposal()方法呼叫Execute()執行鏈碼,最終會呼叫core/chaincode/chaincode_support.go Execute()方法。

func (cs *ChaincodeSupport) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error) {
    resp, err := cs.Invoke(txParams, cccid, input)
    return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
}
  • Invoke()方法 core/chaincode/chaincode_support.go Invoke()
    啟動鏈碼容器,呼叫鏈碼
func (cs *ChaincodeSupport) Invoke(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.ChaincodeMessage, error) {
    h, err := cs.Launch(txParams.ChannelID, cccid.Name, cccid.Version, txParams.TXSimulator)
    if err != nil {
        return nil, err
    }
    cctype := pb.ChaincodeMessage_TRANSACTION
    return cs.execute(cctype, txParams, cccid, input, h)
}

==訊息型別為:ChaincodeMessage_TRANSACTION==,其中還呼叫了execute()方法。

  • processChaincodeExecutionResult()方法 core/chaincode/chaincode_support.go processChaincodeExecutionResult()方法
    對鏈碼執行結果進行處理
func processChaincodeExecutionResult(txid, ccName string, resp *pb.ChaincodeMessage, err error) (*pb.Response, *pb.ChaincodeEvent, error) {
    ...
    if resp.ChaincodeEvent != nil {
        resp.ChaincodeEvent.ChaincodeId = ccName
        resp.ChaincodeEvent.TxId = txid
    }

    switch resp.Type {
    case pb.ChaincodeMessage_COMPLETED:
        res := &pb.Response{}
        err := proto.Unmarshal(resp.Payload, res)
        if err != nil {
            return nil, nil, errors.Wrapf(err, "failed to unmarshal response for transaction %s", txid)
        }
        return res, resp.ChaincodeEvent, nil

    case pb.ChaincodeMessage_ERROR:
        return nil, resp.ChaincodeEvent, errors.Errorf("transaction returned with failure: %s", resp.Payload)

    default:
        return nil, nil, errors.Errorf("unexpected response type %d for transaction %s", resp.Type, txid)
    }
}
3.2.2.2 "deploy"/"upgrade" 操作

主要實現方法為ExecuteLegacyInit(),該流程和Execute()操作類似

func (cs *ChaincodeSupport) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) {
    ccci := ccprovider.DeploymentSpecToChaincodeContainerInfo(spec)
    ccci.Version = cccid.Version

    err := cs.LaunchInit(ccci)
    if err != nil {
        return nil, nil, err
    }

    cname := ccci.Name + ":" + ccci.Version
    h := cs.HandlerRegistry.Handler(cname)
    if h == nil {
        return nil, nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", txParams.ChannelID, cname)
    }

    resp, err := cs.execute(pb.ChaincodeMessage_INIT, txParams, cccid, spec.GetChaincodeSpec().Input, h)
    return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
}

其中,==訊息型別為ChaincodeMessage_INIT==

3.2.3 處理模擬執行結果

針對模擬執行的結果進行處理。對鏈碼模擬執行以後,將模擬執行結果寫入交易模擬器TXSimulator中。通過呼叫GetTxSimulationResults()方法可以獲取模擬執行結果。TxSimulationResults包含公有資料讀寫集PubSimulationResults以及私有資料讀寫集PvtSimulationResults。

3.2.3.1 獲取模擬執行結果

SimulateProposal()方法會呼叫GetTxSimulationResults()方法獲取模擬執行結果。
原始碼如下所示。

func (b *RWSetBuilder) GetTxSimulationResults() (*ledger.TxSimulationResults, error) {
    // 獲取交易模擬執行結果的交易私密資料讀寫集
    pvtData := b.getTxPvtReadWriteSet()
    var err error

    var pubDataProto *rwset.TxReadWriteSet
    var pvtDataProto *rwset.TxPvtReadWriteSet

    // Populate the collection-level hashes into pub rwset and compute the proto bytes for pvt rwset
    // 計算私密資料hash
    if pvtData != nil {
        if pvtDataProto, err = pvtData.toProtoMsg(); err != nil {
            return nil, err
        }
        // 遍歷計算私密資料hash值
        for _, ns := range pvtDataProto.NsPvtRwset {
            for _, coll := range ns.CollectionPvtRwset {
                // 計算並設定私密資料hash 
                b.setPvtCollectionHash(ns.Namespace, coll.CollectionName, coll.Rwset)
            }
        }
    }
    // Compute the proto bytes for pub rwset
    // 獲取交易模擬執行結果的公有資料讀寫集
    pubSet := b.GetTxReadWriteSet()
    if pubSet != nil {
        if pubDataProto, err = b.GetTxReadWriteSet().toProtoMsg(); err != nil {
            return nil, err
        }
    }
    // 構造交易模擬執行結果
    return &ledger.TxSimulationResults{
        PubSimulationResults: pubDataProto,
        PvtSimulationResults: pvtDataProto,
    }, nil
}
3.2.3.2 資料處理
  • 私密資料處理
    SimulateProposal()方法會檢查模擬執行結果裡面的PvtSimulationResults是否為nil,如果不為nil,則會通過AssemblePvtRWSet()方法將TxPvtReadWriteSet,擴充到TxPvtReadWriteSetWithConfigInfo,並新增與私有讀寫集相關的可用集合配置資訊。再獲取當前賬本高度。呼叫gossip模組的distributePrivateData()方法(本質上是gossip/service/gossip_service.go DistributePrivateData方法)將私密資料分發到通道內符合策略的其他peer節點上。並暫時儲存私密資料到本地瞬時資料庫(transient store)中(交易驗證和提交賬本時會進行處理)。
func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
    g.lock.RLock()
    handler, exists := g.privateHandlers[chainID]
    g.lock.RUnlock()
    if !exists {
        return errors.Errorf("No private data handler for %s", chainID)
    }

    if err := handler.distributor.Distribute(txID, privData, blkHt); err != nil {
        logger.Error("Failed to distributed private collection, txID", txID, "channel", chainID, "due to", err)
        return err
    }

    if err := handler.coordinator.StorePvtData(txID, privData, blkHt); err != nil {
        logger.Error("Failed to store private data into transient store, txID",
            txID, "channel", chainID, "due to", err)
        return err
    }
    return nil
}
  • 公有資料處理
    序列號公有資料讀寫集並返回結果。
if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
            return nil, nil, nil, nil, err
}

func (txSim *TxSimulationResults) GetPubSimulationBytes() ([]byte, error) {
    return proto.Marshal(txSim.PubSimulationResults)
}

3.3 簽名背書

在ProcessProposal()方法中,首先會判斷通道id是否為nil,如果為nil,則直接返回響應結果(例如install操作)。如果不為nil,會呼叫endorseProposal()方法對模擬執行結果進行簽名和背書。在endorseProposal()方法中,會構造Context物件,再呼叫EndorseWithPlugin()裡面會呼叫getOrCreatePlugin()建立plugin,然後呼叫proposalResponsePayloadFromContext()方法,在該方法中會計算背書結果hash以及封裝模擬執行結果、鏈碼event事件以及鏈碼響應結果等(資料結構為ProposalResponsePayload),在序列化成[]byte陣列,最後呼叫Endorse()方法執行簽名背書操作(由於escc現在是外掛形式執行,裡面會進行判斷。預設執行escc)。

func (e *DefaultEndorsement) Endorse(prpBytes []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error) {
    signer, err := e.SigningIdentityForRequest(sp)
    if err != nil {
        return nil, nil, errors.Wrap(err, "failed fetching signing identity")
    }
    // serialize the signing identity
    identityBytes, err := signer.Serialize()
    if err != nil {
        return nil, nil, errors.Wrapf(err, "could not serialize the signing identity")
    }

    // sign the concatenation of the proposal response and the serialized endorser identity with this endorser's key
    signature, err := signer.Sign(append(prpBytes, identityBytes...))
    if err != nil {
        return nil, nil, errors.Wrapf(err, "could not sign the proposal response payload")
    }
    endorsement := &peer.Endorsement{Signature: signature, Endorser: identityBytes}
    return endorsement, prpBytes, nil
}