1. 程式人生 > >rpcx服務框架淺析11-呼叫攔截鏈

rpcx服務框架淺析11-呼叫攔截鏈

RPCX分散式服務框架主要致力於提供高效能和透明化的RPC遠端服務呼叫。

      攔截器 :是在面向切面程式設計中應用的,就是在你的service或者一個方法前呼叫一個方法,或者在方法後呼叫一個方法。比如java的動態代理就是攔截器的簡單實現,在你呼叫方法前打印出字串(或者做其它業務邏輯的操作),也可以在你呼叫方法後打印出字串。RPCX框架中並沒有呼叫攔截的處理(掃原始碼好像沒看到)

      攔截鏈:多個攔截器構成一個鏈,完成一組攔截,可能還有執行的先後順序(任務職責鏈)。

攔截鏈簡單實現

輸入引數:

type CmdIn struct {
	Param string
}

func (this *CmdIn) String() string {
	return fmt.Sprintf("[CmdIn Param is %s]",this.Param)
}

輸出引數:

type CmdOut struct {
	Result int
	Info string
}

func (this *CmdOut) String() string {
	return fmt.Sprintf("Result[%d] Info[%s]",this.Result,this.Info)
}

處理Handler:

type Hello struct {
	Param string
}

func (this *Hello) Test1(ctx context.Context, in *CmdIn, out *CmdOut) error {
	fmt.Println("接收到服務消費方輸入:",in.Param)
	out.Result=12345678
	out.Info="我已經吃飯了!"
	return nil
}

封裝Invoker:

package defines

import (
	"fmt"
	"context"
	"reflect"
	"unicode"
	"unicode/utf8"
	"container/list"
)

var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
var typeOfContext = reflect.TypeOf((*context.Context)(nil)).Elem()

type MethodType struct {
	method			reflect.Method
	ArgType    		reflect.Type
	ReplyType  		reflect.Type
}

type Reflect struct {
	Name     string
	Rcvr     reflect.Value
	Typ      reflect.Type
	Method   map[string]*MethodType
	filter   *list.List
}

func (this *Reflect) Setup(rcvr interface{}, name string,filter *list.List) {
	this.Typ = reflect.TypeOf(rcvr)
	this.Rcvr = reflect.ValueOf(rcvr)
	this.Name = reflect.Indirect(this.Rcvr).Type().Name()
	this.Method = suitableMethods(this.Typ, true)
	this.filter=filter
}

func (this *Reflect) Call(ctx context.Context,service string,method string,in *CmdIn,out *CmdOut) error {
	for e := this.filter.Front(); e != nil; e = e.Next() {
		f:=e.Value.(Filter)
		err:=f.Invoke(ctx,service,method,in,out)
		if err!=nil{
			return err
		}
	}

	mtype := this.Method[method]
	returnValues := mtype.method.Func.Call([]reflect.Value{this.Rcvr, reflect.ValueOf(ctx), reflect.ValueOf(in), reflect.ValueOf(out)})
	errInter := returnValues[0].Interface()
	if errInter != nil {
		return errInter.(error)
	}else{
		return nil
	}
}

func suitableMethods(typ reflect.Type, reportErr bool) map[string]*MethodType {
	methods := make(map[string]*MethodType)
	for m := 0; m < typ.NumMethod(); m++ {
		method := typ.Method(m)
		mtype := method.Type
		mname := method.Name
		if method.PkgPath != "" {
			continue
		}
		if mtype.NumIn() != 4 {
			if reportErr {
				fmt.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
			}
			continue
		}
		ctxType := mtype.In(1)
		if !ctxType.Implements(typeOfContext) {
			if reportErr {
				fmt.Println("method", mname, " must use context.Context as the first parameter")
			}
			continue
		}

		argType := mtype.In(2)
		if !isExportedOrBuiltinType(argType) {
			if reportErr {
				fmt.Println(mname, "parameter type not exported:", argType)
			}
			continue
		}
		// Third arg must be a pointer.
		replyType := mtype.In(3)
		if replyType.Kind() != reflect.Ptr {
			if reportErr {
				fmt.Println("method", mname, "reply type not a pointer:", replyType)
			}
			continue
		}
		// Reply type must be exported.
		if !isExportedOrBuiltinType(replyType) {
			if reportErr {
				fmt.Println("method", mname, "reply type not exported:", replyType)
			}
			continue
		}
		// Method needs one out.
		if mtype.NumOut() != 1 {
			if reportErr {
				fmt.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
			}
			continue
		}
		// The return type of the method must be error.
		if returnType := mtype.Out(0); returnType != typeOfError {
			if reportErr {
				fmt.Println("method", mname, "returns", returnType.String(), "not error")
			}
			continue
		}
		methods[mname] = &MethodType{method: method, ArgType: argType, ReplyType: replyType}
	}
	return methods
}

func isExportedOrBuiltinType(t reflect.Type) bool {
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return isExported(t.Name()) || t.PkgPath() == ""
}

func isExported(name string) bool {
	rune, _ := utf8.DecodeRuneInString(name)
	return unicode.IsUpper(rune)
}

呼叫示例:

func TestReflect_Setup(t *testing.T) {
	filters:=list.New()                    //攔截鏈
	filters.PushBack(&LimitFilter{})       //攔截元件1
	filters.PushBack(&StatisticsFilter{})  //攔截元件2

	hello:=&Hello{}
	reflect:=&Reflect{}
	reflect.Setup(hello,"Hello",filters)

	fmt.Println("發起消費方呼叫")
	in:=&CmdIn{
		Param:"吃飯了嗎?",
	}
	out:=&CmdOut{}
	err:=reflect.Call(context.Background(),"Hello","Test1",in,out)
	if err!=nil {
		fmt.Println(err)
	}else{
		fmt.Println("接收到服務提供方返回:",out)
	}
}

程式執行結果:

1.發起消費方呼叫2.攔截鏈 LimitFilter process3.攔截鏈 StatisticsFilter process4.接收到服務消費方輸入: 吃飯了嗎?5.接收到服務提供方返回: Result[12345678] Info[我已經吃飯了!]