1. 程式人生 > >Golang gRPC學習(03): grpc官方示例程式route_guide簡析

Golang gRPC學習(03): grpc官方示例程式route_guide簡析

程式碼主要來源於grpc的官方examples程式碼: [route_guide](https://github.com/grpc/grpc-go/tree/master/examples/route_guide) https://github.com/grpc/grpc-go/tree/master/examples/route_guide ## 服務定義 RouteGuide  ```go service RouteGuide {   // A simple RPC.   //   // Obtains the feature at a given position.   //   // A feature with an empty name is returned if there's no feature at the given   // position.   rpc GetFeature(Point) returns (Feature) {}   // A server-to-client streaming RPC.   //   // Obtains the Features available within the given Rectangle.  Results are   // streamed rather than returned at once (e.g. in a response message with a   // repeated field), as the rectangle may cover a large area and contain a   // huge number of features.   rpc ListFeatures(Rectangle) returns (stream Feature) {}   // A client-to-server streaming RPC.   //   // Accepts a stream of Points on a route being traversed, returning a   // RouteSummary when traversal is completed.   rpc RecordRoute(stream Point) returns (RouteSummary) {}   // A Bidirectional streaming RPC.   //   // Accepts a stream of RouteNotes sent while a route is being traversed,   // while receiving other RouteNotes (e.g. from other users).   rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} } ``` **從定義裡看:** rpc GetFeature(Point) returns (Feature) : 定義最簡單的RPC服務 rpc ListFeatures(Rectangle) returns (stream Feature): 帶有 stream 的RPC服務,返回是stream rpc RecordRoute(stream Point) returns (RouteSummary):帶有 stream 的RPC服務,客戶端是stream rpc RouteChat(stream RouteNote) returns (stream RouteNote):2端都是stream的RPC服務 **rpc服務其他引數定義:** ```go message Point {   int32 latitude = 1;   int32 longitude = 2; } message Rectangle {   // One corner of the rectangle.   Point lo = 1;   // The other corner of the rectangle.   Point hi = 2; } message Feature {   // The name of the feature.   string name = 1;   // The point where the feature is detected.   Point location = 2; } message RouteNote {   // The location from which the message is sent.   Point location = 1;   // The message to be sent.   string message = 2; } message RouteSummary {   // The number of points received.   int32 point_count = 1;   // The number of known features passed while traversing the route.   int32 feature_count = 2;   // The distance covered in metres.   int32 distance = 3;   // The duration of the traversal in seconds.   int32 elapsed_time = 4; } ``` ## route_guide.pb.go route_guide.pb.go,這個檔案是幹嘛的? 這個是grpc自動生成的檔案,裡面有序列化,反序列化,執行是函式。 先看看裡面的介面,其中裡面有2個主要介面: - 1.RouteGuideClient interface - 2.RouteGuideServer interface **1.RouteGuideClient interface定義** ```go type RouteGuideClient interface {     GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error)     ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error)     RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error)     RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) } ``` 仔細看看,這裡面定義的一些方法,都是route_guide.proto檔案裡的service RouteGuide裡的rpc方法,而且是一一對應的。 這個就是client需要操作的方法,是grpc自動生成的。客戶端請求這些方法來呼叫服務。 從這裡可以看出,grpc把proto中定義的服務對映為了一個interface,裡面包含service中定義的rpc方法。 **2.RouteGuideServer interface** ```go type RouteGuideServer interface {     GetFeature(context.Context, *Point) (*Feature, error)     ListFeatures(*Rectangle, RouteGuide_ListFeaturesServer) error     RecordRoute(RouteGuide_RecordRouteServer) error       RouteChat(RouteGuide_RouteChatServer) error } ``` 這個也是與route_guide.proto檔案裡的service RouteGuide裡的rpc方法是一一對應的。 同理,這裡跟上面的RouteGuideClient一樣,把service對映成了interface。 ## 客戶端請求client.go 看看3個用stream傳送資料的函式,客戶端用stream,服務端用stream,2端都用stream, ### ListFeatures(Rectangle) returns (stream Feature) 方法 我們先看看 rpc ListFeatures(Rectangle) returns (stream Feature) {} 這個rpc方法,它返回的是一個 stream,在 `client.go` 檔案裡它是用 `printFeatures()` 這個函式來表示的, ```go // printFeatures lists all the features within the given bounding Rectangle. func printFeatures(client pb.RouteGuideClient, rect *pb.Rectangle) {     log.Printf("Looking for features within %v", rect)     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)     defer cancel() //這裡呼叫 ListFeatures(), rpc定義的方法,返回一個stream stream, err := client.ListFeatures(ctx, rect)     if err != nil {         log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)     }     for {//for迴圈不斷的接收資料         feature, err := stream.Recv()         if err == io.EOF {             break         }         if err != nil {             log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)         }         log.Println(feature)     } } ``` ### RecordRoute(stream Point) returns (RouteSummary) 這個 rpc RecordRoute(stream Point) returns (RouteSummary) 方法,通過stream傳送訊息,在 `client.go` 檔案裡是 `runRecordRoute()` 方法 ```go func runRecordRoute(client pb.RouteGuideClient) {     // Create a random number of random points     r := rand.New(rand.NewSource(time.Now().UnixNano()))     pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points     var points []*pb.Point     for i := 0; i < pointCount; i++ {         points = append(points, randomPoint(r))     }     log.Printf("Traversing %d points.", len(points))     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)     defer cancel() // 呼叫RecordRoute() 方法     stream, err := client.RecordRoute(ctx)     if err != nil {         log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)     }     for _, point := range points { // stream.Send() stream方式傳送資料         if err := stream.Send(point); err != nil {             log.Fatalf("%v.Send(%v) = %v", stream, point, err)         }     } // 關閉     reply, err := stream.CloseAndRecv()     if err != nil {         log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)     }     log.Printf("Route summary: %v", reply) } ``` ### RouteChat(stream RouteNote) returns (stream RouteNote) 這個是 rpc RouteChat(stream RouteNote) returns (stream RouteNote),2端都是操作stream,在 `client.go` 裡面 `runRouteChat()` ```go func runRouteChat(client pb.RouteGuideClient) {     notes := []*pb.RouteNote{         {Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},         {Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},         {Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},         {Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},         {Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},         {Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},     }     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)     defer cancel()     stream, err := client.RouteChat(ctx)     if err != nil {         log.Fatalf("%v.RouteChat(_) = _, %v", client, err)     }     waitc := make(chan struct{})     go func() { //開一個協程來執行接收的動作         for {             in, err := stream.Recv() //stream方式接收             if err == io.EOF {                 // read done.                 close(waitc) //讀取完成close掉chan,給外面的waitc傳送一個結束的訊號表示協程工作已完成                 return             }             if err != nil {                 log.Fatalf("Failed to receive a note : %v", err)             }             log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)         }     }() // 在main協程裡面 strem 傳送資料     for _, note := range notes {         if err := stream.Send(note); err != nil {             log.Fatalf("Failed to send a note: %v", err)         }     }     stream.CloseSend() // 關閉stream     <-waitc //stream 接收完成通知退出協程,main協程也結束執行 } ``` ## 服務端server.go 定義了一個struct,routeGuideServer struct,然後是操作這個struct, ```go type routeGuideServer struct {     pb.UnimplementedRouteGuideServer     savedFeatures []*pb.Feature // read-only after initialized     mu         sync.Mutex // protects routeNotes     routeNotes map[string][]*pb.RouteNote } ``` ### rpc GetFeature() 這個rpc方法,客戶端和服務端都不是stream方式傳送,獲取, ```go func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {     for _, feature := range s.savedFeatures {         if proto.Equal(feature.Location, point) {             return feature, nil         }     }     // No feature was found, return an unnamed feature     return &pb.Feature{Location: point}, nil } ``` 裡面有一個比較的函式 proto.Equal(a, b Message) bool,在 `protobuf/proto/equal.go` 裡,比較2值是否相等。 ### rpc ListFeatures() rpc ListFeatures(Rectangle) returns (stream Feature) 這個rpc方法返回是一個stream,也就是服務端傳送是stream方式傳送, ```go func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {     for _, feature := range s.savedFeatures {         if inRange(feature.Location, rect) { //stream 方式傳送             if err := stream.Send(feature); err != nil {                 return err             }         }     }     return nil } ``` ### rpc RecordRoute rpc RecordRoute(stream Point) returns (RouteSummary) 客戶端傳送(請求服務端)的資料是一個stream,那服務端server接收肯定也要用stream, ```go func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {     var pointCount, featureCount, distance int32     var lastPoint *pb.Point     startTime := time.Now()     for {         point, err := stream.Recv() //接收stream         if err == io.EOF {             endTime := time.Now()             return stream.SendAndClose(&pb.RouteSummary{                 PointCount:   pointCount,                 FeatureCount: featureCount,                 Distance:     distance,                 ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),             })         }         if err != nil {             return err         }         pointCount++         for _, feature := range s.savedFeatures {             if proto.Equal(feature.Location, point) {                 featureCount++             }         }         if lastPoint != nil {             distance += calcDistance(lastPoint, point)         }         lastPoint = point     } } ``` RecordRoute 函式的引數 `pb.RouteGuide_RecordRouteServer` 是什麼? 它在 route_guide.pg.go 定義的是一個 interface,裡面有SendAndClose(), Recv() 方法, ```go type RouteGuide_RecordRouteServer interface {     SendAndClose(*RouteSummary) error     Recv() (*Point, error)     grpc.ServerStream } ``` ### rpc RouteChat rpc RouteChat(stream RouteNote) returns (stream RouteNote) 這個rpc方法,客戶端和服務端都是stream傳送,接收, ```go func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {     for {         in, err := stream.Recv() // stream接收         if err == io.EOF {             return nil         }         if err != nil {             return err         }         key := serialize(in.Location)         s.mu.Lock()         s.routeNotes[key] = append(s.routeNotes[key], in)         // Note: this copy prevents blocking other clients while serving this one.         // We don't need to do a deep copy, because elements in the slice are         // insert-only and never modified.         rn := make([]*pb.RouteNote, len(s.routeNotes[key]))         copy(rn, s.routeNotes[key])         s.mu.Unlock()         for _, note := range rn {             if err := stream.Send(note); err != nil { //stream傳送                 return err             }         }     } } ``` ### newServer ```go // 執行服務端函式,返回一個struct,這個struct又實現了interface, //所以main函式裡可以直接呼叫 pb.RegisterRouteGuideServer(grpcServer, newServer()) func newServer() *routeGuideServer {     s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}     s.loadFeatures(*jsonDBFile)     return s } func main() {     flag.Parse()     lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))     if err != nil {         log.Fatalf("failed to listen: %v", err)     }     var opts []grpc.ServerOption     if *tls {         if *certFile == "" {             *certFile = testdata.Path("server1.pem")         }         if *keyFile == "" {             *keyFile = testdata.Path("server1.key")         }         creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)         if err != nil {             log.Fatalf("Failed to generate credentials %v", err)         }         opts = []grpc.ServerOption{grpc.Creds(creds)}     }     grpcServer := grpc.NewServer(opts...)     pb.RegisterRouteGuideServer(grpcServer, newServer())     grpcServer.Serve(lis) } ``` ### 服務端一些輔助函式 `server.go` 檔案裡面還有一些輔助函式: **1.loadFeatures** ```go // 載入json形式的 features 資料,經緯度,名稱 func (s *routeGuideServer) loadFeatures(filePath string) ``` 下面這種格式的資料,跟 `route_guide.proto` 裡 `message Feature` 資料定義一致 ```json { "location": {         "latitude": 407838351,         "longitude": -746143763     },     "name": "Patriots Path, Mendham, NJ 07945, USA" } ``` **2.toRadians** 計算弧度 3.calcDistance 計算距離 4.inRange 在範圍內 5.serialize 序列化 ## **gRPC 系列程式碼地址:** > >- [01hello](https://github.com/jiujuan/grpc-tutorial/tree/master/01hello) grpc helloworld >- [02fourinteractionmode](https://github.com/jiujuan/grpc-tutorial/tree/master/02fourinteractionmode) grpc 四種傳輸方式 >- [03customer](https://github.com/jiujuan/grpc-tutorial/tree/master/03customer) grpc 一個小練