1. 程式人生 > 其它 >攔截器Interceptor

攔截器Interceptor

攔截器Interceptor

一、概述

想在每個 RPC 方法的前或後做某些事情,怎麼做?

gRPC 提供了 Interceptor 功能,包括客戶端攔截器和服務端攔截器。可以在接收到請求或者發起請求之前優先對請求中的資料做一些處理後再轉交給指定的服務處理並響應,很適合在這裡處理驗證、日誌等流程。

gRPC-go 在 v1.28.0版本增加了多 interceptor 支援,可以在不借助第三方庫(go-grpc-middleware)的情況下新增多個 interceptor 了。

go-grpc-middleware 中也提供了多種常用 interceptor ,可以直接使用。

在 gRPC 中,根據攔截的方法型別不同可以分為攔截 Unary 方法的一元攔截器

,和作用於 Stream 方法的流攔截器

在 gRPC 中,大類可分為兩種 RPC 方法,與攔截器的對應關係是:

  • 普通方法:一元攔截器(grpc.UnaryInterceptor)
  • 流方法:流攔截器(grpc.StreamInterceptor)

同時還分為服務端攔截器客戶端攔截器,所以一共有以下4種類型:

  • grpc.UnaryServerInterceptor
  • grpc.StreamServerInterceptor
  • grpc.UnaryClientInterceptor
  • grpc.StreamClientInterceptor

原始碼

二、客戶端攔截器

使用客戶端攔截器 只需要在 Dial的時候指定相應的 DialOption 即可。

Unary Interceptor

客戶端一元攔截器型別為 grpc.UnaryClientInterceptor,具體如下:

type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

可以看到,所謂的攔截器其實就是一個函式,可以分為預處理(pre-processing)呼叫RPC方法(invoking RPC method)後處理(post-processing)

三個階段。

引數含義如下:

  • ctx:Go語言中的上下文,一般和 Goroutine 配合使用,起到超時控制的效果
  • method:當前呼叫的 RPC 方法名
  • req:本次請求的引數,只有在處理前階段修改才有效
  • reply:本次請求響應,需要在處理後階段才能獲取到
  • cc:gRPC 連線資訊
  • invoker:可以看做是當前 RPC 方法,一般在攔截器中呼叫 invoker 能達到呼叫 RPC 方法的效果,當然底層也是 gRPC 在處理。
  • opts:本次呼叫指定的 options 資訊

作為一個客戶端攔截器,可以在處理前檢查 req 看看本次請求帶沒帶 token 之類的鑑權資料,沒有的話就可以在攔截器中加上。

Stream Interceptor

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

由於 StreamAPI 和 UnaryAPI有所不同,因此攔截器方面也有所區別,比如 req 引數變成了 streamer 。同時其攔截過程也有所不同,不在是處理 req resp,而是對 streamer 這個流物件進行包裝,比如說實現自己的 SendMsg 和 RecvMsg 方法。

然後在這些方法中的預處理(pre-processing)呼叫RPC方法(invoking RPC method)後處理(post-processing)各個階段加入自己的邏輯。

三、服務端攔截器

服務端攔截器和客戶端攔截器類似,就不做過多描述。使用客戶端攔截器 只需要在 NewServer 的時候指定相應的 ServerOption 即可。

Unary Interceptor

定義如下:

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

引數具體含義如下:

  • ctx context.Context:請求上下文
  • req interface{}:RPC 方法的請求引數
  • info *UnaryServerInfo:RPC 方法的所有資訊
  • handler UnaryHandler:RPC 方法真正執行的邏輯

Stream Interceptor

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

四、UnaryInterceptor

一元攔截器可以分為3個階段:

  • 1)預處理(pre-processing)
  • 2)呼叫RPC方法(invoking RPC method)
  • 3)後處理(post-processing)

proto

syntax = "proto3";// 協議為proto3

