1. 程式人生 > >06.Fabric原始碼解析–GRPC服務

06.Fabric原始碼解析–GRPC服務

Fabric原始碼解析6–GRPC服務

GRPC簡介

GRPC是由自谷歌開發的一項多語言開源的RPC技術,在fabric用於實現客戶端與伺服器端的遠端呼叫。比如chaincode,客戶定義了一項rpc服務並相應生成了客戶端程式碼和服務端程式碼,在此基礎上進行業務邏輯上的開發後,分別執行服務端程式碼和客戶端程式碼,實現客戶端呼叫伺服器端服務的目的。由於gprc相對來說還是很複雜的,所有還請自行學習。在gprc官網非常詳細的資料和例子。

極其粗線條的寫一下grpc的用法,為後文伏筆:

  1. XXX.proto檔案中定義一個rpc服務

    service Events {
    rpc Chat(stream SignedEvent) returns (stream Event) {}
    }

  2. 命令列使用protoc生成對應的XXX.pb.go原始碼

在XXX.pb.go中,Client API for Events service處為供客戶端使用的介面定義、介面例項、介面例項的初始化函式。Server API for Events service處為供服務端使用的介面定義,註冊函式。

如果其中某一端或同時兩端為流式RPC,在有流的一端,會專門為其流生成介面定義、介面例項。可以直接使用生成的例項,也可以自己實現介面,自定義例項。介面定義的主要方法就是Send和Recv。

protoc --go_out=plugins=grpc:. XXX.proto

編寫客戶端程式碼

//注意,node start,而其啟動的基本都是後臺服務端的服務,
//因此本文中不涉及客戶端的程式碼。

//填充grpc網路連結連線選項
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
//建立連線伺服器端的grpc連線物件
conn, err := grpc.Dial("0.0.0.0:7051", opts...)
defer conn.Close()
//使用連線物件做引數,利用XXX.pb.go中的初始化函式建立grpc客戶端物件
client := NewEventsClient(conn)
//呼叫服務
client.Chat(...)

編寫服務端程式碼

//定義一個監聽物件,即伺服器監聽的地址
lis,err := net.Listen("tcp",":7051")
//建立grpc伺服器選項並填充
var serverOpts []grpc.ServerOption
//建立標準的grpc伺服器物件
server = grpc.NewServer(serverOpts...)
//建立服務端物件,根據XXX.pb.go中生成的介面定義,自己實現服務端介面
type eventSever{...}
func (e *eventSever)Chat(...){...}
es := new(eventServer)
//使用XXX.pb.go中的註冊函式註冊服務,註冊到grpc伺服器物件
RegisterEventsServer(server,es)
//根據監聽物件啟動grpc伺服器物件
server.Serve(lis)

Fabric中的grpc服務介面和例項

在/fabric/core/comm/server.go中,定義了安全配置項,GPRCServer的介面、實現和初始化函式。預設情況下fabric中是不使用tls的。

TLS安全配置項

type SecureServerConfig struct {
    //Whether or not to use TLS for communication
    UseTLS bool
    //PEM-encoded X509 public key to be used by the server for TLS communication
    //在core.yaml中指定,讀取的tls目錄下server.cert檔案資料儲存於此
    ServerCertificate []byte
    //PEM-encoded private key to be used by the server for TLS communication
    //在core.yaml中指定,讀取的tls目錄下server.key檔案資料儲存於此
    ServerKey []byte
    //Set of PEM-encoded X509 certificate authorities to optionally send
    //as part of the server handshake
    //在core.yaml中指定,讀取的tls目錄下ca.crt檔案資料儲存於此
    ServerRootCAs [][]byte
    //Whether or not TLS client must present certificates for authentication
    RequireClientCert bool
    //Set of PEM-encoded X509 certificate authorities to use when verifying
    //client certificates
    ClientRootCAs [][]byte
}

GRPCServer介面

