1. 程式人生 > 其它 >golang實現一個gRPC攔截器

golang實現一個gRPC攔截器

1. 什麼是gRPC攔截器

我們以etcd一個寫請求的流程來看gRPC攔截器做了什麼工作

當etcd處理一個寫請求,比如 put hello world 時,首先etcd client會使用負載均衡演算法選擇一個etcd節點,發起gRPC呼叫;

然後etcd節點收到請求後經過gRPC攔截器、Quota模組後,進入KVServer模組...

攔截器,通俗一點理解就是在執行一段程式碼之前,先去執行另外一段程式碼。

攔截器就可以理解為gRPC生態中的中介軟體(是不是和web中介軟體非常類似),攔截器一般在做統一介面的認證工作

假設有一個方法handler(ctx context.Context),我想要給這個方法賦予一個能力:允許在這個方法之前列印一行日誌

2. gRPC攔截器分析

以下使用golang來分析一個簡單的攔截器邏輯

2.1 定義結構

我們定義一個結構interceptor,這個結構包含兩個引數,一個上下文資訊context和處理器handler函式

type handler func(ctx context.Context)
type interceptor func(ctx context.Context, h handler)

2.2 申明賦值

接下來,為了實現我們的目標,對每個handler 的每個操作,我們都需要經過攔截器,於是我們宣告兩個interceptorhandler的變數並賦值

var h = func(ctx context.Context) {
	fmt.Println("some logic ...")
}
var interceptor1 = func(ctx context.Context, h handler) {
	fmt.Println("intercept!")
	h(ctx)
}

2.3 編寫執行函式

我們執行一下函式,測試效果

func main() {
	var ctx context.Context
	var ceps []interceptor
    
	var h = func(ctx context.Context) {
		fmt.Println("some logic ...")
	}
	var interceptor1 = func(ctx context.Context, h handler) {
		fmt.Println("intercept!")
		h(ctx)
	}

	ceps = append(ceps, interceptor1)
	for _, cep := range ceps {
		cep(ctx, h)
	}
}

輸出結果為:

$ go run main.go

intercept!
some logic ...

看起來我們的攔截器已經生效了,我們在ceps陣列中再增加一個攔截器,看看會發生什麼

var interceptor2 = func(ctx context.Context, h handler) {
	fmt.Println("intercept_2!")
	h(ctx)
}
ceps = append(ceps, interceptor2)

輸出結果為:

$ go run main.go

intercept_1!
some logic ...
intercept_2!
some logic ...

可以看到,輸出結果明顯是不符合邏輯的

我們認為的攔截器是什麼?不管我們中間經過了多少個攔截器的處理,都要保證handler函式只執行一次,也就是我們的業務邏輯只能執行一次

2.4 gRPC-go

在gRPC-go的原始碼裡有一個函式chainUnaryClientInterceptors(cc),看函式名字也能猜出來是做什麼

這個函式就是把所有的攔截器串聯成了一個攔截器,這樣保證了請求會經過所有攔截器,而最終handler函式只會被最後執行一次

那麼將所有攔截器串聯是如何做到的呢?

來看看這個函式的實現:

// chainUnaryClientInterceptors chains all unary client interceptors into one.
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts

	// Prepend dopts.unaryInt to the chaining interceptors if it exists,
	// since unaryInt will be executed before any other chained interceptors.
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chaindInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chaindInt = nil
	} else if len(interceptors) == 1 {
		chaindInt = interceptors[0]
	} else {
		chaindInt = func(ctx context.Context, method string, req, reply interface{},
			cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}

	cc.dopts.unaryInt = chaindInt
}

重點在第二個if-else判斷上,我們可以看到當攔截器數目超過一個時,會呼叫getChainUnaryInvoker()這個函式,再繼續看看這個函式是如何把攔截器串聯起來的

// getChainUnaryInvoker recursively generate the chained unary invoker.
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
	if curr == len(interceptors)-1 {
		return finalInvoker
	}

	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
	}
}

可以看到getChainUnaryInvoker()其實就是一個遞迴函式,它返回了一個UnaryInvoker,其也是一個函式

type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error

實際上這個UnaryInvoker函式例項化時會呼叫第curr+1interceptor,也就會最終返回一個鏈式結構:

最終將這個finalInvoker賦值給了cc.dopts.unaryInt,但注意到此時並沒有呼叫攔截器,那麼什麼時候開始呼叫的呢?

chained攔截器在下面這個Invoke()函式中實現了真正的攔截器邏輯

err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
	opts = combine(cc.dopts.callOptions, opts)
	if cc.dopts.unaryInt != nil {
		return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
	}
	return invoke(ctx, method, args, reply, cc, opts...)
}

還記得cc.dopts.unaryInt是什麼嗎?它就是我們最終生成的串聯攔截器結構,從這個入口進行呼叫攔截器,最終就會呼叫所有的攔截器,而最後再執行invoke()這個核心業務邏輯

3. 實現一個攔截器

3.1 重新定義資料結構

我們之前的問題是,如何保證handler只執行一遍?

這裡我們將原來的handler進行拆解,成為invoker,然後重新定義一個handler,用於在invoker之前處理一些邏輯

type invoker func(ctx context.Context, interceptors []interceptor, h handler) error
type handler func(ctx context.Context)
type interceptor func(ctx context.Context, h handler, ivk invoker) error

3.2 串聯所有攔截器

接下來我們實現一個把所有攔截器串聯起來的方法

func getInvoker(ctx context.Context, interceptors []interceptor, curr int, ivk invoker) invoker {
	if curr == len(interceptors)-1 {
		return ivk
	}
	return func(ctx context.Context, interceptors []interceptor, h handler) error {
		return interceptors[curr+1](ctx, h, getInvoker(ctx, interceptors, curr+1, ivk))
	}
}

3.3 返回第一個interceptor作為入口

func getChainInterceptor(ctx context.Context, interceptors []interceptor, ivk invoker) interceptor {
	if len(interceptors) == 0 {
		return nil
	} else if len(interceptors) == 1 {
		return interceptors[0]
	} else {
		return func(ctx context.Context, h handler, ivk invoker) error {
			return interceptors[0](ctx, h, getInvoker(ctx, interceptors, 0, ivk))
		}
	}
}

3.4 測試

我們還是定義兩個攔截器,看看是否將會串聯執行

func main() {
	var ctx context.Context
	var ceps []interceptor
	var h = func(ctx context.Context) {
		fmt.Println("some logic before ...")
	}

	var interceptor1 = func(ctx context.Context, h handler, ivk invoker) error {
		h(ctx)
		return ivk(ctx, ceps, h)
	}
	var interceptor2 = func(ctx context.Context, h handler, ivk invoker) error {
		h(ctx)
		return ivk(ctx, ceps, h)
	}
	ceps = append(ceps, interceptor1, interceptor2)

	var ivk = func(ctx context.Context, interceptors []interceptor, h handler) error {
		fmt.Println("invoker start")
		return nil
	}

	cep := getChainInterceptor(ctx, ceps, ivk)
	cep(ctx, h, ivk)
}

輸出結果為:

$ go run main.go

some logic before ...
some logic before ...
invoker start

我們可以看到在呼叫真正的業務邏輯函式invoker()之前,呼叫了兩個攔截器,而業務邏輯只被執行了一次,這就實現了一個簡單的攔截器

參考:

https://zhuanlan.zhihu.com/p/80023990

https://zhuanlan.zhihu.com/p/376438559

https://blog.csdn.net/Gassuih/article/details/116146535