//option go_package = "path;name";
//path 表示生成的go檔案的存放地址,會自動生成目錄的。
//name 表示生成的go檔案所屬的包名


//  生成pb.go命令:  protoc -I ./ --go_out=plugins=grpc:.\13unaryInterceptor\proto\  .\13unaryInterceptor\proto\simple.proto

option go_package = "./;proto";
package proto;

// 定義我們的服務(可定義多個服務,每個服務可定義多個介面)
service Simple{
  rpc Route (SimpleRequest) returns (SimpleResponse){};
}

// 定義傳送請求資訊
message SimpleRequest{
  // 定義傳送的引數,採用駝峰命名方式,小寫加下劃線,如:student_name
  // 引數型別 引數名 標識號(不可重複)
  string data = 1;
}

// 定義響應資訊
message SimpleResponse{
  // 定義接收的引數
  // 引數型別 引數名 標識號(不可重複)
  int32 code = 1;
  string value = 2;
}


編譯:

go-grpc-example> protoc -I ./ --go_out=plugins=grpc:.\13unaryInterceptor\proto\  .\13unaryInterceptor\proto\simple.proto

Client

package main

import (
	"context"
	pb "go-grpc-example/13unaryInterceptor/proto"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

/*
@author RandySun
@create 2022-05-09-21:23
*/

const (
	// Address 監聽地址
	Address string = ":8001"
	// NetWork 網路通訊協議
	NetWork string = "tcp"
)

// unaryInterceptor 一個簡單的 unary interceptor 示例。
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// pre-processing var interceptor grpc.UnaryClientInterceptor
	start := time.Now()
	err := invoker(ctx, method, req, reply, cc, opts...) // invoking RPC method 呼叫 RPC 方法
	// post-processing
	end := time.Now()
	log.Fatalf("RPC: %s, req:%v start time: %s, end time: %s, err: %v", method, req, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
	return err
}

func main() {
	// 連線伺服器
	conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(unaryInterceptor))
	if err != nil {
		log.Fatalf("net.Connect connect: %v", err)
	}
	defer conn.Close()
	// 建立gRpc連線
	grpcClient := pb.NewSimpleClient(conn)

	// 建立傳送結構體
	req := pb.SimpleRequest{
		Data: "grpc",
	}
	// 呼叫 Route 方法 同時傳入context.Context,  在有需要時可以讓我們改變RPC的行為,比如超時/取消一個正在執行的RPC
	res, err := grpcClient.Route(context.Background(), &req)
	if err != nil {
		log.Fatalf("Call Route err:%v", err)
	}
	// 列印返回直
	log.Println("服務的返回響應data:", res)

}

invoker(ctx, method, req, reply, cc, opts...) 是真正呼叫 RPC 方法。因此我們可以在呼叫前後增加自己的邏輯:比如呼叫前檢查一下引數之類的,呼叫後記錄一下本次請求處理耗時等。

建立連線時通過 grpc.WithUnaryInterceptor 指定要載入的攔截器即可。

Server

服務端的一元攔截器和客戶端類似:

package main

import (
	"context"
	pb "go-grpc-example/13unaryInterceptor/proto"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
)

/*
@author RandySun
@create 2022-05-09-21:23
*/

// SimpleService 定義我們的服務
type SimpleService struct {
}

// Route 實現Route方法
func (s *SimpleService) Route(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
	res := pb.SimpleResponse{
		Code:  200,
		Value: "hello " + req.Data,
	}
	return &res, nil
}

func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	start := time.Now()
	m, err := handler(ctx, req)
	end := time.Now()
	// 記錄請求引數 耗時 錯誤資訊等資料
	log.Fatalf("RPC: %s,req:%v start time: %s, end time: %s, err: %v", info.FullMethod, req, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
	return m, err
}

const (
	// Address 監聽地址
	Address string = ":8001"
	// NetWork 網路通訊協議
	NetWork string = "tcp"
)

