06.Fabric原始碼解析–GRPC服務
Fabric原始碼解析6–GRPC服務
GRPC簡介
GRPC是由自谷歌開發的一項多語言開源的RPC技術,在fabric用於實現客戶端與伺服器端的遠端呼叫。比如chaincode,客戶定義了一項rpc服務並相應生成了客戶端程式碼和服務端程式碼,在此基礎上進行業務邏輯上的開發後,分別執行服務端程式碼和客戶端程式碼,實現客戶端呼叫伺服器端服務的目的。由於gprc相對來說還是很複雜的,所有還請自行學習。在gprc官網非常詳細的資料和例子。
極其粗線條的寫一下grpc的用法,為後文伏筆:
-
XXX.proto檔案中定義一個rpc服務
service Events {
rpc Chat(stream SignedEvent) returns (stream Event) {}
} -
命令列使用protoc生成對應的XXX.pb.go原始碼
在XXX.pb.go中,Client API for Events service處為供客戶端使用的介面定義、介面例項、介面例項的初始化函式。Server API for Events service處為供服務端使用的介面定義,註冊函式。
如果其中某一端或同時兩端為流式RPC,在有流的一端,會專門為其流生成介面定義、介面例項。可以直接使用生成的例項,也可以自己實現介面,自定義例項。介面定義的主要方法就是Send和Recv。
protoc --go_out=plugins=grpc:. XXX.proto
編寫客戶端程式碼
//注意,node start,而其啟動的基本都是後臺服務端的服務,
//因此本文中不涉及客戶端的程式碼。
//填充grpc網路連結連線選項 var opts []grpc.DialOption opts = append(opts, grpc.WithInsecure()) //建立連線伺服器端的grpc連線物件 conn, err := grpc.Dial("0.0.0.0:7051", opts...) defer conn.Close() //使用連線物件做引數,利用XXX.pb.go中的初始化函式建立grpc客戶端物件 client := NewEventsClient(conn) //呼叫服務 client.Chat(...)
編寫服務端程式碼
//定義一個監聽物件,即伺服器監聽的地址
lis,err := net.Listen("tcp",":7051")
//建立grpc伺服器選項並填充
var serverOpts []grpc.ServerOption
//建立標準的grpc伺服器物件
server = grpc.NewServer(serverOpts...)
//建立服務端物件,根據XXX.pb.go中生成的介面定義,自己實現服務端介面
type eventSever{...}
func (e *eventSever)Chat(...){...}
es := new(eventServer)
//使用XXX.pb.go中的註冊函式註冊服務,註冊到grpc伺服器物件
RegisterEventsServer(server,es)
//根據監聽物件啟動grpc伺服器物件
server.Serve(lis)
Fabric中的grpc服務介面和例項
在/fabric/core/comm/server.go中,定義了安全配置項,GPRCServer的介面、實現和初始化函式。預設情況下fabric中是不使用tls的。
TLS安全配置項
type SecureServerConfig struct {
//Whether or not to use TLS for communication
UseTLS bool
//PEM-encoded X509 public key to be used by the server for TLS communication
//在core.yaml中指定,讀取的tls目錄下server.cert檔案資料儲存於此
ServerCertificate []byte
//PEM-encoded private key to be used by the server for TLS communication
//在core.yaml中指定,讀取的tls目錄下server.key檔案資料儲存於此
ServerKey []byte
//Set of PEM-encoded X509 certificate authorities to optionally send
//as part of the server handshake
//在core.yaml中指定,讀取的tls目錄下ca.crt檔案資料儲存於此
ServerRootCAs [][]byte
//Whether or not TLS client must present certificates for authentication
RequireClientCert bool
//Set of PEM-encoded X509 certificate authorities to use when verifying
//client certificates
ClientRootCAs [][]byte
}
GRPCServer介面
type GRPCServer interface {
//返回GRPCServer監聽的地址
Address() string
//啟動下層的grpc.Server
Start() error
//停止下層的grpc.Server
Stop()
//返回GRPCServer例項物件
Server() *grpc.Server
//返回GRPCServer使用的網路監聽例項物件
Listener() net.Listener
//返回grpc.Server使用的Certificate
ServerCertificate() tls.Certificate
//標識GRPCServer例項是否使用TLS
TLSEnabled() bool
//增加PEM-encoded X509 certificate authorities到
//用於驗證客戶端certificates的authorities列表
AppendClientRootCAs(clientRoots [][]byte) error
//用於驗證客戶端certificates的authorities列表中
//刪除PEM-encoded X509 certificate authorities
RemoveClientRootCAs(clientRoots [][]byte) error
//基於一個PEM-encoded X509 certificate authorities列表
//設定用於驗證客戶端certificates的authorities列表
SetClientRootCAs(clientRoots [][]byte) error
}
GRPCServer實現例項
type grpcServerImpl struct {
//server指定的監聽地址,地址格式:hostname:port
address string
//監聽address的監聽物件,用於處理網路請求
listener net.Listener
//標準的grpc伺服器,通過此物件進行各種grpc服務操作
server *grpc.Server
//Certificate presented by the server for TLS communication
serverCertificate tls.Certificate
//Key used by the server for TLS communication
serverKeyPEM []byte
//List of certificate authorities to optionally pass to the client during
//the TLS handshake
serverRootCAs []tls.Certificate
//lock to protect concurrent access to append / remove
lock *sync.Mutex
//Set of PEM-encoded X509 certificate authorities used to populate
//the tlsConfig.ClientCAs indexed by subject
clientRootCAs map[string]*x509.Certificate
//TLS configuration used by the grpc server
tlsConfig *tls.Config
//Is TLS enabled?
tlsEnabled bool
}
同文件中的NewGRPCServerFromListener函式是grpcServerImpl的初始化函式,其中tls相關程式碼使用到了crypto下的tls、x509工具庫。
peer node start啟動的grpc服務
在start.go中serve函式中,建立的GRPCServer服務物件有兩個:
peerServer
globalEventsServer
peer伺服器peerServer,在/fabric/core/peer/peer.go中定義。事件伺服器globalEventsServer,是一個全域性單例,在/fabric/events/producer/producer.go中定義。
peerServer
建立peerServer
追溯serve函式中peerServer物件的建立,程式碼最終都使用了/fabric/core/comm/server.go中的NewGRPCServerFromListener函式建立了一個grpcServerImpl例項物件,物件中的。
//在serve函式中
peerServer, err := peer.CreatePeerServer(listenAddr, secureConfig)
//在CreatePeerServer函式中
peerServer, err = comm.NewGRPCServer(listenAddress, secureConfig)
//在NewGRPCServer函式中
lis, err := net.Listen("tcp", address)
NewGRPCServerFromListener(lis, secureConfig)
//在NewGRPCServerFromListener函式中,最終建立grpc標準伺服器並返回grpcServerImpl
grpcServer.server = grpc.NewServer(serverOpts...)
return grpcServer
註冊服務
註冊ChaincodeSupport服務
這是我們第一次遇到ChaincodeSupport這個物件,分為ChaincodeSupport服務原型和對應定義的ChaincodeSupport物件。從名稱上就可以看出,是對fabric的chaincode提供一系列。自然,peer的grpc服務中關於chaincode的操作需要這種支援服務。而ChaincodeSupport物件本身比較複雜,在/fabric/core/chaincode/chaincode_support.go中定義,提供了一干配置值成員和chaincode的執行環境,也實現了很多介面,如該處提到的服務所需的Register函式。因在此側重與grpc服務,因此不展開詳述。
服務原型定義:
ChaincodeSupport服務原型在/fabric/protos/peer/chaincode_shim.proto中定義,相應生成chaincode_shim.pb.go原始碼,在此只展示其中生成的服務端定義。
//服務原型
service ChaincodeSupport {
rpc Register(stream ChaincodeMessage) returns (stream ChaincodeMessage) {}
}
//生成服務端的介面和註冊函式
type ChaincodeSupportServer interface {
Register(ChaincodeSupport_RegisterServer) error
}
func RegisterChaincodeSupportServer(s *grpc.Server, srv ChaincodeSupportServer){
s.RegisterService(&_ChaincodeSupport_serviceDesc, srv)
}
//生成的服務端流的介面定義、介面例項
type ChaincodeSupport_RegisterServer interface {
Send(*ChaincodeMessage) error
Recv() (*ChaincodeMessage, error)
grpc.ServerStream
}//介面
type chaincodeSupportRegisterServer struct {
grpc.ServerStream
}
註冊服務:
在serve函式中使用registerChaincodeSupport(peerServer.Server())完成註冊。在該函式中:
//建立了一個ChaincodeSupport物件,基本都是讀取配置值填充成員
//ChaincodeSupport物件實現了生成的服務端介面ChaincodeSupportServer中的Register方法
ccSrv := chaincode.NewChaincodeSupport(...)
//利用生成的註冊函式,完成註冊
pb.RegisterChaincodeSupportServer(grpcServer, ccSrv)
在registerChaincodeSupport中還有一句scc.RegisterSysCCs()實現了系統鏈的註冊,將在系統鏈主題文章中進行詳述。
Register實現:
將在ChaincodeSupport主題文章中詳述。
註冊的其他服務
關於peerServer所註冊的服務,還有AdminServer、EndorserServer、GossipService,註冊的方式和註冊ChaincodeSupport服務一樣,畢竟都是用的gprc,還是萬變不離其宗的。各個服務到底負責什麼,會做什麼,將會在相應主題文章中詳述。
globalEventsServer
//在/fabric/events/producer/producer.go中定義
//全域性單例
var globalEventsServer *EventsServer
//定義和Chat實現
type EventsServer struct {
}
func (p *EventsServer) Chat(stream pb.Events_ChatServer) error {...}
//初始化函式
func NewEventsServer(bufferSize uint, timeout int) *EventsServer {...}
事件伺服器這個全域性單例沒有任何成員,只有一個專用初始化函式NewEventsServer,一個Chat實現。這一切看上去非常簡單,只是在專用初始化函式中的一句initializeEvents(bufferSize, timeout)又牽扯出了一段文字。原來globalEventsServer自己只是一個事件伺服器的代表,實際做事情的是initializeEvents(bufferSize, timeout)初始化並執行的eventProcessor物件,下文細說。
在serve函式中,使用ehubGrpcServer, err := createEventHubServer(secureConfig)完成了對事件伺服器的建立和註冊,ehubGrpcServer承接的就是globalEventsServer這個全域性單例。
建立globalEventsServer
//在createEventHubServer中
//建立grpcServerImpl物件,其中包含了grpc標準伺服器
lis, err = net.Listen("tcp", viper.GetString("peer.events.address"))
grpcServer, err := comm.NewGRPCServerFromListener(lis, secureConfig)
//建立事件伺服器,NewEventsServer返回的就是globalEventsServer
ehServer := producer.NewEventsServer(
uint(viper.GetInt("peer.events.buffersize")),
viper.GetInt("peer.events.timeout"))
註冊事件服務
事件服務原型定義:
事件服務原型在/fabric/protos/peer/events.proto中定義,相應生成events.pb.go原始碼,在此只展示其中生成的服務端定義。
//服務原型
service Events {
rpc Chat(stream SignedEvent) returns (stream Event) {}
}
//生成服務端的介面和註冊函式
type EventsServer interface {
Chat(Events_ChatServer) error
}
func RegisterEventsServer(s *grpc.Server, srv EventsServer) {
s.RegisterService(&_Events_serviceDesc, srv)
}
//生成的服務端流的介面定義、介面例項
type Events_ChatServer interface {
Send(*Event) error
Recv() (*SignedEvent, error)
grpc.ServerStream
}//介面
type eventsChatServer struct {
grpc.ServerStream
}//介面例項
註冊服務:
//還是在createEventHubServer中
//ehServer物件實現了生成的服務端介面EventsServer的Chat方法
//利用生成的註冊函式,完成註冊
pb.RegisterEventsServer(grpcServer.Server(), ehServer)
###Chat實現:
Chat的操作很清晰,迴圈的接收資料然後處理資料,即處理客戶端的Chat呼叫,這也自然而然的是grpc服務端所要做的。
handler, err := newEventHandler(stream),根據服務端流介面stream建立一個handler,用於處理接收的客戶端傳送的SignedEvent型別資料。handler於後文詳述。
in, err := stream.Recv(),接收SignedEvent型別的資料,這也是gprc雙向流的標準用法。
err = handler.HandleMessage(in),使用handler處理資料,HandleMessage是實際的資料處理函式。
在HandleMessage函式中,客戶端傳送簽名過的SignedEvent型別資料,檢查有效性後,若是註冊或登出事件,則註冊或登出,並返回Event型別資料;若是其他型別的事件,則列印一條錯誤訊息後返回。
evt, err := validateEventMessage(msg),利用local MSP驗證資料的有效性。關於local MSP將在對應主題文章中詳述。
switch evt.Event.(type) {…},判斷事件型別,並對註冊事件或登出事件進行註冊或登出。
if err := d.ChatStream.Send(evt);err != nil{…},若是註冊事件或登出事件,執行相應操作之後返回Event資料給客戶端,該資料是在驗證函式validateEventMessage中獲取的。
啟動事件服務
在serve函式中靠後的地方,if ehubGrpcServer != nil {go ehubGrpcServer.Start()}將該服務的gprc服務端啟動起來了。Start內部呼叫了grpc伺服器啟動的標準函式server.Serve(lis)。
事件實際處理者eventProcessor
事件處理者也是一個全域性單例,接收不同型別的事件進行處理。在/fabric/events/events.go中定義。
按照事件型別分類的事件處理鏈條,處理鏈handlerList介面有兩種具體實現:一般處理鏈genericHandlerList和chaincode專用處理鏈chaincodeHandlerList。在此以一般處理鏈為例,其實現了對handlers三個操作:
1. add
2. del
3. foreach
其中遍歷操作foreach則對handlers中的每個handler執行了由引數指定的動作。這裡的handlers映射了handler與bool值,bool值應該是起到類似於開關的作用。
handler在/fabric/events/handle.go中定義,其成員ChatStream是一個events.pb.go中生成的Events_ChatServer型別的gprc流介面,用於傳送流資料。handler掛載了一系列操作函式,如register,HandleMessage,SendMessage,Stop。
eventChannel
帶快取且專門處理Event型別資料的事件頻道,所有的事件都是通過此頻道分發出去的。快取大小由core.yaml定義為100,由initializeEvents的引數帶進來並設定。Event型別在由events.proto中定義並對應生成events.pb.go中定義。
timeout
頻道eventChannel若滿時等待的時間,在core.yaml中設定並有釋義。
我們將從其初始化函式,也就是上文提到的initializeEvents入手,分析事件處理者eventProcessor。
事件型別
initializeEvents的前兩句很容易理解,if gEventProcessor != nil{…}只為保證gEventProcessor的單例性質,gEventProcessor = &eventProcessor{…}則為gEventProcessor建立了物件例項,分配了記憶體空間。而addInternalEventTypes(),實質4次呼叫了AddEventType,則為新增內部事件型別並相應的分配了這些型別各自的處理鏈handlerList。
新增的已知的事件型別由/fabric/protos/peer/events.proto中定義,對應生成的events.pb.go中的四種:
EventType_BLOCK - 塊事件,對應genericHandlerList
EventType_CHAINCODE - chaincode事件,對應chaincodeHandlerList
EventType_REGISTER - addInternalEventTypes中有但是AddEventType未做處理
EventType_REJECTION - 拒絕事件,對應genericHandlerList
start函式
initializeEvents最後一句就是go gEventProcessor.start(),就是另起一個goroutine執行全域性單例gEventProcessor的start函式。start函式是一個死迴圈,不斷從eventChannel中接收資料Event型別資料並處理。過程如下:
e := <-ep.eventChannel,獲取一個事件e。
eType := getMessageType(e),獲取事件e的型別eType。
if hl, _ = ep.eventConsumers[eType]; hl == nil {...}
根據eType獲取該事件型別的處理鏈hl,同時判斷該型別是否存在,若不存在則會被忽略本次事件而continue繼續處理下一個事件。
**hl.foreach(…),**呼叫hl的foreach函式,foreach遍歷了hl.handlers中的每個handler,並對每個handler執行第二個引數指定的動作。該動作為func(h *handler){if e.Event !=nil{h.SendMessage(e)}},即呼叫每個handler的SendMessage傳送事件e。SendMessage則是使用handler中自有的ChatStream這個生成的grpc流服務介面去傳送事件e:err := d.ChatStream.Send(msg)。