07.Fabric原始碼解析——peer的ChaincodeSupport服務
Fabric原始碼解析7——peer的ChaincodeSupport服務
Fabirc原始碼解析6中講述了peer結點如何建立和註冊grpc服務,接下來的幾篇文章將對peer註冊的各個服務進行詳述。該篇講述ChaincodeSupport服務,ChaincodeSupport服務為每個peer提供了chaincode操作的支援。registerChaincodeSupport(peerServer.Server())一句,位於/fabric/peer/node/start.go檔案中的serve函式中,給peerServer註冊了ChaincodeSupport服務。
ChaincodeSupport的服務原型
ChaincodeSupport的服務原型和生成的go定義在/fabric/protos/peer/下的chaincode_shim.proto和chaincode_shim.pb.go中,核心的實現程式碼在/fabric/core/chaincode/chaincode_support.go中。主要的定義的是一個rpc Register(stream ChaincodeMessage) returns (stream ChaincodeMessage){}服務。該服務實現客戶端和伺服器端ChaincodeMessage型別流資料的交換。用於服務端流資料交換的grpc流服務介面象為/fabric/protos/peer/chaincode_shim.pb.go中的ChaincodeSupport_RegisterServer,在/fabric/core/container/ccintf/ccintf.go中有對應用於容器內部間的流介面ChaincodeStream。
ChaincodeSupport的服務是一個全域性單例,該單例物件定義在chaincode_support.go中,var theChaincodeSupport *ChaincodeSupport。ChaincodeSupport物件自身儲存一系列配置值,而接收和處理客戶端ChaincodeMessage型別訊息的任務其實是委託給了一個個Handler物件。
//生成的收發的資料型別
type ChaincodeMessage struct { Type ChaincodeMessage_Type Timestamp *google_protobuf1.Timestamp Payload []byte Txid string Proposal *SignedProposal ChaincodeEvent *ChaincodeEvent }
//proto中ChaincodeSupport服務原型
service ChaincodeSupport {
rpc Register(stream ChaincodeMessage) returns (stream ChaincodeMessage) {}
}
//生成的服務端流介面
type ChaincodeSupport_RegisterServer interface {
Send(*ChaincodeMessage) error
Recv() (*ChaincodeMessage, error)
grpc.ClientStream
}
FSM
FSM是finite state machine的縮寫,有限狀態機,是ChaincodeSupport服務使用到的一個第三方庫,在github.com/looplab/fsm可以下載。FSM將一個事物從狀態A向狀態B的轉化看作一個事件,並可以設定在進入/離開某個狀態時自動呼叫的時機函式。每個狀態事件、狀態、時機函式都用字串關鍵字表示。在此簡單介紹一下用法:
//建立一個狀態機
//三個引數:1.預設狀態 2.定義狀態事件 3.定義狀態轉變時呼叫的函式
fsm := fsm.NewFSM(
"green",
fsm.Events{
//狀態事件的名稱 該事件的起始狀態Src 該事件的結束狀態Dst
//即:狀態事件warn(警告事件)表示事物的狀態從狀態green到狀態yellow
{Name: "warn", Src: []string{"green"}, Dst: "yellow"},
{Name: "panic", Src: []string{"yellow"}, Dst: "red"},
{Name: "calm", Src: []string{"red"}, Dst: "yellow"},
},
//狀態事件呼叫函式,在此稱為 時機函式。關鍵字用'_'隔開,格式是:"呼叫時機_事件或狀態"
//before表示在該事件或狀態發生之前呼叫該函式,如"before_warn",表示在warn
//這個狀態事件發生前呼叫這個函式。"before_yellow"表示進入yellow狀態之前呼叫
//該函式。
//依此類推,after表示在...之後,enter表示在進入...之時,leave表示在離開...
//之時。
fsm.Callbacks{
//fsm內定義的狀態事件函式,關鍵字指定的是XXX_event和XXX_state
//表示任一的狀態或狀態事件
"before_event": func(e *fsm.Event) {
fmt.Println("before_event")
},
"leave_state": func(e *fsm.Event) {
fmt.Println("leave_state")
},
//根據自定義狀態或事件所定義的狀態事件函式
"before_yellow": func(e *fsm.Event) {
fmt.Println("before_yellow")
},
"before_warn": func(e *fsm.Event) {
fmt.Println("before_warn")
},
},
)
//列印當前狀態,輸出是預設狀態green
fmt.Println(fsm.Current())
//觸發warn狀態事件,狀態將會從green轉變到yellow
//同時觸發"before_warn"、“before_yellow”、“before_event”、"leave_state"函式
fsm.Event("warn")
//列印當前狀態,輸出狀態是yellow
fmt.Println(fsm.Current())
任何專案中,服務是以所能提供的操作為中心的,ChaincodeSupport服務的操作(即可被外部呼叫的函式)有Launch,Register,Execute,HandleChaincodeStream,Stop。
Register
追溯ChaincodeSupport物件掛載的Register函式,最終呼叫的是/fabric/core/chaincode/handler.go中的HandleChaincodeStream函式。在HandleChaincodeStream函式中:
handler := newChaincodeSupportHandler(chaincodeSupport, stream)
handler.processStream()
建立了一個Handler,然後呼叫Handler的processStream函式對客戶端傳送的流資料進行了處理。這兩個函式都在同文件中實現。newChaincodeSupportHandler函式所傳入的兩個引數值得注意,一個是chaincodeSupport,一個是stream。前者是Register服務所在的ChaincodeSupport物件自身,賦值給了Hanlder物件成員chaincodeSupport,為的是讓Handler物件處理接收資料時能夠使用ChaincodeSupport物件的服務;後者是Register服務的grpc流介面,賦值給了Handler物件成員ChatStream,為的是Handler能夠從客戶端接收到資料。後文還會提到這點。
Handler
newChaincodeSupportHandler建立並初始化了一個Handler,初始化的成員有:
- ChatStream - grpc流服務介面,是用Register函式傳進來的。
- chaincodeSupport - chaincodeSupport自身。
- nextState - 狀態通道。
- FSM - 狀態機,參看上文。
- policyChecker - 策略檢查器,將在對應主題文章中詳述。
processStream
processStream用recv標識、| errc | msgAvil | nextState | keepalivetimer |四個頻道、select三者相互配合,形成了對客戶端訊息的接收控制。然後呼叫HandleMessage、serialSend、serialSendAsync處理接收到的訊息。
- errc - 錯誤頻道
- msgAvil - ChaincodeMessage頻道
- nextState - 包含ChaincodeMessage的頻道
- keepalivetime - 心跳頻道
HandleMessage
HandleMessage處理ChaincodeMessage資料的方式完全是由Handler中的狀態機FSM驅動的。在newChaincodeSupportHandler有大段程式碼是初始化其狀態機的:
v.FSM = fsm.NewFSM(createdstate,fsm.Events{...},fsm.Callbacks{...})
狀態機FSM所註冊的狀態事件有:
///fabric/protos/peer/chaincode_shim.pb.go中定義
//REGISTER即pb.ChaincodeMessage_REGISTER.String()對應的字串值,下同
//REGISTER事件表示從狀態createdstate到狀態establishedstate,下略。
REGISTER Src: []string{createdstate}, Dst: establishedstate
READY
PUT_STATE
DEL_STATE
INVOKE_CHAI
COMPLETED
GET_STATE
GET_STATE_B
GET_QUERY_R
GET_HISTORY
QUERY_STATE
QUERY_STATE
ERROR
RESPONSE
INIT
TRANSACTION
RESPONSE
INIT
TRANSACTION
狀態機FSM所涉及的事件狀態有:
//在/fabric/core/chaincode/handler.go中以常量的形式定義
createdstate = "created"
establishedstate = "established"
readystate = "ready"
endstate = "end"
狀態機FSM狀態事件所呼叫的時機函式為:
//在REGISTER事件發生之前呼叫beforeRegisterEvent,下同。
"before_REGISTER" : beforeRegisterEvent
"before_COMPLETED" : beforeCompletedEvent
"after_GET_STATE" : afterGetState
"after_GET_STATE_BY_RANGE" : afterGetStateByRange
"after_GET_QUERY_RESULT" : afterGetQueryResult
"after_GET_HISTORY_FOR_KEY" : afterGetHistoryForKey
"after_QUERY_STATE_NEXT" : afterQueryStateNext
"after_QUERY_STATE_CLOSE" : afterQueryStateClose
"after_PUT_STATE" : enterBusyState
"after_DEL_STATE" : enterBusyState
"after_INVOKE_CHAINCODE" : enterBusyState
//表示在進入established狀態之時呼叫enterEstablishedState,下同。
"enter_established" : enterEstablishedState
"enter_ready" : enterReadyState
"enter_end" : enterEndState
在HandleMessage函式中,對傳入的資料msg簡單驗證後,eventErr := handler.FSM.Event(msg.Type.String(), msg)觸發了狀態機的狀態事件,進而觸發了對應的時機函式。以“REGISTER型別的ChaincodeMessage”為例。客戶端通過grpc傳送REGISTER型別的ChaincodeMessage資訊,服務端通過msgAvil頻道接收後傳入HandlerMessage函式,狀態機對應執行REGISTER狀態事件,從狀態createdstate向狀態establishedstate轉變,同時在轉變之前自動觸發beforeRegisterEvent時機函式完成註冊。當狀態進入establishedstate時,又接著觸發了“enter_established”所對應的enterEstablishedState時機函式去通知客戶端註冊已經正確完成。
在beforeRegisterEvent函式中,err = handler.chaincodeSupport.registerHandler(handler)完成了註冊,使用的是前文所提到的在建立Handler時傳入進來的ChaincodeSupport物件的registerHandler函式。所謂的註冊,也不過是將Handler物件賦值給ChaincodeSupport物件中的runningChaincodes中的chaincodeMap對映:chainID作key,以Handler物件為成員handler值的chaincodeRTEnv物件作value。
serialSend或serialSendAsync
都是使用Handler中grpc服務端流介面ChatStream成員傳送ChaincodeMessage訊息的函式,兩者都將應答ChaincodeMessage資訊傳送給客戶端,也都實現了將所傳送的ChaincodeMessage資訊序列化的目的。區別在於serialSend是阻塞傳送,而serialSendAsync是利用新啟goroutine進行非阻塞傳送,且這些非阻塞的goroutine中任何一個發生傳送訊息的錯誤,都會利用errc頻道將錯誤傳送給processStream函式。
小結
不同型別的ChaincodeMessage的訊息,能夠觸發狀態機不同的狀態事件,處理資料,完成Chaincode上的操作。有關其他型別事件以及具體的實現,在此不再贅述。強調一句,ChaincodeSupport服務是以狀態機驅動的為chaincode提供支援的一項服務。