func main() {
	// 監聽本地埠
	listener, err := net.Listen(NetWork, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %V", err)
	}
	log.Println(Address, "net.Listing...")
	// 建立grpc服務例項
	grpcServer := grpc.NewServer(grpc.UnaryInterceptor(unaryInterceptor))
	// 在grpc伺服器註冊我們的服務
	pb.RegisterSimpleServer(grpcServer, &SimpleService{})

	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcService.Serve err:%v", err)
	}
	log.Println("grpcService.Serve run succ")
}

服務端則是在 NewServer 時指定攔截器:

Test

Server

go build .\service.go
.\service.exe
2022/04/09 20:59:19 :8001 net.Listing...
2022/04/09 20:59:26 RPC: /proto.Simple/Route,req:data:"grpc" start time: 2022-04-09T20:59:26+08:00, end time: 2022-04-09T20:59:26+08:00, err: <nil>


Client

go build .\client.go
.\client.exe
2022/04/09 20:59:26 RPC: /proto.Simple/Route, req:data:"grpc" start time: 2022-04-09T20:59:26+08:00, end time: 2022-04-09T20:59:26+08:00, err: rpc error: code = Unavailable desc = error reading from server: read tcp 127.
0.0.1:58879->127.0.0.1:8001: wsarecv: An existing connection was forcibly closed by the remote host.

五、StreamInterceptor

流攔截器過程和一元攔截器有所不同,同樣可以分為3個階段:

  • 1)預處理(pre-processing)
  • 2)呼叫RPC方法(invoking RPC method)
  • 3)後處理(post-processing)

預處理階段和一元攔截器類似,但是呼叫RPC方法和後處理這兩個階段則完全不同。

StreamAPI 的請求和響應都是通過 Stream 進行傳遞的,更進一步是通過 Streamer 呼叫 SendMsg 和 RecvMsg 這兩個方法獲取的。

然後 Streamer 又是呼叫RPC方法來獲取的,所以在流攔截器中我們可以對 Streamer 進行包裝,然後實現 SendMsg 和 RecvMsg 這兩個方法

proto

syntax = "proto3";// 協議為proto3


package proto;
//option go_package = "grpc/03serverStream/proto";
option go_package = "./;proto";


//  protoc -I ./ --go_out=plugins=grpc:.\14streamInterceptor\proto\  .\14streamInterceptor\proto\bothStream.proto
// 定義我們的服務(可定義多個服務,每個服務可定義多個介面)

service Stream{
  // 雙向流式rpc,同時在請求引數前和響應引數前加上stream
  rpc Conversations(stream StreamRequest) returns(stream StreamResponse){};
}

// 定義傳送請求資訊
message StreamRequest{
  //流請求引數
  string question = 1;
}

// 定義流式響應資訊
message StreamResponse{
  //流響應資料
  string answer = 1;
}

編譯:

go-grpc-example> protoc -I ./ --go_out=plugins=grpc:.\14streamInterceptor\proto\  .\14streamInterceptor\proto\bothStream.proto

Client

本例中通過結構體嵌入的方式,對 Streamer 進行包裝,在 SendMsg 和 RecvMsg 之前打印出具體的值。

package main

import (
	"context"
	"fmt"
	pb "go-grpc-example/14streamInterceptor/proto"

	"io"
	"log"
	"strconv"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

/*
@author RandySun
@create 2022-05-09-21:43
*/
// Address 連線地址
const Address string = ":8000"

var streamClient pb.StreamClient

// conversations 呼叫服務端的Conversations方法
func conversations() {
	//呼叫服務端的Conversations方法,獲取流
	stream, err := streamClient.Conversations(context.Background())
	if err != nil {
		log.Fatalf("get conversations stream err: %v", err)
	}
	for n := 0; n < 5; n++ {
		fmt.Println(12223)
		err := stream.Send(&pb.StreamRequest{Question: "stream client rpc " + strconv.Itoa(n)})
		if err != nil {
			log.Fatalf("stream request err: %v", err)
		}
		// 接收服務端訊息
		res, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("Conversations get stream err: %v", err)
		}
		// 列印返回值
		log.Println(res.Answer)
	}
	//最後關閉流
	err = stream.CloseSend()
	if err != nil {
		log.Fatalf("Conversations close stream err: %v", err)
	}
}