type GRPCServer interface {
    //返回GRPCServer監聽的地址
    Address() string
    //啟動下層的grpc.Server
    Start() error
    //停止下層的grpc.Server
    Stop()
    //返回GRPCServer例項物件
    Server() *grpc.Server
    //返回GRPCServer使用的網路監聽例項物件
    Listener() net.Listener
    //返回grpc.Server使用的Certificate
    ServerCertificate() tls.Certificate
    //標識GRPCServer例項是否使用TLS
    TLSEnabled() bool
    //增加PEM-encoded X509 certificate authorities到
    //用於驗證客戶端certificates的authorities列表
    AppendClientRootCAs(clientRoots [][]byte) error
    //用於驗證客戶端certificates的authorities列表中
    //刪除PEM-encoded X509 certificate authorities
    RemoveClientRootCAs(clientRoots [][]byte) error
    //基於一個PEM-encoded X509 certificate authorities列表
    //設定用於驗證客戶端certificates的authorities列表
    SetClientRootCAs(clientRoots [][]byte) error
}

GRPCServer實現例項

type grpcServerImpl struct {
    //server指定的監聽地址,地址格式:hostname:port
    address string
    //監聽address的監聽物件,用於處理網路請求
    listener net.Listener
    //標準的grpc伺服器,通過此物件進行各種grpc服務操作
    server *grpc.Server
    //Certificate presented by the server for TLS communication
    serverCertificate tls.Certificate
    //Key used by the server for TLS communication
    serverKeyPEM []byte
    //List of certificate authorities to optionally pass to the client during
    //the TLS handshake
    serverRootCAs []tls.Certificate
    //lock to protect concurrent access to append / remove
    lock *sync.Mutex
    //Set of PEM-encoded X509 certificate authorities used to populate
    //the tlsConfig.ClientCAs indexed by subject
    clientRootCAs map[string]*x509.Certificate
    //TLS configuration used by the grpc server
    tlsConfig *tls.Config
    //Is TLS enabled?
    tlsEnabled bool
}

同文件中的NewGRPCServerFromListener函式是grpcServerImpl的初始化函式,其中tls相關程式碼使用到了crypto下的tls、x509工具庫。

peer node start啟動的grpc服務

在start.go中serve函式中,建立的GRPCServer服務物件有兩個:

peerServer
globalEventsServer

peer伺服器peerServer,在/fabric/core/peer/peer.go中定義。事件伺服器globalEventsServer,是一個全域性單例,在/fabric/events/producer/producer.go中定義。

peerServer

建立peerServer
追溯serve函式中peerServer物件的建立,程式碼最終都使用了/fabric/core/comm/server.go中的NewGRPCServerFromListener函式建立了一個grpcServerImpl例項物件,物件中的。

//在serve函式中

peerServer, err := peer.CreatePeerServer(listenAddr, secureConfig)

//在CreatePeerServer函式中

peerServer, err = comm.NewGRPCServer(listenAddress, secureConfig)

//在NewGRPCServer函式中

lis, err := net.Listen("tcp", address)
NewGRPCServerFromListener(lis, secureConfig)

//在NewGRPCServerFromListener函式中,最終建立grpc標準伺服器並返回grpcServerImpl

grpcServer.server = grpc.NewServer(serverOpts...)
return grpcServer

註冊服務

註冊ChaincodeSupport服務

這是我們第一次遇到ChaincodeSupport這個物件,分為ChaincodeSupport服務原型和對應定義的ChaincodeSupport物件。從名稱上就可以看出,是對fabric的chaincode提供一系列。自然,peer的grpc服務中關於chaincode的操作需要這種支援服務。而ChaincodeSupport物件本身比較複雜,在/fabric/core/chaincode/chaincode_support.go中定義,提供了一干配置值成員和chaincode的執行環境,也實現了很多介面,如該處提到的服務所需的Register函式。因在此側重與grpc服務,因此不展開詳述。

服務原型定義:

ChaincodeSupport服務原型在/fabric/protos/peer/chaincode_shim.proto中定義,相應生成chaincode_shim.pb.go原始碼,在此只展示其中生成的服務端定義。

//服務原型

service ChaincodeSupport { 
rpc Register(stream ChaincodeMessage) returns (stream ChaincodeMessage) {} 
} 

//生成服務端的介面和註冊函式

type ChaincodeSupportServer interface { 
Register(ChaincodeSupport_RegisterServer) error 
} 
func RegisterChaincodeSupportServer(s *grpc.Server, srv ChaincodeSupportServer){ 
s.RegisterService(&_ChaincodeSupport_serviceDesc, srv) 
} 

//生成的服務端流的介面定義、介面例項

