1. 程式人生 > 其它 >go 中的grpc的stream 使用

go 中的grpc的stream 使用

前言

之前我們講了 grpc 怎麼簡單的使用 ,這次講講 grpc 中的 stream,srteam 顧名思義 就是 一種 流,可以源源不斷的 推送 資料,很適合 傳輸一些大資料,或者 服務端 和 客戶端 長時間 資料互動,比如 客戶端 可以向 服務端 訂閱 一個數據,服務端 就 可以利用 stream ,源源不斷地 推送資料。

stream的種類:

客戶端推送 服務端 rpc GetStream (StreamReqData) returns (stream StreamResData){}
服務端推送 客戶端 rpc PutStream (stream StreamReqData) returns (StreamResData){}
客戶端與 服務端 互相 推送 rpc AllStream (stream StreamReqData) returns (stream StreamResData){}

  

其實這個流 已經 基本退化成 tcp了,grpc 底層為我們 分包了,所以真的很方便。


protobuf的定義:

syntax = "proto3";//宣告proto的版本 只能 是3,才支援 grpc

//宣告 包名
package pro;

//宣告grpc服務
service Greeter {
   /*
   以下 分別是 服務端 推送流, 客戶端 推送流 ,雙向流。
   */
  rpc GetStream (StreamReqData) returns (stream StreamResData){}
  rpc PutStream (stream StreamReqData) returns (StreamResData){}
  rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
}


//stream請求結構 message StreamReqData { string data = 1; } //stream返回結構 message StreamResData { string data = 1; }

我們在 protobuf 裡面 定義 要提供的服務,如果 你想把哪個資料 源源不斷的 推送 就在前面加個stream 就好了,定義好記得編譯。

服務端的實現:

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpc/pro"
    "log"
    "
net" "sync" "time" ) const PORT = ":50051" type server struct { } //服務端 單向流 func (s *server)GetStream(req *pro.StreamReqData, res pro.Greeter_GetStreamServer) error{ i:= 0 for{ i++ res.Send(&pro.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())}) time.Sleep(1*time.Second) if i >10 { break } } return nil } //客戶端 單向流 func (this *server) PutStream(cliStr pro.Greeter_PutStreamServer) error { for { if tem, err := cliStr.Recv(); err == nil { log.Println(tem) } else { log.Println("break, err :", err) break } } return nil } //客戶端服務端 雙向流 func(this *server) AllStream(allStr pro.Greeter_AllStreamServer) error { wg := sync.WaitGroup{} wg.Add(2) go func() { for { data, _ := allStr.Recv() log.Println(data) } wg.Done() }() go func() { for { allStr.Send(&pro.StreamResData{Data:"ssss"}) time.Sleep(time.Second) } wg.Done() }() wg.Wait() return nil } func main(){ //監聽埠 lis,err := net.Listen("tcp",PORT) if err != nil{ return } //建立一個grpc 伺服器 s := grpc.NewServer() //註冊事件 pro.RegisterGreeterServer(s,&server{}) //處理連結 s.Serve(lis) }

知識點:

  1. 每個函式都對應著 完成了 protobuf 裡面的 定義。
  2. 每個函式 形參都有對應的 推送 或者 接收 物件,我們只要 不斷迴圈 Recv(),或者 Send() 就能接收或者推送了!
  3. 當return出函式,就說明此次 推送 或者 接收 結束了,client 會 對應的 收到訊息!

客戶端呼叫:

package main

import (
    "google.golang.org/grpc"

    "grpc/pro"
    "log"
    "context"
    "time"
    _ "google.golang.org/grpc/balancer/grpclb"
)

const (
    ADDRESS = "localhost:50051"
)


func main(){
    //通過grpc 庫 建立一個連線
    conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure())
    if err != nil{
        return
    }
    defer conn.Close()
    //通過剛剛的連線 生成一個client物件。
    c := pro.NewGreeterClient(conn)
    //呼叫服務端推送流
    reqstreamData := &pro.StreamReqData{Data:"aaa"}
    res,_ := c.GetStream(context.Background(),reqstreamData)
    for {
        aa,err := res.Recv()
        if err != nil {
            log.Println(err)
            break
        }
        log.Println(aa)
    }
    //客戶端 推送 流
    putRes, _ := c.PutStream(context.Background())
    i := 1
    for {
        i++
        putRes.Send(&pro.StreamReqData{Data:"ss"})
        time.Sleep(time.Second)
        if i > 10 {
            break
        }
    }
    //服務端 客戶端 雙向流
    allStr,_ := c.AllStream(context.Background())
    go func() {
        for {
            data,_ := allStr.Recv()
            log.Println(data)
        }
    }()

    go func() {
        for {
            allStr.Send(&pro.StreamReqData{Data:"ssss"})
            time.Sleep(time.Second)
        }
    }()

    select {
    }

}

client 呼叫 流的函式, 就會 返回一個 流物件,只要 不斷地 對它進行讀取或者寫入,對應方就能收到。

總結:

grpc 的 stream 和 go的協程 配合 簡直完美。通過流 我們 可以更加 靈活的 實現自己的業務。如 訂閱,大資料傳輸等。



作者:xyt001
連結:https://www.jianshu.com/p/85e9cfa16247
來源:簡書
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。