// wrappedStream  用於包裝 grpc.ClientStream 結構體並攔截其對應的方法。
type wrappedStream struct {
	grpc.ClientStream
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
	return &wrappedStream{s}
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Fatalf("Receive a message2 (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Fatalf("Send a message1 (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.SendMsg(m)
}

// streamInterceptor 一個簡單的 stream interceptor 示例。
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	s, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return nil, err
	}
	log.Fatalf("一個簡單的 client stream interceptor 示例")

	return newWrappedStream(s), nil
}
func main() {
	// 連線伺服器
	conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStreamInterceptor(streamInterceptor))
	if err != nil {
		log.Fatalf("net.Connect err: %v", err)
	}
	defer conn.Close()

	// 建立gRPC連線
	streamClient = pb.NewStreamClient(conn)
	conversations()
}

連線時則通過 grpc.WithStreamInterceptor 指定要載入的攔截器。

Server

和客戶端類似。

相似的,通過 函式指定要載入的攔截器。

package main

import (
	"fmt"
	pb "go-grpc-example/14streamInterceptor/proto"

	"io"
	"log"
	"net"
	"strconv"
	"time"

	"google.golang.org/grpc"
)

/*
@author RandySun
@create 2022-05-09-21:43
*/
// StreamService 定義我們的服務
type StreamService struct{}

// Conversations 實現Conversations方法
func (s *StreamService) Conversations(srv pb.Stream_ConversationsServer) error {
	n := 1
	for {
		fmt.Println(1111)
		req, err := srv.Recv()
		if err == io.EOF {
			return nil
		}

		if err != nil {
			return err
		}
		err = srv.Send(&pb.StreamResponse{
			Answer: "from stream server answer: the " + strconv.Itoa(n) + "question is " + req.Question,
		})

		if err != nil {
			return err
		}

		n++
		log.Printf("from stream client question: %s", req.Question)
	}
}

const (
	// Address 監聽地址
	Address string = ":8000"
	// Network 網路通訊協議
	Network string = "tcp"
)

type wrappedStream struct {
	grpc.ServerStream
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
	return &wrappedStream{s}
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Fatalf("Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Fatalf("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.SendMsg(m)
}

func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// 包裝 grpc.ServerStream 以替換 RecvMsg SendMsg這兩個方法。
	err := handler(srv, newWrappedStream(ss))
	if err != nil {
		log.Fatalf("RPC failed with error %v", err)
	}
	log.Fatalf("一個簡單的 server stream interceptor 示例")

	return err
}

func main() {
	// 監聽本地埠
	listener, err := net.Listen(Network, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	log.Println(Address + " net.Listing...")
	// 新建gRPC伺服器例項
	grpcServer := grpc.NewServer(grpc.StreamInterceptor(streamInterceptor))
	// 在gRPC伺服器註冊我們的服務
	pb.RegisterStreamServer(grpcServer, &StreamService{})

	//用伺服器 Serve() 方法以及我們的埠資訊區實現阻塞等待,直到程序被殺死或者 Stop() 被呼叫
	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcServer.Serve err: %v", err)
	}
}

五、如何實現多個攔截器

另外,可以發現 gRPC 本身居然只能設定一個攔截器,難道所有的邏輯都只能寫在一起?

關於這一點,你可以放心。採用開源專案 go-grpc-middleware 就可以解決這個問題,本章也會使用它。

import "github.com/grpc-ecosystem/go-grpc-middleware"

myServer := grpc.NewServer(
    grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
        ...
    )),
    grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
       ...
    )),
)

