Fabric原始碼分析-共識模組
正好這些天要有一個需求要幫客戶魔改Fabric-v0.6,把一些hyperchain的高階特性移植過去,藉此機會把之前看過的原始碼在梳理一下。
下面就是對Fabric共識模組的原始碼分析和梳理,程式碼都是以Fabric-v0.6-preview為例,在1.0及後續版本中都移除了PBFT部分,用了更好的SBFT,目前這一部分還在開發中。
目錄結構
可以看到共識模組目錄如下。
consensus
├── controller
├── executor
├── helper
│ └── persist
├── noops
├── pbft
└── util
└── events
目錄含義如下
controller
noops
。executor
封裝了訊息佇列中對交易的處理。helper
對外提供介面呼叫和資料持久化介面。noops
提供瞭如何編寫Fabric共識演算法的Demo。pbft
PBFT演算法的具體實現。util
實現了一個peer節點到共識演算法的一個訊息通道,和一個訊息佇列。
流程概覽
Fabric網路通過一個EventLoop
和共識演算法進行互動,所有的操作都通過對事件迴圈中的事件監聽進行推進。
整體流程如下圖所示。
Consensus模組介面
fabric/consensus/consensus.go
對外提供共識模組的方法呼叫。
其中最核心也是每個演算法必須實現的介面是Consenter
type ExecutionConsumer interface {
Executed(tag interface{})
Committed(tag interface{}, target *pb.BlockchainInfo)
RolledBack(tag interface{})
StateUpdated(tag interface{}, target *pb.BlockchainInfo)
}
type Consenter interface {
RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error
ExecutionConsumer
}
介面的具體實現在fabric/consensus/pbft/external.go
。
因為對交易的操作都是非同步的,所以必須手動實現Executed
,Committed
,RolledBack
,StateUpdated
方法來監聽對應動作的完成。
RecvMsg
方法用來從不用的peer
節點接收訊息。
初始化共識模組
共識演算法引擎在peer
啟動的時候初始化,初始化的具體函式如下所示。
// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
var err error
engineOnce.Do(func() {
engine = new(EngineImpl)
engine.helper = NewHelper(coord)
engine.consenter = controller.NewConsenter(engine.helper)
engine.helper.setConsenter(engine.consenter)
engine.peerEndpoint, err = coord.GetPeerEndpoint()
engine.consensusFan = util.NewMessageFan()
go func() {
logger.Debug("Starting up message thread for consenter")
for msg := range engine.consensusFan.GetOutChannel() {
engine.consenter.RecvMsg(msg.Msg, msg.Sender)
}
}()
})
return engine, err
}
GetEngine
的作用是進行共識模組的初始化,同時啟動一個goroutine
等待訊息進入。
具體的engine.consenter
是在consensus/controller/controller.go
裡選擇。
// consensus/controller/controller.go
func NewConsenter(stack consensus.Stack) consensus.Consenter {
plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin"))
if plugin == "pbft" {
logger.Infof("Creating consensus plugin %s", plugin)
return pbft.GetPlugin(stack)
}
logger.Info("Creating default consensus plugin (noops)")
return noops.GetNoops(stack)
}
預設選擇的是noops
,如果需要新增自己編寫的共識模組需要在這裡自行新增判斷。
noops
只是演示如何編寫Fabric共識模組,不要用在生產環境。
如果選擇了PBFT
則會呼叫consensus/pbft/pbft.go
進行初始化。
使用PBFT
的batch
模式啟動時會呼叫newObcBatch
進行PBFT
演算法初始化。
PBFT只有batch
一種模式。
// consensus/pbft/batch.go
func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {
var err error
...
op.manager = events.NewManagerImpl()
op.manager.SetReceiver(op)
etf := events.NewTimerFactoryImpl(op.manager)
op.pbft = newPbftCore(id, config, op, etf)
op.manager.Start()
blockchainInfoBlob := stack.GetBlockchainInfoBlob()
op.externalEventReceiver.manager = op.manager
...
return op
}
newObcBatch
主要做了這幾項工作
- 初始化了
eventLoop
的訊息佇列。 - 設定了訊息的接收者,用來處理對應的訊息。
- 建立監聽訊息超時的定時器。
- 初始化
pbft
演算法。 - 啟動訊息佇列,不斷監聽事件的到來並且分發給接收者處理。
訊息處理
Fabric的共識訊息是通過eventLoop
注射給對應處理函式的。
// consensus/util/events/events.go
func SendEvent(receiver Receiver, event Event) {
next := event
for {
next = receiver.ProcessEvent(next)
if next == nil {
break
}
}
}
func (em *managerImpl) Inject(event Event) {
if em.receiver != nil {
SendEvent(em.receiver, event)
}
}
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
case <-em.exit:
logger.Debug("eventLoop told to exit")
return
}
}
}
eventLoop
函式不斷的從em.events
裡取出事件,通過Inject
注射給對應的接收者,注意,通過SendEvent
注射給接收者的ProcessEvent
方法。
SendEvent
函式實現非常有意思,如果receiver.ProcessEvent
的返回不為nil
則不斷的呼叫receiver.ProcessEvent
直到找到對應的訊息處理函式,在ProcessEvent
函式中,其餘case
均為事件處理函式,唯獨pbftMessage
依賴SendEvent
傳送訊息給其餘函式處理。
// consensus/pbft/pbft-core.go
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
...
case *pbftMessage:
return pbftMessageEvent(*et)
case pbftMessageEvent:
msg := et
logger.Debugf("Replica %d received incoming message from %v", instance.id, msg.sender)
next, err := instance.recvMsg(msg.msg, msg.sender)
if err != nil {
break
}
return next
case *RequestBatch:
err = instance.recvRequestBatch(et)
case *PrePrepare:
err = instance.recvPrePrepare(et)
...
}
可以看到*pbftMessage
和pbftMessageEvent
這兩個case
通過recvMsg
的返回值又把訊息分發給其餘case
,非常巧妙。
PBFT
演算法的不同階段都會按著上面的流程對映到不同的處理函式往前推進,本質上是一個狀態機。
至此Fabric的Consensus
模組主要流程已經梳理清楚,熟悉了這個流程以後再結合PBFT
演算法的過程就可以很容易在此基礎上新增新的功能了。
https://zhuanlan.zhihu.com/p/35255567