菜鳥系列Fabric原始碼學習 — 區塊同步
Fabric 1.4 原始碼分析 區塊同步
本文主要從原始碼層面介紹fabric peer同步區塊過程,peer同步區塊主要有2個過程:
1)peer組織的leader與orderer同步區塊
2)peer組織間peer同步區塊。
1. peer leader和orderer同步區塊
首先,orderer對外主要提供broadcast和deliver兩個服務(參見「orderer服務介紹」)。並且我們知道peer和orderer同步區塊肯定是通過deliver服務實現的,但是到底是peer從orderer拉取,還是orderer推送給peer呢?由於peer可以知道orderer資訊(配置塊)並且deliver是grpc服務,則推斷是peer從orderer拉區塊。如果是拉區塊,那麼peer如何獲取區塊,獲取區塊的方式是什麼?
1.1 Orderer Deliver服務
首先,檢視orderer deliver服務是怎麼執行的,是如何同步區塊的。
當deliver服務被呼叫時,轉到Handle()方法處理
func (h *Handler) Handle(ctx context.Context, srv *Server) error { ... for { logger.Debugf("Attempting to read seek info message from %s", addr) // 接受發來envelope envelope, err := srv.Recv() ... // 分發區塊 status, err := h.deliverBlocks(ctx, srv, envelope) ... } }
其中,srv.Recv()接收envelope,再根據envelope資訊分發block。
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) { addr := util.ExtractRemoteAddress(ctx) payload, err := utils.UnmarshalPayload(envelope.Payload) if payload.Header == nil { logger.Warningf("Malformed envelope received from %s with bad header", addr) return cb.Status_BAD_REQUEST, nil } chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) err = h.validateChannelHeader(ctx, chdr) chain := h.ChainManager.GetChain(chdr.ChannelId) defer func() { labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS)) h.Metrics.RequestsCompleted.With(labels...).Add(1) }() accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt) if err != nil { logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err) return cb.Status_BAD_REQUEST, nil } if err := accessControl.Evaluate(); err != nil { logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err) return cb.Status_FORBIDDEN, nil } seekInfo := &ab.SeekInfo{} // 返回迭代器及起始區塊號 cursor, number := chain.Reader().Iterator(seekInfo.Start) defer cursor.Close() var stopNum uint64 switch stop := seekInfo.Stop.Type.(type) { case *ab.SeekPosition_Oldest: stopNum = number case *ab.SeekPosition_Newest: stopNum = chain.Reader().Height() - 1 case *ab.SeekPosition_Specified: stopNum = stop.Specified.Number if stopNum < number { logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum) return cb.Status_BAD_REQUEST, nil } } for { if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY { if number > chain.Reader().Height()-1 { return cb.Status_NOT_FOUND, nil } } var block *cb.Block var status cb.Status iterCh := make(chan struct{}) go func() { // 獲取區塊 block, status = cursor.Next() close(iterCh) }() 
select { case <-ctx.Done(): logger.Debugf("Context canceled, aborting wait for next block") return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved") case <-erroredChan: // TODO, today, the only user of the errorChan is the orderer consensus implementations. If the peer ever reports // this error, we will need to update this error message, possibly finding a way to signal what error text to return. logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error") return cb.Status_SERVICE_UNAVAILABLE, nil case <-iterCh: // Iterator has set the block and status vars } if status != cb.Status_SUCCESS { logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status) return status, nil } // increment block number to support FAIL_IF_NOT_READY deliver behavior number++ if err := accessControl.Evaluate(); err != nil { logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err) return cb.Status_FORBIDDEN, nil } logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr) // 傳送區塊 if err := srv.SendBlockResponse(block); err != nil { logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err) return cb.Status_INTERNAL_SERVER_ERROR, err } h.Metrics.BlocksSent.With(labels...).Add(1) // 如果到了client請求對最後區塊跳出迴圈 if stopNum == block.Header.Number { break } } logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo) return cb.Status_SUCCESS, nil }
- 反序列化envelope.Payload
- 對payload.Header和ChannelHeader進行驗證
- 根據通道獲取對應的chain
- 訪問控制相關驗證,policy,signature
- 從payload.data解析出SeekInfo
- 新建一個迭代器cursor
- 通過seekInfo.Stop.Type確定stopNum
- cursor.Next()獲取下個區塊及SendBlockResponse()傳送區塊
- 判斷是否達到請求的最後區塊,是就跳出迴圈
// Chain encapsulates chain operations and data.
type Chain interface {
// Sequence returns the current config sequence number; it can be used to
// detect config changes.
Sequence() uint64
// PolicyManager returns the current policy manager as specified by the chain configuration.
PolicyManager() policies.Manager
// Reader returns the chain Reader for the chain.
Reader() blockledger.Reader
// Errored returns a channel which closes when the backing consenter has errored.
Errored() <-chan struct{}
}
// SeekInfo describes the range of blocks a deliver request asks for.
// In the deliverBlocks excerpt, Start is passed to the ledger iterator, the
// last block number is derived from Stop.Type, and Behavior is compared with
// SeekInfo_FAIL_IF_NOT_READY.
type SeekInfo struct {
// Start is the position delivery begins from.
Start *SeekPosition `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
// Stop is the position of the last block to deliver.
Stop *SeekPosition `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
// Behavior selects how the request is handled when the next block is not yet available.
Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
// NOTE(review): the XXX_ fields look like protobuf-generated bookkeeping — confirm.
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
1.2 peer從orderer同步區塊
這裡主要解決1個問題:peer如何觸發orderer deliver服務?即peer和orderer怎麼同步區塊的?
在介紹之前參閱peer節點啟動流程。在peer節點啟動過程中會執行peer.Initialize()方法,對peer所在的所有chain例項化。其中呼叫了createChain()介面建立鏈物件。在createChain()方法中呼叫了GossipService.InitializeChannel()方法。然後呼叫g.deliveryService[chainID].StartDeliverForChannel()方法獲取區塊。
// StartDeliverForChannel starts block delivery for the channel chainID.
//
// d.lock is held for the whole call. An error is returned when the service is
// stopping or when a block provider is already registered for chainID.
// Otherwise a client is created with d.newClient, a new blocks provider is
// stored under chainID, and d.launchBlockProvider is started in its own
// goroutine with finalizer.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
	d.lock.Lock()
	defer d.lock.Unlock()

	if d.stopping {
		errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
		// errMsg is already fully formatted, so it is logged through a
		// constant "%s" format instead of being used as the format itself.
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}

	if _, exist := d.blockProviders[chainID]; exist {
		errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}

	client := d.newClient(chainID, ledgerInfo)
	logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
	// Create the blocks provider for this channel.
	d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
	// Run it in the background.
	go d.launchBlockProvider(chainID, finalizer)
	return nil
}
其中newClient()建立一個broadcastClient,傳入引數為requester.RequestBlocks(ledgerInfoProvider)方法。很顯然,peer是通過該方法獲取區塊的,那麼該方法主要實現是什麼?
// RequestBlocks reads the ledger height from ledgerInfoProvider and issues
// the matching seek request: seekLatestFromCommitter(height) when the height
// is greater than zero, seekOldest otherwise. Any error from the height
// lookup or from the seek call is returned to the caller.
func (b *blocksRequester) RequestBlocks(ledgerInfoProvider blocksprovider.LedgerInfo) error {
	ledgerHeight, heightErr := ledgerInfoProvider.LedgerHeight()
	if heightErr != nil {
		logger.Errorf("Can't get ledger height for channel %s from committer [%s]", b.chainID, heightErr)
		return heightErr
	}

	// The ledger already holds blocks: continue from the current height.
	if ledgerHeight > 0 {
		logger.Debugf("Starting deliver with block [%d] for channel %s", ledgerHeight, b.chainID)
		if seekErr := b.seekLatestFromCommitter(ledgerHeight); seekErr != nil {
			return seekErr
		}
		return nil
	}

	// Nothing stored yet: start from the oldest block.
	logger.Debugf("Starting deliver with oldest block for channel %s", b.chainID)
	return b.seekOldest()
}
呼叫了seek_XXX方法,其中
// SeekInfo describes the range of blocks a deliver request asks for.
// seekOldest, for example, fills it with Start = oldest, Stop = the specified
// block number math.MaxUint64 and Behavior = SeekInfo_BLOCK_UNTIL_READY.
type SeekInfo struct {
// Start is the position delivery begins from.
Start *SeekPosition `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
// Stop is the position of the last block to deliver.
Stop *SeekPosition `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
// Behavior selects how the request is handled when the next block is not yet available.
Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
// NOTE(review): the XXX_ fields look like protobuf-generated bookkeeping — confirm.
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// seekOldest builds a SeekInfo that starts at the oldest block, stops at the
// specified block number math.MaxUint64 and uses BLOCK_UNTIL_READY behavior,
// wraps it in a signed DELIVER_SEEK_INFO envelope bound to the TLS
// certificate hash, and sends that envelope through b.client.
func (b *blocksRequester) seekOldest() error {
	fromOldest := &orderer.SeekPosition{
		Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}},
	}
	toMax := &orderer.SeekPosition{
		Type: &orderer.SeekPosition_Specified{
			Specified: &orderer.SeekSpecified{Number: math.MaxUint64},
		},
	}
	request := &orderer.SeekInfo{
		Start:    fromOldest,
		Stop:     toMax,
		Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
	}

	//TODO- epoch and msgVersion may need to be obtained for nowfollowing usage in orderer/configupdate/configupdate.go
	var (
		msgVersion int32
		epoch      uint64
	)
	certHash := b.getTLSCertHash()

	signedEnv, err := utils.CreateSignedEnvelopeWithTLSBinding(
		common.HeaderType_DELIVER_SEEK_INFO,
		b.chainID,
		localmsp.NewSigner(),
		request,
		msgVersion,
		epoch,
		certHash,
	)
	if err != nil {
		return err
	}

	// Send the envelope to the orderer to request the blocks.
	return b.client.Send(signedEnv)
}
從其中RequestBlocks()呼叫的2個方法可知,seekInfo的stopNum都為math.MaxUint64,則該方法會持續請求區塊直到最大值(可以看作現在到未來的所有區塊)。
上文可知,broadcastClient已經例項化,並且通過呼叫broadcastClient.onConnect向orderer傳送獲取區塊的envelope。在例項化後,呼叫launchBlockProvider。然後會呼叫 pb.DeliverBlocks()方法(開始獲取區塊)。
// broadcastClient is the client whose Recv method receives messages from the
// ordering service.
type broadcastClient struct {
// NOTE(review): stopFlag and stopChan presumably signal shutdown — confirm.
stopFlag int32
stopChan chan struct{}
// createClient is the clientFactory; presumably it builds the client used to
// open the Deliver stream — confirm.
createClient clientFactory
// shouldRetry is the retryPolicy; presumably it decides whether a failed
// attempt is retried — confirm.
shouldRetry retryPolicy
// onConnect is the broadcastSetup hook, a function called immediately after
// each successful connection to the ordering service.
onConnect broadcastSetup
// NOTE(review): prod presumably produces connections to orderer endpoints — confirm.
prod comm.ConnectionProducer
mutex sync.Mutex
blocksDeliverer blocksprovider.BlocksDeliverer
conn *connection
endpoint string
}
// DeliverBlocks pulls blocks from the ordering service and distributes them
// across peers.
//
// It keeps receiving from b.client until b.isDone() reports true, a receive
// fails, a SUCCESS status or a message of unknown type arrives, or more than
// b.wrongStatusThreshold consecutive BAD_REQUEST/FORBIDDEN statuses are seen.
// The client is closed when the method returns.
func (b *blocksProviderImpl) DeliverBlocks() {
// errorStatusCounter counts consecutive BAD_REQUEST/FORBIDDEN statuses.
errorStatusCounter := 0
// statusCounter is the exponent of the retry back-off computed below.
statusCounter := 0
defer b.client.Close()
for !b.isDone() {
// Receive the next message sent by the orderer.
msg, err := b.client.Recv()
if err != nil {
logger.Warningf("[%s] Receive error: %s", b.chainID, err.Error())
return
}
switch t := msg.Type.(type) {
case *orderer.DeliverResponse_Status:
// The seek is not expected to complete, so SUCCESS is treated as an error.
if t.Status == common.Status_SUCCESS {
logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID)
return
}
if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {
logger.Errorf("[%s] Got error %v", b.chainID, t)
errorStatusCounter++
if errorStatusCounter > b.wrongStatusThreshold {
logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID)
return
}
} else {
errorStatusCounter = 0
logger.Warningf("[%s] Got error %v", b.chainID, t)
}
// Sleep for min(maxRetryDelay, 2^statusCounter * 100ms); the exponent only
// grows while the computed delay is still below the cap.
maxDelay := float64(maxRetryDelay)
currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) * 100 * time.Millisecond)
time.Sleep(time.Duration(math.Min(maxDelay, currDelay)))
if currDelay < maxDelay {
statusCounter++
}
// NOTE(review): the meaning of Disconnect's boolean argument is not visible
// here — confirm against the client implementation.
if t.Status == common.Status_BAD_REQUEST {
b.client.Disconnect(false)
} else {
b.client.Disconnect(true)
}
continue
case *orderer.DeliverResponse_Block:
// A block arrived: reset both the error counter and the back-off exponent.
errorStatusCounter = 0
statusCounter = 0
blockNum := t.Block.Header.Number
marshaledBlock, err := proto.Marshal(t.Block)
if err != nil {
logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, blockNum, err)
continue
}
// Blocks that fail verification are logged and skipped.
if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), blockNum, marshaledBlock); err != nil {
logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, blockNum, err)
continue
}
numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
// Create payload with a block received
payload := createPayload(blockNum, marshaledBlock)
// Use payload to create gossip message
gossipMsg := createGossipMsg(b.chainID, payload)
logger.Debugf("[%s] Adding payload to local buffer, blockNum = [%d]", b.chainID, blockNum)
// Add payload to local state payloads buffer; a failure is only logged and
// the block is still gossiped below.
if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err)
}
// Gossip messages with other nodes
logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, blockNum, numberOfPeers)
if !b.isDone() {
// Peers synchronize blocks among themselves via gossip.
b.gossip.Gossip(gossipMsg)
}
default:
logger.Warningf("[%s] Received unknown: %v", b.chainID, t)
return
}
}
}
DeliverBlocks()方法介紹,首先呼叫 b.client.Recv()接收orderer傳過來的響應,
// Recv returns the next message from the ordering service. The receive is
// run through bc.try; it fails with "closing" when bc.shouldStop() reports
// true, and any error from bc.try is returned unchanged.
func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) {
	receive := func() (interface{}, error) {
		if bc.shouldStop() {
			return nil, errors.New("closing")
		}
		return bc.tryReceive()
	}

	result, err := bc.try(receive)
	if err != nil {
		return nil, err
	}
	return result.(*orderer.DeliverResponse), nil
}
這裡我們知道大致是peer從orderer這裡拉區塊的,但是還存在疑問,那就是peer如何觸發orderer的deliver服務的?peer是如何呼叫requestBlock方法的?
- peer 呼叫deliver服務
從Recv()介面會呼叫try(),try()會呼叫bc.doAction(action, resetAttemptCounter),然後呼叫bc.connect(),裡面呼叫bc.createClient(conn).Deliver(ctx),從而peer呼叫了orderer的deliver服務。
- peer 呼叫requestBlock方法
從上面可知,requestBlock賦值給broadcastSetup,而broadcastSetup在連線orderer後會立即呼叫。
// broadcastSetup is a function that is called by the broadcastClient immediately after each
// successful connection to the ordering service
1.3 組織間peer同步區塊
peer間同步區塊是通過gossip服務來同步的,並且通過上述程式碼可知,leader和orderer同步區塊也是伴隨著gossip服務啟動(不過是屬於leader的)。
// Gossip messages with other nodes
logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, blockNum, numberOfPeers)
if !b.isDone() {
// peer節點間通過gossip同步區塊
b.gossip.Gossip(gossipMsg)
}
1.4 peer是如何寫區塊
首先,其他peer是通過gossip服務同步區塊,則儲存區塊應該是在gossip服務裡面呼叫的,回到peer啟動時gossip服務的設定
service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
Validator: validator,
Committer: c,
Store: store,
Cs: simpleCollectionStore,
IdDeserializeFactory: csStoreSupport,
})
裡面會呼叫
g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator,
g.metrics.StateMetrics, getStateConfiguration())
裡面會呼叫
// Listen for incoming communication
go s.listen()
// Deliver in order messages into the incoming channel
go s.deliverPayloads()
deliverPayloads()會將gossip.payload 區塊給寫入賬本。
// deliverPayloads commits buffered payloads (blocks) to the ledger.
//
// It waits on s.payloads.Ready(), then pops payloads until Pop returns nil,
// unmarshals each one into a common.Block and commits it with s.commitBlock.
// Payloads that cannot be decoded or that lack a header or data are logged
// and skipped. The loop ends when s.stopCh fires or when a commit fails with
// a VSCCExecutionFailureError; any other commit error triggers logger.Panicf.
// s.done.Done() is called on return.
func (s *GossipStateProviderImpl) deliverPayloads() {
defer s.done.Done()
for {
select {
// Wait for notification that next seq has arrived
case <-s.payloads.Ready():
logger.Debugf("[%s] Ready to transfer payloads (blocks) to the ledger, next block number is = [%d]", s.chainID, s.payloads.Next())
// Collect all subsequent payloads
for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
rawBlock := &common.Block{}
if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {
logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
continue
}
// A block without a header or data cannot be committed; skip it.
if rawBlock.Data == nil || rawBlock.Header == nil {
logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
payload.SeqNum, rawBlock.Header, rawBlock.Data)
continue
}
logger.Debugf("[%s] Transferring block [%d] with %d transaction(s) to the ledger", s.chainID, payload.SeqNum, len(rawBlock.Data.Data))
// Read all private data into slice
var p util.PvtDataCollections
if payload.PrivateData != nil {
err := p.Unmarshal(payload.PrivateData)
if err != nil {
logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
continue
}
}
// The block is committed (stored) here.
if err := s.commitBlock(rawBlock, p); err != nil {
// A VSCC execution failure stops processing; any other error panics.
if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
return
}
logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
}
}
case <-s.stopCh:
// NOTE(review): the stop signal is sent back on stopCh, presumably so that
// another goroutine waiting on it also observes the stop — confirm.
s.stopCh <- struct{}{}
logger.Debug("State provider has been stopped, finishing to push new blocks.")
return
}
}
}
總結
- peer啟動時會啟動gossip服務模組
- leader啟動gossip服務時會從orderer源源不斷的請求塊
- peer之間通過gossip同步區塊,當leader獲取區塊後會通過gossip將區塊傳播給其他peer
- peer的gossip模組中會將gossip.payload區塊資訊寫入賬本