超級賬本hyperledger fabric第五集:共識排序及原始碼閱讀
阿新 • • 發佈:2018-12-20
一.共識機制
達成共識需要3個階段,交易背書,交易排序,交易驗證
- 交易背書:模擬的
- 交易排序:確定交易順序,最終將排序好的交易打包區塊分發
- 交易驗證:區塊儲存前要進行一下交易驗證
二.orderer節點的作用
- 交易排序
- 目的:保證系統的最終一致性(有限狀態機)
- solo:單節點排序
- kafka:外接的分散式訊息佇列
- 區塊分發
- orderer中的區塊並不是最終持久化的區塊
- 是一箇中間狀態的區塊
- 包含了所有交易,不管是有效還是無效,都會打包傳給組織的錨節點
- 多通道的資料隔離
- 客戶端可以使用某個通道,傳送交易
三.原始碼目錄
- 從goland中閱讀
- 原始碼目錄
- bccsp:與密碼學相關的,加密,數字簽名,證書,將密碼學中的函式抽象成了介面,方便呼叫和擴充套件
- bddtests:行為驅動開發,從需求直接到開發
- common:公共庫、錯誤處理、日誌出項,賬本儲存,相關工具
- core:是fabric的核心庫,子目錄是各個模組的目錄 / comm:網路通訊相關
- devenv:官方提供的開發環境,使用的是Vagrant
- docs:文件
- events:事件監聽機制
- examples:例子程式
- gossip:通訊協議,組織內部的通訊,區塊同步
- gotools:用於編譯
- images:docker映象相關
- msp:成員服務管理,member serivce provider,讀取證書做簽名
- orderer:排序節點
- peer:peer節點
- proposals:用於擴充套件,新功能的提案
- protos:資料結構的定義
四.共識機制原始碼
orderer節點的原始碼
- 首先看orderer目錄下的main.go ,main.go裡有一個NewServer可以進入server.go
main.go中func main() 主要起到判斷作用,如果接收到的是start命令,就載入和初始化各種配置,如果接收到的是version指令,就列印版本號;之後在下面定義了上面的各種方法。
func main() { kingpin.Version("0.0.1") //判斷接受到的引數Args switch kingpin.MustParse(app.Parse(os.Args[1:])) { // 如果接受到"start" command case start.FullCommand(): logger.Infof("Starting %s", metadata.GetVersionInfo()) //載入配置 conf := config.Load() //初始化日誌級別 //生產環境下日誌級別調高 initializeLoggingLevel(conf) //初始化profile,go內建的觀察程式執行的工具 //可以通過http呼叫 initializeProfilingService(conf) //初始化grpc服務端 grpcServer := initializeGrpcServer(conf) //載入msp簽名證書 initializeLocalMsp(conf) //msp證書給簽名者例項化 signer := localmsp.NewSigner() //初始化鏈的管理者(也就是主節點) manager := initializeMultiChainManager(conf, signer) //例項化服務 server := NewServer(manager, signer) //繫結服務 ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server) logger.Info("Beginning to serve requests") //啟動服務 grpcServer.Start() // 如果接受到"version" command case version.FullCommand(): //列印版本號 fmt.Println(metadata.GetVersionInfo()) } }
我們來看下初始化管理者的程式碼
func initializeMultiChainManager(conf *config.TopLevel, signer crypto.LocalSigner) multichain.Manager {
//建立賬本工廠,產生臨時區塊
lf, _ := createLedgerFactory(conf)
//判斷鏈是否存在
if len(lf.ChainIDs()) == 0 {
//鏈不存在
//啟動引導鏈
initializeBootstrapChannel(conf, lf)
} else {
logger.Info("Not bootstrapping because of existing chains")
}
//例項化共識機制
//有solo和kafka兩種模式
consenters := make(map[string]multichain.Consenter)
consenters["solo"] = solo.New()
consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version)
return multichain.NewManagerImpl(lf, consenters, signer)
}
- orderer的配置檔案在orderer.yaml中,監聽的地址是127.0.0.1;監聽的埠是7050;BCCSP是密碼學,賬本儲存最終還是儲存到硬碟中。
- 接下來咱們看下例項化服務server.go,這裡面定義了交易收集和廣播區塊。
type server struct {
//交易收集
bh broadcast.Handler
//廣播區塊
dh deliver.Handler
}
我們具體看下交易收集:broadcast.go
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
logger.Debugf("Starting new broadcast loop")
for {
//接收交易
msg, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF, hangup")
return nil
}
if err != nil {
logger.Warningf("Error reading from stream: %s", err)
return err
}
payload, err := utils.UnmarshalPayload(msg.Payload)
if err != nil {
logger.Warningf("Received malformed message, dropping connection: %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
//驗證訊息體的內容,有錯誤則返回Status_BAD_REQUEST
if payload.Header == nil {
logger.Warningf("Received malformed message, with missing header, dropping connection")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
if err != nil {
logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
err = proto.Unmarshal(msg.Payload, payload)
if err != nil || payload.Header == nil {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}
chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header): %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}
if chdr.ChannelId == "" {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}
}
//獲取support物件
support, ok := bh.sm.GetChain(chdr.ChannelId)
if !ok {
logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
}
logger.Debugf("[channel: %s] Broadcast is filtering message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])
//將訊息傳到support的過濾器中過濾
//是區塊的第一次過濾,第二次是在區塊切割時過濾的
_, filterErr := support.Filters().Apply(msg)
if filterErr != nil {
logger.Warningf("[channel: %s] Rejecting broadcast message because of filter error: %s", chdr.ChannelId, filterErr)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
//訊息入列,然後被solo或kafka處理
if !support.Enqueue(msg) {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])
}
//返回正確的200碼
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
if err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
return err
}
}
}
我們具體看下廣播區塊:deliver.go
func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
logger.Debugf("Starting new deliver loop")
for {
logger.Debugf("Attempting to read seek info message")
//接收請求
envelope, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF, hangup")
return nil
}
if err != nil {
logger.Warningf("Error reading from stream: %s", err)
return err
}
//做校驗
payload, err := utils.UnmarshalPayload(envelope.Payload)
if err != nil {
logger.Warningf("Received an envelope with no payload: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
if payload.Header == nil {
logger.Warningf("Malformed envelope received with bad header")
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Failed to unmarshal channel header: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
//獲取chain物件
chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
logger.Debugf("Rejecting deliver because channel %s not found", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
//監聽是否有錯誤發生
//有錯誤,返回503
erroredChan := chain.Errored()
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Rejecting deliver request because of consenter error", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
default:
}
lastConfigSequence := chain.Sequence()
//對鏈配置資訊校驗
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("[channel: %s] Received unauthorized deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
//解析請求訊息內容
seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Warningf("[channel: %s] Received a signed deliver request with malformed seekInfo payload: %s", chdr.ChannelId, err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
- 在server.go中會遇到chain呼叫Manager Manage.go
type Manager interface {
//獲取鏈物件
GetChain(chainID string) (ChainSupport, bool)
//獲取系統鏈,用於引導其他鏈的生成
SystemChannelID() string
//生成或更新鏈的配置
NewChannelConfig(envConfigUpdate *cb.Envelope) (configtxapi.Manager, error)
}
//配置資源
type configResources struct {
configtxapi.Manager
}
//獲取orderer相關的配置
//點進Orderer可以看相關配置
func (cr *configResources) SharedConfig() config.Orderer {
oc, ok := cr.OrdererConfig()
if !ok {
logger.Panicf("[channel %s] has no orderer configuration", cr.ChainID())
}
return oc
}
//定義賬本資源
type ledgerResources struct {
//配置資源
*configResources
//賬本的讀寫物件
//對賬本操作的入口
ledger ledger.ReadWriter
}
//manager的實現類
type multiLedger struct {
//鏈
chains map[string]*chainSupport
//共識機制
consenters map[string]Consenter
//賬本讀寫工廠
ledgerFactory ledger.Factory
//簽名物件
signer crypto.LocalSigner
//系統鏈的標識
systemChannelID string
//定義系統鏈
systemChannel *chainSupport
}
//獲取某條鏈更新的配置交易
func getConfigTx(reader ledger.Reader) *cb.Envelope {
//獲取鏈上最新的一個區塊
lastBlock := ledger.GetBlock(reader, reader.Height()-1)
//根據最新的區塊資訊,可以找到最新的配置交易的區塊
index, err := utils.GetLastConfigIndexFromBlock(lastBlock)
if err != nil {
logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
}
//讀取配置區塊
configBlock := ledger.GetBlock(reader, index)
if configBlock == nil {
logger.Panicf("Config block does not exist")
}
//讀取最新的配置交易
return utils.ExtractEnvelopeOrPanic(configBlock, 0)
}
//manager的例項化
func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consenter, signer crypto.LocalSigner) Manager {
//接收傳來的引數
//直接賦值,上面定義的
ml := &multiLedger{
chains: make(map[string]*chainSupport),
ledgerFactory: ledgerFactory,
consenters: consenters,
signer: signer,
}
//讀取本地儲存的鏈的ID
existingChains := ledgerFactory.ChainIDs()
//迴圈
for _, chainID := range existingChains {
//根據賬本工廠例項化賬本讀的物件
//rl:read ledger
rl, err := ledgerFactory.GetOrCreate(chainID)
if err != nil {
logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
}
//獲取最新的配置交易
configTx := getConfigTx(rl)
if configTx == nil {
logger.Panic("Programming error, configTx should never be nil here")
}
//將配置交易和ledger物件繫結
ledgerResources := ml.newLedgerResources(configTx)
chainID := ledgerResources.ChainID()
//讀取鏈是否有聯盟配置
//聯盟配置:是否有建立其他鏈的許可權
//一般只有系統鏈有聯盟配置
if _, ok := ledgerResources.ConsortiumsConfig(); ok {
//有聯盟配置
if ml.systemChannelID != "" {
//已經存在系統鏈,報錯
logger.Panicf("There appear to be two system chains %s and %s", ml.systemChannelID, chainID)
}
//例項化ChainSupport,依次賦值
chain := newChainSupport(createSystemChainFilters(ml, ledgerResources),
ledgerResources,
consenters,
signer)
logger.Infof("Starting with system channel %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
ml.chains[chainID] = chain
ml.systemChannelID = chainID
ml.systemChannel = chain
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
//延遲啟動
//其他鏈完成後,啟動系統鏈
defer chain.start()
} else {
logger.Debugf("Starting chain: %s", chainID)
chain := newChainSupport(createStandardFilters(ledgerResources),
ledgerResources,
consenters,
signer)
//建立標準鏈
ml.chains[chainID] = chain
//啟動
chain.start()
}
}
//系統鏈不存在,則報錯
if ml.systemChannelID == "" {
logger.Panicf("No system chain found. If bootstrapping, does your system channel contain a consortiums group definition?")
}
//返回ml
return ml
}
//返回系統鏈id
func (ml *multiLedger) SystemChannelID() string {
return ml.systemChannelID
}
// GetChain retrieves the chain support for a chain (and whether it exists)
//得到鏈
func (ml *multiLedger) GetChain(chainID string) (ChainSupport, bool) {
cs, ok := ml.chains[chainID]
return cs, ok
}
//例項化一個賬本資源物件
func (ml *multiLedger) newLedgerResources(configTx *cb.Envelope) *ledgerResources {
//初始化配置交易
initializer := configtx.NewInitializer()
//生成配置manager
configManager, err := configtx.NewManagerImpl(configTx, initializer, nil)
if err != nil {
logger.Panicf("Error creating configtx manager and handlers: %s", err)
}
//得到chainID
chainID := configManager.ChainID()
//根據chainID,例項化賬本物件
ledger, err := ml.ledgerFactory.GetOrCreate(chainID)
if err != nil {
logger.Panicf("Error getting ledger for %s", chainID)
}
//最終返回賦值後的賬本資源物件
return &ledgerResources{
configResources: &configResources{Manager: configManager},
ledger: ledger,
}
}
//生成一條新鏈
func (ml *multiLedger) newChain(configtx *cb.Envelope) {
//建立賬本資源物件
ledgerResources := ml.newLedgerResources(configtx)
//組裝區塊,通過Append加到鏈上
ledgerResources.ledger.Append(ledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx}))
// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is
//得到新的鏈,可以加鎖
newChains := make(map[string]*chainSupport)
for key, value := range ml.chains {
newChains[key] = value
}
cs := newChainSupport(createStandardFilters(ledgerResources), ledgerResources, ml.consenters, ml.signer)
chainID := ledgerResources.ChainID()
logger.Infof("Created and starting new chain %s", chainID)
newChains[string(chainID)] = cs
cs.start()
ml.chains = newChains
}
func (ml *multiLedger) channelsCount() int {
return len(ml.chains)
}
//生成新的鏈的配置
func (ml *multiLedger) NewChannelConfig(envConfigUpdate *cb.Envelope) (configtxapi.Manager, error) {
//下面是生成新鏈前,做各種校驗
configUpdatePayload, err := utils.UnmarshalPayload(envConfigUpdate.Payload)
if err != nil {
return nil, fmt.Errorf("Failing initial channel config creation because of payload unmarshaling error: %s", err)
}
- 接下來看chainsupport.go
//定義共識機制的介面
type ConsenterSupport interface {
//本地簽名
crypto.LocalSigner
//區塊切割物件
BlockCutter() blockcutter.Receiver
//配置
SharedConfig() config.Orderer
//切割好的交易打包成區塊
CreateNextBlock(messages []*cb.Envelope) *cb.Block
//將區塊寫入
WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block
//獲取鏈的ID
ChainID() string
//獲取鏈當前的區塊高度
Height() uint64 // Returns the number of blocks on the chain this specific consenter instance is associated with
}
type ChainSupport interface {
//背書策略
PolicyManager() policies.Manager
//讀取賬本的介面
Reader() ledger.Reader
//處理賬本的錯誤
Errored() <-chan struct{}
//處理交易輸入的介面
broadcast.Support
//定義共識機制的介面
ConsenterSupport
//序列
//每次對鏈進行修改,Sequence是加1的
Sequence() uint64
//將一個交易轉為配置交易
//Envelope:可以理解為交易
ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error)
}
type chainSupport struct {
//鏈的資源資訊,鏈的配置和賬本讀寫物件
*ledgerResources
//鏈
chain Chain
//區塊切割
cutter blockcutter.Receiver
//過濾器
//orderer過濾一些交易為空的資料
filters *filter.RuleSet
//簽名
signer crypto.LocalSigner
//最新配置資訊所在的區塊高度
lastConfig uint64
//最新配置資訊所在的序列化
lastConfigSeq uint64
}
func newChainSupport(
filters *filter.RuleSet,
ledgerResources *ledgerResources,
consenters map[string]Consenter,
signer crypto.LocalSigner,
) *chainSupport {
//建立區塊切割物件
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
//根據配置查詢orderer使用的共識機制
consenterType := ledgerResources.SharedConfig().ConsensusType()
//得到共識機制
consenter, ok := consenters[consenterType]
if !ok {
logger.Fatalf("Error retrieving consenter of type: %s", consenterType)
}
//賦值
cs := &chainSupport{
ledgerResources: ledgerResources,
cutter: cutter,
filters: filters,
signer: signer,
}
//序列號
cs.lastConfigSeq = cs.Sequence()
var err error
//最新區塊
lastBlock := ledger.GetBlock(cs.Reader(), cs.Reader().Height()-1)
if lastBlock.Header.Number != 0 {
//獲取最新配置資訊所在的區塊高度
cs.lastConfig, err = utils.GetLastConfigIndexFromBlock(lastBlock)
if err != nil {
logger.Fatalf("[channel: %s] Error extracting last config block from block metadata: %s", cs.ChainID(), err)
}
}
//獲取區塊元資料資訊
metadata, err := utils.GetMetadataFromBlock(lastBlock, cb.BlockMetadataIndex_ORDERER)
if err != nil {
logger.Fatalf("[channel: %s] Error extracting orderer metadata: %s", cs.ChainID(), err)
}
logger.Debugf("[channel: %s] Retrieved metadata for tip of chain (blockNumber=%d, lastConfig=%d, lastConfigSeq=%d): %+v", cs.ChainID(), lastBlock.Header.Number, cs.lastConfig, cs.lastConfigSeq, metadata)
//用共識機制操作Chain
cs.chain, err = consenter.HandleChain(cs, metadata)
if err != nil {
logger.Fatalf("[channel: %s] Error creating consenter: %s", cs.ChainID(), err)
}
return cs
}
//例項化過濾器
func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(ledgerResources.SharedConfig()),
sigfilter.New(policies.ChannelWriters, ledgerResources.PolicyManager()),
configtxfilter.NewFilter(ledgerResources),
filter.AcceptRule,
})
}
func (cs *chainSupport) WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
//遍歷所有提交的交易
for _, committer := range committers {
committer.Commit()
}
// Set the orderer-related metadata field
//判斷元資料
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
}
//進行區塊簽名
cs.addBlockSignature(block)
//配置簽名
cs.addLastConfigSignature(block)
//將區塊寫入賬本中
err := cs.ledger.Append(block)
if err != nil {
logger.Panicf("[channel: %s] Could not append block: %s", cs.ChainID(), err)
}
logger.Debugf("[channel: %s] Wrote block %d", cs.ChainID(), block.GetHeader().Number)
return block
}
- 點Receiver進去,看區塊切割,在blockcutter.go中:
func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, committerBatches [][]filter.Committer, validTx bool, pending bool) {
//將交易資訊再次過濾
//第一次過濾是orderer接收到交易請求時
committer, err := r.filters.Apply(msg)
if err != nil {
logger.Debugf("Rejecting message: %s", err)
return // We don't bother to determine `pending` here as it's not processed in error case
}
// message is valid
//將交易標記為有效
validTx = true
//計算交易體的大小
messageSizeBytes := messageSizeBytes(msg)
//判斷是否交易隔離,配置交易進行隔離
//交易體的大小,如果比最大交易體大小大,認為交易內容過大,進行單獨切塊
if committer.Isolated() || messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes {
if committer.Isolated() {
logger.Debugf("Found message which requested to be isolated, cutting into its own batch")
} else {
logger.Debugf("The current message, with %v bytes, is larger than the preferred batch size of %v bytes and will be isolated.", messageSizeBytes, r.sharedConfigManager.BatchSize().PreferredMaxBytes)
}
//若存在每被七個的交易
//將未被切割的交易存放到區塊
if len(r.pendingBatch) > 0 {
messageBatch, committerBatch := r.Cut()
messageBatches = append(messageBatches, messageBatch)
committerBatches = append(committerBatches, committerBatch)
}
//單獨切割當前交易
messageBatches = append(messageBatches, []*cb.Envelope{msg})
committerBatches = append(committerBatches, []filter.Committer{committer})
return
}
//不隔離的交易,這裡處理
//判斷加上當前交易後,區塊大小是否超出預先設定的大小
messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes
//如果超出了預定大小,進入if
if messageWillOverflowBatchSizeBytes {
logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
//進行切割
messageBatch, committerBatch := r.Cut()
messageBatches = append(messageBatches, messageBatch)
committerBatches = append(committerBatches, committerBatch)
}
logger.Debugf("Enqueuing message into batch")
r.pendingBatch = append(r.pendingBatch, msg)
r.pendingBatchSizeBytes += messageSizeBytes
r.pendingCommitters = append(r.pendingCommitters, committer)
pending = true
//若區塊佇列超出閾值範圍,進行切割
if uint32(len(r.pendingBatch)) >= r.sharedConfigManager.BatchSize().MaxMessageCount {
logger.Debugf("Batch size met, cutting batch")
messageBatch, committerBatch := r.Cut()
messageBatches = append(messageBatches, messageBatch)
committerBatches = append(committerBatches, committerBatch)
pending = false
}
return
}
//完成切割這個動作
func (r *receiver) Cut() ([]*cb.Envelope, []filter.Committer) {
batch := r.pendingBatch
r.pendingBatch = nil
committers := r.pendingCommitters
r.pendingCommitters = nil
r.pendingBatchSizeBytes = 0
return batch, committers
}
func messageSizeBytes(message *cb.Envelope) uint32 {
//將訊息體和簽名加起來求長度
return uint32(len(message.Payload) + len(message.Signature))
}
- 在consensus.go中是共識:
func (ch *chain) main() {
//定義定時器
var timer <-chan time.Time
//迴圈
for {
select {
//不停的從交易的channel中獲取交易
//將獲取到的交易,傳送給區塊切割物件
//返回需要切割的區塊
case msg := <-ch.sendChan:
//區塊切割
batches, committers, ok, _ := ch.support.BlockCutter().Ordered(msg)
//判斷交易是否有效
//判斷定時器是否未空
if ok && len(batches) == 0 && timer == nil {
//例項化定時器
timer = time.After(ch.support.SharedConfig().BatchTimeout())
continue
}
//建立區塊
//最終儲存到orderer節點的臨時賬本中
for i, batch := range batches {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, committers[i], nil)
}
//判斷交易的有效性
if len(batches) > 0 {
//定時器重新計時
timer = nil
}
//定時器觸發
//馬上進行區塊切割
case <-timer:
//clear the timer
timer = nil
//區塊切割
batch, committers := ch.support.BlockCutter().Cut()
if len(batch) == 0 {
logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
continue
}
logger.Debugf("Batch timer expired, creating block")
//建立區塊
block := ch.support.CreateNextBlock(batch)
//寫區塊
ch.support.WriteBlock(block, committers, nil)
case <-ch.exitChan:
logger.Debugf("Exiting")
//直接退出
return
}
}
}