編寫 gRPC interceptor 的程式碼,我們會將實現以下攔截器:

  • logging:RPC 方法的入參出參的日誌輸出
  • recover:RPC 方法的異常保護和日誌輸出

proto:

syntax = "proto3";// 協議為proto3

//option go_package = "path;name";
//path 表示生成的go檔案的存放地址,會自動生成目錄的。
//name 表示生成的go檔案所屬的包名

//   protoc -I ./ --go_out=plugins=grpc:.\15manyInterceptor\proto\  .\15manyInterceptor\proto\simple.proto
option go_package = "./;proto";
package proto;

// 定義我們的服務(可定義多個服務,每個服務可定義多個介面)
service Simple{
  rpc Route (SimpleRequest) returns (SimpleResponse){};
}

// 定義傳送請求資訊
message SimpleRequest{
  // 定義傳送的引數,採用駝峰命名方式,小寫加下劃線,如:student_name
  // 引數型別 引數名 標識號(不可重複)
  string data = 1;
}

// 定義響應資訊
message SimpleResponse{
  // 定義接收的引數
  // 引數型別 引數名 標識號(不可重複)
  int32 code = 1;
  string value = 2;
}


編譯:

go-grpc-example> protoc -I ./ --go_out=plugins=grpc:.\15manyInterceptor\proto\  .\15manyInterceptor\proto\simple.proto

實現client

package main

import (
	"context"
	"fmt"
	pb "go-grpc-example/15manyInterceptor/proto"

	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

/*
@author RandySun
@create 2022-05-09-21:53
*/
const (
	// Address 監聽地址
	Address string = ":8001"
	// NetWork 網路通訊協議
	NetWork string = "tcp"
)

// unaryInterceptor 一個簡單的 unary interceptor 示例。
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// pre-processing var interceptor grpc.UnaryClientInterceptor
	start := time.Now()
	err := invoker(ctx, method, req, reply, cc, opts...) // invoking RPC method 呼叫 RPC 方法
	// post-processing
	end := time.Now()
	log.Fatalf("RPC: %s, req:%v start time: %s, end time: %s, err: %v", method, req, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
	return err
}

func main() {
	// 連線伺服器
	conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(unaryInterceptor))
	if err != nil {
		log.Fatalf("net.Connect connect: %v", err)
	}
	defer conn.Close()
	// 建立gRpc連線
	grpcClient := pb.NewSimpleClient(conn)

	// 建立傳送結構體
	req := pb.SimpleRequest{
		Data: "grpc",
	}
	// 呼叫 Route 方法 同時傳入context.Context,  在有需要時可以讓我們改變RPC的行為,比如超時/取消一個正在執行的RPC
	res, err := grpcClient.Route(context.Background(), &req)
	if err != nil {
		log.Fatalf("Call Route err:%v", err)
	}
	fmt.Println(res, 44444)
	// 列印返回直
	log.Println("服務的返回響應data:", res)

}

實現 server interceptor

logging

func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	log.Printf("gRPC method: %s, %v", info.FullMethod, req)
	resp, err := handler(ctx, req)
	log.Printf("gRPC method: %s, %v", info.FullMethod, resp)
	return resp, err
}

recover

func RecoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		if e := recover(); e != nil {
			debug.PrintStack()
			err = status.Errorf(codes.Internal, "Panic err: %v", e)
		}
	}()

	return handler(ctx, req)
}
package main

import (
	"context"
	pb "go-grpc-example/15manyInterceptor/proto"
	"log"
	"net"
	"runtime/debug"

	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

/*
@author RandySun
@create 2022-05-09-21:53
*/
// SimpleService 定義我們的服務
type SimpleService struct {
}

// Route 實現Route方法
func (s *SimpleService) Route(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
	res := pb.SimpleResponse{
		Code:  200,
		Value: "hello " + req.Data,
	}
	return &res, nil
}

// LoggingInterceptor RPC 方法的入參出參的日誌輸出
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	log.Printf("gRPC method: %s, %v", info.FullMethod, req)
	resp, err := handler(ctx, req)
	log.Printf("gRPC method: %s, %v", info.FullMethod, resp)
	return resp, err
}

