1. 程式人生 > >Hyperledger Fabric Orderer節點啟動

Hyperledger Fabric Orderer節點啟動

register pin mage mode read pre hub alc star

Orderer 節點啟動通過 orderer 包下的 main() 方法實現,會進一步調用到 orderer/common/server 包中的 Main() 方法。

核心代碼如下所示。

// Main is the entry point of orderer process
func Main() {
	fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

	// "version" command
	if fullCmd == version.FullCommand() {
		fmt.Println(metadata.GetVersionInfo())
		return
	}

	conf := config.Load()
	initializeLoggingLevel(conf)
	initializeLocalMsp(conf)

	Start(fullCmd, conf)
}

包括配置初始化過程和核心啟動過程兩個部分:

  • config.Load():從本地配置文件和環境變量中讀取配置信息,構建配置樹結構。
  • initializeLoggingLevel(conf):配置日誌級別。
  • initializeLocalMsp(conf):配置 MSP 結構。
  • Start():完成啟動後的核心工作。

整體過程

核心啟動過程都在 orderer/common/server包中的 Start() 方法,如下圖所示。

技術分享圖片

Start() 方法會初始化 gRPC 服務需要的結構,然後啟動服務。

核心代碼如下所示。

func Start(cmd string, conf *config.TopLevel) {
	logger.Debugf("Start()")
	signer := localmsp.NewSigner()
	manager := initializeMultichannelRegistrar(conf, signer)
	server := NewServer(manager, signer, &conf.Debug)

	switch cmd {
	case start.FullCommand(): // "start" command
		logger.Infof("Starting %s", metadata.GetVersionInfo())
		initializeProfilingService(conf)
		grpcServer := initializeGrpcServer(conf)
		ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
		logger.Info("Beginning to serve requests")
		grpcServer.Start()
	case benchmark.FullCommand(): // "benchmark" command
		logger.Info("Starting orderer in benchmark mode")
		benchmarkServer := performance.GetBenchmarkServer()
		benchmarkServer.RegisterService(server)
		benchmarkServer.Start()
	}
}

包括兩大部分:

  • gRPC 服務結構初始化;
  • gRPC 服務啟動。

gRPC 服務結構初始化

包括創建新的 MSP 簽名結構,初始化 Registrar 結構來管理各個賬本結構,啟動共識過程,以及創建 gRPC 服務端結構。

核心步驟包括:

signer := localmsp.NewSigner() // 初始化簽名結構
manager := initializeMultichannelRegistrar(conf, signer, tlsCallback) // 初始化賬本管理器(Registrar)結構

其中,initializeMultichannelRegistrar(conf, signer)

方法最為關鍵,核心代碼如下:

func initializeMultichannelRegistrar(conf *config.TopLevel, signer crypto.LocalSigner, callbacks ...func(bundle *channelconfig.Bundle)) *multichannel.Registrar {
	// 創建操作賬本的工廠結構
	lf, _ := createLedgerFactory(conf)
	
	// 如果是首次啟動情況,默認先創建系統通道的本地賬本結構
	if len(lf.ChainIDs()) == 0 {
		logger.Debugf("There is no chain, hence we must be in bootstrapping")
		initializeBootstrapChannel(conf, lf)
	} else {
		logger.Info("Not bootstrapping because of existing chains")
	}
	//初始化共識插件,共識插件負責跟後臺的隊列打交道
	consenters := make(map[string]consensus.Consenter)
	consenters["solo"] = solo.New()
	consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version, conf.Kafka.Verbose)

	// 創建各個賬本的管理器(Registrar)結構,並啟動共識過程
	return multichannel.NewRegistrar(lf, consenters, signer, callbacks...)
}

利用傳入的配置信息和簽名信息完成如下步驟:

  • 創建賬本操作的工廠結構;
  • 如果是新啟動情況,利用給定的系統初始區塊文件初始化系統通道的相關結構;
  • 完成共識插件(包括 solokafka 兩種)的初始化;
  • multichannel.NewRegistrar(lf, consenters, signer) 方法會掃描本地賬本數據(此時至少已存在系統通道),創建 Registrar 結構,並為每個賬本都啟動共識(如 Kafka 排序)過程。

說明:Registrar 結構(位於 orderer.common.multichannel 包)是 Orderer 組件中最核心的結構,管理了 Orderer 中所有的賬本、共識插件等數據結構。

創建 Registrar 結構並啟動共識過程

NewRegistrar(lf, consenters, signer) 方法位於 orderer.common.multichannel 包,負責初始化鏈支持、消息處理器等重要數據結構,並為各個賬本啟動共識過程。

核心代碼如下:

existingChains := ledgerFactory.ChainIDs()
for _, chainID := range existingChains { // 啟動本地所有的賬本結構的共識過程
	if _, ok := ledgerResources.ConsortiumsConfig(); ok { // 如果是系統賬本(默認在首次啟動時會自動創建)
		chain := newChainSupport(r, ledgerResources, consenters, signer)
		chain.Processor = msgprocessor.NewSystemChannel(chain, r.templator, msgprocessor.CreateSystemChannelFilters(r, chain))
		r.chains[chainID] = chain
		r.systemChannelID = chainID
		r.systemChannel = chain
		defer chain.start() // 啟動共識過程
	else // 如果是應用賬本
		chain := newChainSupport(r, ledgerResources, consenters, signer)
		r.chains[chainID] = chain
		chain.start()  // 啟動共識過程
	}

chain.start() 方法負責啟動共識過程。以 Kafka 共識插件為例,最終以協程方式調用到 orderer.consensus.kafka 包中的 startThread() 方法,將在後臺持續運行。

func (chain *chainImpl) Start() {
	go startThread(chain)
}

startThread() 方法將為指定的賬本結構配置共識服務,並將其啟動,核心代碼包括:

// 創建 Producer 結構
chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
// 發送 CONNECT 消息給 Kafka,如果失敗,則退出
sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel)

// 創建處理對應 Kafka topic 的 Consumer 結構
chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
// 配置從指定 partition 讀取消息的 PartitionConsumer 結構
chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)

// 從該鏈對應的 Kafka 分區不斷讀取消息,並進行處理過程
chain.processMessagesToBlocks() 

主要包括如下步驟:

  • 創建到 Kafka 集群的 Producer 結構並發送 CONNECT 消息;
  • 為對應的 topic 創建 Consumer 結構,並配置從指定分區讀取消息的 PartitionConsumer 結構;
  • 對鏈對應的 Kafka 分區中消息的進行循環處理。這部分更詳細內容可以參考 Orderer 節點對排序後消息的處理過程。

gRPC 服務啟動

初始化 gRPC 服務結構,完成綁定並啟動監聽。

// 初始化 gRPC 服務端結構
server := NewServer(manager, signer, &conf.Debug)

// 創建 gRPC 服務連接
grpcServer := initializeGrpcServer(conf)

// 綁定 gRPC 服務並啟動
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
grpcServer.Start()

其中,NewServer(manager, signer, &conf.Debug) 方法(位於 orderer.common.server 包)最為核心,將 gRPC 相關的服務結構進行初始化,並綁定到 gRPC 請求上。分別響應 Deliver() 和 Broadcast() 兩個 gRPC 調用。

// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug) ab.AtomicBroadcastServer {
	s := &server{
		dh:    deliver.NewHandlerImpl(deliverSupport{Registrar: r}),
		bh:    broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}),
		debug: debug,
	}
	return s
}

來源:https://github.com/yeasy/hyperledger_code_fabric/blob/master/process/orderer_start.md

Hyperledger Fabric Orderer節點啟動