type ChaincodeSupport_RegisterServer interface { 
Send(*ChaincodeMessage) error 
Recv() (*ChaincodeMessage, error) 
grpc.ServerStream 
}//介面 

type chaincodeSupportRegisterServer struct { 
grpc.ServerStream 

}

註冊服務

在serve函式中使用registerChaincodeSupport(peerServer.Server())完成註冊。在該函式中:

//建立了一個ChaincodeSupport物件,基本都是讀取配置值填充成員
//ChaincodeSupport物件實現了生成的服務端介面ChaincodeSupportServer中的Register方法

ccSrv := chaincode.NewChaincodeSupport(...) 

//利用生成的註冊函式,完成註冊

pb.RegisterChaincodeSupportServer(grpcServer, ccSrv)

在registerChaincodeSupport中還有一句scc.RegisterSysCCs()實現了系統鏈的註冊,將在系統鏈主題文章中進行詳述。

Register實現

將在ChaincodeSupport主題文章中詳述。

註冊的其他服務

關於peerServer所註冊的服務,還有AdminServer、EndorserServer、GossipService,註冊的方式和註冊ChaincodeSupport服務一樣,畢竟都是用的gprc,還是萬變不離其宗的。各個服務到底負責什麼,會做什麼,將會在相應主題文章中詳述。

globalEventsServer
//在/fabric/events/producer/producer.go中定義
//全域性單例

var globalEventsServer *EventsServer

//定義和Chat實現

type EventsServer struct {
}
func (p *EventsServer) Chat(stream pb.Events_ChatServer) error {...}

//初始化函式

func NewEventsServer(bufferSize uint, timeout int) *EventsServer {...}

事件伺服器這個全域性單例沒有任何成員,只有一個專用初始化函式NewEventsServer,一個Chat實現。這一切看上去非常簡單,只是在專用初始化函式中的一句initializeEvents(bufferSize, timeout)又牽扯出了一段文字。原來globalEventsServer自己只是一個事件伺服器的代表,實際做事情的是initializeEvents(bufferSize, timeout)初始化並執行的eventProcessor物件,下文細說。

在serve函式中,使用ehubGrpcServer, err := createEventHubServer(secureConfig)完成了對事件伺服器的建立和註冊,ehubGrpcServer承接的就是globalEventsServer這個全域性單例。

建立globalEventsServer

//在createEventHubServer中
//建立grpcServerImpl物件,其中包含了grpc標準伺服器

lis, err = net.Listen("tcp", viper.GetString("peer.events.address"))
grpcServer, err := comm.NewGRPCServerFromListener(lis, secureConfig)

//建立事件伺服器,NewEventsServer返回的就是globalEventsServer

ehServer := producer.NewEventsServer(
        uint(viper.GetInt("peer.events.buffersize")),
        viper.GetInt("peer.events.timeout"))

註冊事件服務

事件服務原型定義:

事件服務原型在/fabric/protos/peer/events.proto中定義,相應生成events.pb.go原始碼,在此只展示其中生成的服務端定義。

//服務原型

service Events { 
rpc Chat(stream SignedEvent) returns (stream Event) {} 
} 

//生成服務端的介面和註冊函式

type EventsServer interface { 
Chat(Events_ChatServer) error 
} 
func RegisterEventsServer(s *grpc.Server, srv EventsServer) { 
s.RegisterService(&_Events_serviceDesc, srv) 
} 

//生成的服務端流的介面定義、介面例項

type Events_ChatServer interface { 
Send(*Event) error 
Recv() (*SignedEvent, error) 
grpc.ServerStream 
}//介面 
type eventsChatServer struct { 
grpc.ServerStream 
}//介面例項

註冊服務

//還是在createEventHubServer中
//ehServer物件實現了生成的服務端介面EventsServer的Chat方法
//利用生成的註冊函式,完成註冊
pb.RegisterEventsServer(grpcServer.Server(), ehServer)

###Chat實現

Chat的操作很清晰,迴圈的接收資料然後處理資料,即處理客戶端的Chat呼叫,這也自然而然的是grpc服務端所要做的。