// RecoveryInterceptor RPC 方法的異常保護和日誌輸出
func RecoveryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	//RPC 方法的異常保護和日誌輸出
	defer func() {
		if e := recover(); e != nil {
			debug.PrintStack()
			err = status.Errorf(codes.Internal, "Panic err: %v", e)
		}
	}()

	return handler(ctx, req)
}

const (
	// Address 監聽地址
	Address string = ":8001"
	// NetWork 網路通訊協議
	NetWork string = "tcp"
)

func main() {
	// 監聽本地埠
	listener, err := net.Listen(NetWork, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %V", err)
	}
	log.Println(Address, "net.Listing...")
	// 建立grpc服務例項
	opts := []grpc.ServerOption{
		grpc_middleware.WithUnaryServerChain(
			RecoveryInterceptor,
			LoggingInterceptor,
		),
	}
	grpcServer := grpc.NewServer(opts...)
	// 在grpc伺服器註冊我們的服務
	pb.RegisterSimpleServer(grpcServer, &SimpleService{})

	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcService.Serve err:%v", err)
	}
	log.Println("grpcService.Serve run succ")
}

logging

啟動 server.go,執行 client.go 發起請求,得到結果:

$ go run server.go
2022/04/09 21:34:14 :8001 net.Listing...
2022/04/09 21:34:21 gRPC method: /proto.Simple/Route, data:"grpc"
2022/04/09 21:34:21 gRPC method: /proto.Simple/Route, code:200 value:"hello grpc"

recover

在 RPC 方法中人為地製造執行時錯誤,再重複啟動 server/client.go,得到結果:

client

$ go run client.go
code:200  value:"hello grpc" 44444
2022/04/09 21:44:36 服務的返回響應data: code:200  value:"hello grpc"

server

$ go run server.go
goroutine 23 [running]:
runtime/debug.Stack(0xc420223588, 0x1033da9, 0xc420001980)
	/usr/local/Cellar/go/1.10.1/libexec/src/runtime/debug/stack.go:24 +0xa7
runtime/debug.PrintStack()
	/usr/local/Cellar/go/1.10.1/libexec/src/runtime/debug/stack.go:16 +0x22
main.RecoveryInterceptor.func1(0xc420223a10)
...

檢查服務是否仍然執行,即可知道 Recovery 是否成功生效

六、小結

1)攔截器分類與定義 gRPC 攔截器可以分為:一元攔截器和流攔截器,服務端攔截器和客戶端攔截器。

一共有以下4種類型:

  • grpc.UnaryServerInterceptor
  • grpc.StreamServerInterceptor
  • grpc.UnaryClientInterceptor
  • grpc.StreamClientInterceptor

攔截器本質上就是一個特定型別的函式,所以實現攔截器只需要實現對應型別方法(方法簽名相同)即可。

2)攔截器執行過程

一元攔截器

  • 1)預處理
  • 2)呼叫RPC方法
  • 3)後處理

流攔截器

  • 1)預處理
  • 2)呼叫RPC方法 獲取 Streamer
  • 3)後處理
    • 呼叫 SendMsg 、RecvMsg 之前
    • 呼叫 SendMsg 、RecvMsg
    • 呼叫 SendMsg 、RecvMsg 之後

3)攔截器使用及執行順序

配置多個攔截器時,會按照引數傳入順序依次執行

所以,如果想配置一個 Recovery 攔截器則必須放在第一個,放在最後則無法捕獲前面執行的攔截器中觸發的 panic。

同時也可以將 一元和流攔截器一起配置,gRPC 會根據不同方法選擇對應型別的攔截器執行。