golang實現一個gRPC攔截器
1. 什麼是gRPC攔截器
我們以etcd一個寫請求的流程來看gRPC攔截器做了什麼工作
當etcd處理一個寫請求,比如 put hello world 時,首先etcd client會使用負載均衡演算法選擇一個etcd節點,發起gRPC呼叫;
然後etcd節點收到請求後經過gRPC攔截器、Quota模組後,進入KVServer模組...
攔截器,通俗一點理解就是在執行一段程式碼之前,先去執行另外一段程式碼。
攔截器就可以理解為gRPC生態中的中介軟體(是不是和web中介軟體非常類似),攔截器一般在做統一介面的認證工作
假設有一個方法handler(ctx context.Context)
,我想要給這個方法賦予一個能力:允許在這個方法之前列印一行日誌
2. gRPC攔截器分析
以下使用golang來分析一個簡單的攔截器邏輯
2.1 定義結構
我們定義一個結構interceptor
,這個結構包含兩個引數,一個上下文資訊context
和處理器handler
函式
type handler func(ctx context.Context)
type interceptor func(ctx context.Context, h handler)
2.2 申明賦值
接下來,為了實現我們的目標,對每個handler
的每個操作,我們都需要經過攔截器,於是我們宣告兩個interceptor
和handler
的變數並賦值
var h = func(ctx context.Context) { fmt.Println("some logic ...") } var interceptor1 = func(ctx context.Context, h handler) { fmt.Println("intercept!") h(ctx) }
2.3 編寫執行函式
我們執行一下函式,測試效果
func main() { var ctx context.Context var ceps []interceptor var h = func(ctx context.Context) { fmt.Println("some logic ...") } var interceptor1 = func(ctx context.Context, h handler) { fmt.Println("intercept!") h(ctx) } ceps = append(ceps, interceptor1) for _, cep := range ceps { cep(ctx, h) } }
輸出結果為:
$ go run main.go
intercept!
some logic ...
看起來我們的攔截器已經生效了,我們在ceps
陣列中再增加一個攔截器,看看會發生什麼
var interceptor2 = func(ctx context.Context, h handler) {
fmt.Println("intercept_2!")
h(ctx)
}
ceps = append(ceps, interceptor2)
輸出結果為:
$ go run main.go
intercept_1!
some logic ...
intercept_2!
some logic ...
可以看到,輸出結果明顯是不符合邏輯的
我們認為的攔截器是什麼?不管我們中間經過了多少個攔截器的處理,都要保證handler
函式只執行一次,也就是我們的業務邏輯只能執行一次
2.4 gRPC-go
在gRPC-go的原始碼裡有一個函式chainUnaryClientInterceptors(cc)
,看函式名字也能猜出來是做什麼
這個函式就是把所有的攔截器串聯成了一個攔截器,這樣保證了請求會經過所有攔截器,而最終handler
函式只會被最後執行一次
那麼將所有攔截器串聯是如何做到的呢?
來看看這個函式的實現:
// chainUnaryClientInterceptors chains all unary client interceptors into one.
func chainUnaryClientInterceptors(cc *ClientConn) {
interceptors := cc.dopts.chainUnaryInts
// Prepend dopts.unaryInt to the chaining interceptors if it exists,
// since unaryInt will be executed before any other chained interceptors.
if cc.dopts.unaryInt != nil {
interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
}
var chaindInt UnaryClientInterceptor
if len(interceptors) == 0 {
chaindInt = nil
} else if len(interceptors) == 1 {
chaindInt = interceptors[0]
} else {
chaindInt = func(ctx context.Context, method string, req, reply interface{},
cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
}
}
cc.dopts.unaryInt = chaindInt
}
重點在第二個if-else判斷上,我們可以看到當攔截器數目超過一個時,會呼叫getChainUnaryInvoker()
這個函式,再繼續看看這個函式是如何把攔截器串聯起來的
// getChainUnaryInvoker recursively generate the chained unary invoker.
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
if curr == len(interceptors)-1 {
return finalInvoker
}
return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
}
}
可以看到getChainUnaryInvoker()
其實就是一個遞迴函式,它返回了一個UnaryInvoker
,其也是一個函式
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
實際上這個UnaryInvoker
函式例項化時會呼叫第curr+1
個interceptor
,也就會最終返回一個鏈式結構:
最終將這個finalInvoker
賦值給了cc.dopts.unaryInt
,但注意到此時並沒有呼叫攔截器,那麼什麼時候開始呼叫的呢?
chained攔截器在下面這個Invoke()
函式中實現了真正的攔截器邏輯
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
還記得cc.dopts.unaryInt
是什麼嗎?它就是我們最終生成的串聯攔截器結構,從這個入口進行呼叫攔截器,最終就會呼叫所有的攔截器,而最後再執行invoke()
這個核心業務邏輯
3. 實現一個攔截器
3.1 重新定義資料結構
我們之前的問題是,如何保證handler
只執行一遍?
這裡我們將原來的handler
進行拆解,成為invoker
,然後重新定義一個handler
,用於在invoker
之前處理一些邏輯
type invoker func(ctx context.Context, interceptors []interceptor, h handler) error
type handler func(ctx context.Context)
type interceptor func(ctx context.Context, h handler, ivk invoker) error
3.2 串聯所有攔截器
接下來我們實現一個把所有攔截器串聯起來的方法
func getInvoker(ctx context.Context, interceptors []interceptor, curr int, ivk invoker) invoker {
if curr == len(interceptors)-1 {
return ivk
}
return func(ctx context.Context, interceptors []interceptor, h handler) error {
return interceptors[curr+1](ctx, h, getInvoker(ctx, interceptors, curr+1, ivk))
}
}
3.3 返回第一個interceptor作為入口
func getChainInterceptor(ctx context.Context, interceptors []interceptor, ivk invoker) interceptor {
if len(interceptors) == 0 {
return nil
} else if len(interceptors) == 1 {
return interceptors[0]
} else {
return func(ctx context.Context, h handler, ivk invoker) error {
return interceptors[0](ctx, h, getInvoker(ctx, interceptors, 0, ivk))
}
}
}
3.4 測試
我們還是定義兩個攔截器,看看是否將會串聯執行
func main() {
var ctx context.Context
var ceps []interceptor
var h = func(ctx context.Context) {
fmt.Println("some logic before ...")
}
var interceptor1 = func(ctx context.Context, h handler, ivk invoker) error {
h(ctx)
return ivk(ctx, ceps, h)
}
var interceptor2 = func(ctx context.Context, h handler, ivk invoker) error {
h(ctx)
return ivk(ctx, ceps, h)
}
ceps = append(ceps, interceptor1, interceptor2)
var ivk = func(ctx context.Context, interceptors []interceptor, h handler) error {
fmt.Println("invoker start")
return nil
}
cep := getChainInterceptor(ctx, ceps, ivk)
cep(ctx, h, ivk)
}
輸出結果為:
$ go run main.go
some logic before ...
some logic before ...
invoker start
我們可以看到在呼叫真正的業務邏輯函式invoker()
之前,呼叫了兩個攔截器,而業務邏輯只被執行了一次,這就實現了一個簡單的攔截器
參考:
https://zhuanlan.zhihu.com/p/80023990
https://zhuanlan.zhihu.com/p/376438559
https://blog.csdn.net/Gassuih/article/details/116146535