handler, err := newEventHandler(stream),根據服務端流介面stream建立一個handler,用於處理接收的客戶端傳送的SignedEvent型別資料。handler於後文詳述。
in, err := stream.Recv(),接收SignedEvent型別的資料,這也是gprc雙向流的標準用法。
err = handler.HandleMessage(in),使用handler處理資料,HandleMessage是實際的資料處理函式。
在HandleMessage函式中,客戶端傳送簽名過的SignedEvent型別資料,檢查有效性後,若是註冊或登出事件,則註冊或登出,並返回Event型別資料;若是其他型別的事件,則列印一條錯誤訊息後返回。

evt, err := validateEventMessage(msg),利用local MSP驗證資料的有效性。關於local MSP將在對應主題文章中詳述。
switch evt.Event.(type) {…},判斷事件型別,並對註冊事件或登出事件進行註冊或登出。
if err := d.ChatStream.Send(evt);err != nil{…},若是註冊事件或登出事件,執行相應操作之後返回Event資料給客戶端,該資料是在驗證函式validateEventMessage中獲取的。
啟動事件服務
在serve函式中靠後的地方,if ehubGrpcServer != nil {go ehubGrpcServer.Start()}將該服務的gprc服務端啟動起來了。Start內部呼叫了grpc伺服器啟動的標準函式server.Serve(lis)。

事件實際處理者eventProcessor

事件處理者也是一個全域性單例,接收不同型別的事件進行處理。在/fabric/events/events.go中定義。

按照事件型別分類的事件處理鏈條,處理鏈handlerList介面有兩種具體實現:一般處理鏈genericHandlerList和chaincode專用處理鏈chaincodeHandlerList。在此以一般處理鏈為例,其實現了對handlers三個操作:

1. add
2. del
3. foreach

其中遍歷操作foreach則對handlers中的每個handler執行了由引數指定的動作。這裡的handlers映射了handler與bool值,bool值應該是起到類似於開關的作用。

handler在/fabric/events/handle.go中定義,其成員ChatStream是一個events.pb.go中生成的Events_ChatServer型別的gprc流介面,用於傳送流資料。handler掛載了一系列操作函式,如register,HandleMessage,SendMessage,Stop。

eventChannel

帶快取且專門處理Event型別資料的事件頻道,所有的事件都是通過此頻道分發出去的。快取大小由core.yaml定義為100,由initializeEvents的引數帶進來並設定。Event型別在由events.proto中定義並對應生成events.pb.go中定義。

timeout

頻道eventChannel若滿時等待的時間,在core.yaml中設定並有釋義。

我們將從其初始化函式,也就是上文提到的initializeEvents入手,分析事件處理者eventProcessor。

事件型別

initializeEvents的前兩句很容易理解,if gEventProcessor != nil{…}只為保證gEventProcessor的單例性質,gEventProcessor = &eventProcessor{…}則為gEventProcessor建立了物件例項,分配了記憶體空間。而addInternalEventTypes(),實質4次呼叫了AddEventType,則為新增內部事件型別並相應的分配了這些型別各自的處理鏈handlerList。

新增的已知的事件型別由/fabric/protos/peer/events.proto中定義,對應生成的events.pb.go中的四種:

EventType_BLOCK - 塊事件,對應genericHandlerList
EventType_CHAINCODE - chaincode事件,對應chaincodeHandlerList
EventType_REGISTER - addInternalEventTypes中有但是AddEventType未做處理
EventType_REJECTION - 拒絕事件,對應genericHandlerList

start函式

initializeEvents最後一句就是go gEventProcessor.start(),就是另起一個goroutine執行全域性單例gEventProcessor的start函式。start函式是一個死迴圈,不斷從eventChannel中接收資料Event型別資料並處理。過程如下:

e := <-ep.eventChannel,獲取一個事件e。
eType := getMessageType(e),獲取事件e的型別eType。
if hl, _ = ep.eventConsumers[eType]; hl == nil {...}

根據eType獲取該事件型別的處理鏈hl,同時判斷該型別是否存在,若不存在則會被忽略本次事件而continue繼續處理下一個事件。
**hl.foreach(…),**呼叫hl的foreach函式,foreach遍歷了hl.handlers中的每個handler,並對每個handler執行第二個引數指定的動作。該動作為func(h *handler){if e.Event !=nil{h.SendMessage(e)}},即呼叫每個handler的SendMessage傳送事件e。SendMessage則是使用handler中自有的ChatStream這個生成的grpc流服務介面去傳送事件e:err := d.ChatStream.Send(msg)。