go 中的grpc的stream 使用
阿新 • • 發佈:2021-07-22
前言
之前我們講了 grpc 怎麼簡單的使用 ,這次講講 grpc 中的 stream,srteam 顧名思義 就是 一種 流,可以源源不斷的 推送 資料,很適合 傳輸一些大資料,或者 服務端 和 客戶端 長時間 資料互動,比如 客戶端 可以向 服務端 訂閱 一個數據,服務端 就 可以利用 stream ,源源不斷地 推送資料。stream的種類:
客戶端推送 服務端 rpc GetStream (StreamReqData) returns (stream StreamResData){} 服務端推送 客戶端 rpc PutStream (stream StreamReqData) returns (StreamResData){} 客戶端與 服務端 互相 推送 rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
其實這個流 已經 基本退化成 tcp了,grpc 底層為我們 分包了,所以真的很方便。
protobuf的定義:
syntax = "proto3";//宣告proto的版本 只能 是3,才支援 grpc //宣告 包名 package pro; //宣告grpc服務 service Greeter { /* 以下 分別是 服務端 推送流, 客戶端 推送流 ,雙向流。 */ rpc GetStream (StreamReqData) returns (stream StreamResData){} rpc PutStream (stream StreamReqData) returns (StreamResData){} rpc AllStream (stream StreamReqData) returns (stream StreamResData){} }//stream請求結構 message StreamReqData { string data = 1; } //stream返回結構 message StreamResData { string data = 1; }
我們在 protobuf 裡面 定義 要提供的服務,如果 你想把哪個資料 源源不斷的 推送 就在前面加個stream 就好了,定義好記得編譯。
服務端的實現:
package main import ( "context" "fmt" "google.golang.org/grpc" "grpc/pro" "log" "net" "sync" "time" ) const PORT = ":50051" type server struct { } //服務端 單向流 func (s *server)GetStream(req *pro.StreamReqData, res pro.Greeter_GetStreamServer) error{ i:= 0 for{ i++ res.Send(&pro.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())}) time.Sleep(1*time.Second) if i >10 { break } } return nil } //客戶端 單向流 func (this *server) PutStream(cliStr pro.Greeter_PutStreamServer) error { for { if tem, err := cliStr.Recv(); err == nil { log.Println(tem) } else { log.Println("break, err :", err) break } } return nil } //客戶端服務端 雙向流 func(this *server) AllStream(allStr pro.Greeter_AllStreamServer) error { wg := sync.WaitGroup{} wg.Add(2) go func() { for { data, _ := allStr.Recv() log.Println(data) } wg.Done() }() go func() { for { allStr.Send(&pro.StreamResData{Data:"ssss"}) time.Sleep(time.Second) } wg.Done() }() wg.Wait() return nil } func main(){ //監聽埠 lis,err := net.Listen("tcp",PORT) if err != nil{ return } //建立一個grpc 伺服器 s := grpc.NewServer() //註冊事件 pro.RegisterGreeterServer(s,&server{}) //處理連結 s.Serve(lis) }
知識點:
- 每個函式都對應著 完成了 protobuf 裡面的 定義。
- 每個函式 形參都有對應的 推送 或者 接收 物件,我們只要 不斷迴圈 Recv(),或者 Send() 就能接收或者推送了!
- 當return出函式,就說明此次 推送 或者 接收 結束了,client 會 對應的 收到訊息!
客戶端呼叫:
package main import ( "google.golang.org/grpc" "grpc/pro" "log" "context" "time" _ "google.golang.org/grpc/balancer/grpclb" ) const ( ADDRESS = "localhost:50051" ) func main(){ //通過grpc 庫 建立一個連線 conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure()) if err != nil{ return } defer conn.Close() //通過剛剛的連線 生成一個client物件。 c := pro.NewGreeterClient(conn) //呼叫服務端推送流 reqstreamData := &pro.StreamReqData{Data:"aaa"} res,_ := c.GetStream(context.Background(),reqstreamData) for { aa,err := res.Recv() if err != nil { log.Println(err) break } log.Println(aa) } //客戶端 推送 流 putRes, _ := c.PutStream(context.Background()) i := 1 for { i++ putRes.Send(&pro.StreamReqData{Data:"ss"}) time.Sleep(time.Second) if i > 10 { break } } //服務端 客戶端 雙向流 allStr,_ := c.AllStream(context.Background()) go func() { for { data,_ := allStr.Recv() log.Println(data) } }() go func() { for { allStr.Send(&pro.StreamReqData{Data:"ssss"}) time.Sleep(time.Second) } }() select { } }
client 呼叫 流的函式, 就會 返回一個 流物件,只要 不斷地 對它進行讀取或者寫入,對應方就能收到。
總結:
grpc 的 stream 和 go的協程 配合 簡直完美。通過流 我們 可以更加 靈活的 實現自己的業務。如 訂閱,大資料傳輸等。
作者:xyt001
連結:https://www.jianshu.com/p/85e9cfa16247
來源:簡書
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。