1. 程式人生 > 實用技巧 > 微服務常用中介軟體

微服務常用中介軟體

前置中介軟體

// PrepareMiddleware is the first middleware in the chain: it makes sure the
// request context carries a trace ID before the rest of the chain runs.
//
// The trace ID is taken from the first value stored under util.TraceID in the
// incoming gRPC metadata. When the metadata is missing, the key is absent, or
// the value is empty, a new ID is generated with logs.GenTraceId.
func PrepareMiddleware(next MiddlewareFunc) MiddlewareFunc {
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		id := ""
		if md, found := metadata.FromIncomingContext(ctx); found {
			// Indexing a missing key yields a nil slice, so the length
			// check alone covers both "absent" and "present but empty".
			if ids := md[util.TraceID]; len(ids) > 0 {
				id = ids[0]
			}
		}
		if id == "" {
			id = logs.GenTraceId()
		}

		// Attach the log-field holder first (per the original note it is
		// reused when already present in ctx), then store the trace ID.
		ctx = logs.WithTraceId(logs.WithFieldContext(ctx), id)
		return next(ctx, req)
	}
}

限流中介軟體

計數限流

// CounterLimit is a fixed-window counting rate limiter: it admits at most
// limit requests in each window of intervalNano nanoseconds.
//
// counter and unixNano are only accessed through sync/atomic, so Allow can be
// called from multiple goroutines without a data race. limit and intervalNano
// are written once by NewCounterLimit and are read-only afterwards.
type CounterLimit struct {
	counter      int64 // requests counted in the current window
	limit        int64 // maximum number of requests admitted per window
	intervalNano int64 // window length, in nanoseconds
	unixNano     int64 // start of the current window, Unix time in nanoseconds
}

// NewCounterLimit returns a limiter that admits at most limit requests per
// interval. The first window starts at the time of the call.
func NewCounterLimit(interval time.Duration, limit int64) *CounterLimit {
	return &CounterLimit{
		counter:      0,
		limit:        limit,
		intervalNano: int64(interval),
		unixNano:     time.Now().UnixNano(),
	}
}

// Allow reports whether the current request fits in the current window.
//
// Every call is counted, including the one that rolls the window over, so each
// window (the first one included) admits exactly limit requests and a limit of
// zero rejects everything.
//
// Under contention the count is best-effort: a request that races with a
// window rollover may be counted against the old window and then discarded by
// the reset.
func (c *CounterLimit) Allow() bool {
	now := time.Now().UnixNano()
	windowStart := atomic.LoadInt64(&c.unixNano)

	if now-windowStart > c.intervalNano {
		// Only the goroutine that wins the CAS resets the counter, so
		// concurrent callers cannot reset the same window twice.
		if atomic.CompareAndSwapInt64(&c.unixNano, windowStart, now) {
			atomic.StoreInt64(&c.counter, 0)
		}
	}

	// Compare the value returned by AddInt64 rather than re-reading the field,
	// so the decision is based on this call's own increment.
	return atomic.AddInt64(&c.counter, 1) <= c.limit
}

桶限流

// BucketLimit is a leaky-bucket rate limiter. Every admitted request adds one
// unit of water; water drains at rate units per second; a request is rejected
// while the bucket is full.
//
// The fields are read and written without synchronization, so a BucketLimit
// must not be shared between goroutines unless the caller serializes access.
type BucketLimit struct {
	rate       float64 // drain rate, in units of water per second
	bucketSize float64 // maximum amount of water the bucket can hold
	unixNano   int64   // time of the last refresh, Unix time in nanoseconds
	curWater   float64 // amount of water currently in the bucket
}

// NewBucketLimit returns an empty bucket with the given drain rate (per
// second) and capacity.
func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit {
	b := new(BucketLimit)
	b.rate = rate
	b.bucketSize = float64(bucketSize)
	b.curWater = 0
	b.unixNano = time.Now().UnixNano()
	return b
}

// reflesh drains the water that leaked out since the previous refresh and
// moves the refresh timestamp to now. The level never drops below zero.
func (b *BucketLimit) reflesh() {
	current := time.Now().UnixNano()
	// Elapsed time, converted from nanoseconds to seconds.
	elapsedSec := float64(current-b.unixNano) / 1000 / 1000 / 1000
	b.unixNano = current
	b.curWater = math.Max(0, b.curWater-elapsedSec*b.rate)
}

// Allow refreshes the bucket and reports whether there is room for one more
// unit of water; when there is, the unit is added.
func (b *BucketLimit) Allow() bool {
	b.reflesh()
	hasRoom := b.curWater < b.bucketSize
	if hasRoom {
		b.curWater += 1
	}
	return hasRoom
}

令牌桶限流

  • 基於golang.org/x/time/rate
import "golang.org/x/time/rate"

// limiter is a token-bucket limiter from golang.org/x/time/rate. The first
// argument of NewLimiter is the refill rate in events per second (50 here) and
// the second is the burst size (100 here).
var limiter = rate.NewLimiter(50,100)

中介軟體實現

// Limiter is the abstraction the rate-limit middleware depends on. Any type
// with an Allow method satisfies it, for example the CounterLimit and
// BucketLimit types defined in this article.
type Limiter interface {
	// Allow reports whether the current request may proceed.
	Allow() bool
}

// NewRateLimitMiddleware builds a middleware that consults l before every
// request. Admitted requests are passed to the next handler; rejected ones
// return a nil response and a gRPC ResourceExhausted status error, and the
// next handler is not called.
func NewRateLimitMiddleware(l Limiter) Middleware {
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx context.Context, req interface{}) (interface{}, error) {
			if l.Allow() {
				return next(ctx, req)
			}
			return nil, status.Error(codes.ResourceExhausted, "rate limited")
		}
	}
}

熔斷器中介軟體

配置

  • 基於github.com/afex/hystrix-go/hystrix
// Register the circuit-breaker settings for one command (one downstream
// service). The defaults quoted below are the ones defined by hystrix-go.
hystrix.ConfigureCommand("服務名", hystrix.CommandConfig{
	// How long to wait for the command to complete, in milliseconds
	// (default 1000).
	Timeout: 1000,
	// How many commands of this kind may run at the same time (default 10).
	MaxConcurrentRequests: 100,
	// Minimum number of requests in the statistics window before the circuit
	// can trip (default 20).
	RequestVolumeThreshold: 30,
	// How long, in milliseconds, the circuit stays open before a trial
	// request is let through to test recovery (default 5000).
	SleepWindow: 500,
	// Percentage of failed requests at which the circuit opens (default 50).
	ErrorPercentThreshold: 50,
})

觸發條件

  • 一個統計視窗內,請求數量大於RequestVolumeThreshold,且失敗率大於ErrorPercentThreshold, 才會觸發熔斷

中介軟體實現

// HystrixMiddleware runs the rest of the chain inside a hystrix command named
// after the target service (rpcMeta.ServiceName), so that timeouts, the
// concurrency limit and the circuit breaker configured for that command apply.
//
// On success it returns the downstream response. On any failure (downstream
// error, hystrix timeout, open circuit, concurrency limit) it returns a nil
// response together with the error reported by hystrix.Do.
func HystrixMiddleware(next MiddlewareFunc) MiddlewareFunc {
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		rpcMeta := meta.GetRpcMeta(ctx)

		// hystrix runs the callback on its own goroutine, which may still be
		// running after Do has returned with a timeout. The callback therefore
		// writes to a local that is read only on the success path, never to a
		// value this function might be returning concurrently.
		var result interface{}
		err := hystrix.Do(rpcMeta.ServiceName, func() error {
			r, callErr := next(ctx, req)
			if callErr != nil {
				return callErr
			}
			result = r
			return nil
		}, nil)
		if err != nil {
			return nil, err
		}

		return result, nil
	}
}

服務註冊與發現中介軟體

配置

https://www.cnblogs.com/zhaohaiyu/p/13566315.html

中介軟體實現

// NewDiscoveryMiddleware builds a client middleware that fills
// rpcMeta.AllNodes from the given registry before the request continues.
//
// When the RPC metadata already lists nodes, the registry is not queried.
// Otherwise the service named rpcMeta.ServiceName is looked up; a lookup
// failure is logged and returned with a nil response, and the next handler is
// not called.
func NewDiscoveryMiddleware(discovery registry.Registry) Middleware {
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx context.Context, req interface{}) (interface{}, error) {
			// RPC metadata travels in ctx.
			rpcMeta := meta.GetRpcMeta(ctx)

			if len(rpcMeta.AllNodes) == 0 {
				service, err := discovery.GetService(ctx, rpcMeta.ServiceName)
				if err != nil {
					logs.Error(ctx, "discovery service:%s failed, err:%v", rpcMeta.ServiceName, err)
					return nil, err
				}
				rpcMeta.AllNodes = service.Nodes
			}

			return next(ctx, req)
		}
	}
}

短連線中介軟體

配置

// RpcMeta carries the per-call state that the client middlewares in this
// article read and update: who is calling whom, the nodes that can serve the
// call, the node currently selected, and the connection in use.
type RpcMeta struct {
	// Caller is the name of the calling service.
	Caller string
	// ServiceName is the name of the service being called (the provider).
	ServiceName string
	// Method is the method being called.
	Method string
	// CallerCluster is the cluster of the caller.
	CallerCluster string
	// ServiceCluster is the cluster of the provider.
	ServiceCluster string
	// TraceID is the trace ID of the call.
	TraceID string
	// Env is the environment.
	Env string
	// CallerIDC is the IDC (data center) of the caller.
	CallerIDC string
	// ServiceIDC is the IDC (data center) of the provider.
	ServiceIDC string
	// CurNode is the node currently selected for the call.
	CurNode *registry.Node
	// HistoryNodes lists the nodes that were selected earlier for this call.
	HistoryNodes []*registry.Node
	// AllNodes is the provider's node list.
	AllNodes []*registry.Node
	// Conn is the connection used by the current request.
	Conn *grpc.ClientConn
}

// rpcMetaContextKey is the private context key type under which the *RpcMeta
// is stored; using an unexported type prevents collisions with other packages.
type rpcMetaContextKey struct{}

// GetRpcMeta returns the *RpcMeta stored in ctx. When ctx holds no value of
// that type, a fresh empty RpcMeta is returned; that value is not attached to
// ctx, so changes made to it are visible only to the caller holding it.
func GetRpcMeta(ctx context.Context) *RpcMeta {
	if stored, found := ctx.Value(rpcMetaContextKey{}).(*RpcMeta); found {
		return stored
	}
	return &RpcMeta{}
}

中介軟體實現

// ShortConnectMiddleware opens a gRPC connection to the node currently
// selected in the RPC metadata, stores it in rpcMeta.Conn for the rest of the
// chain, and closes it once the downstream handler has returned.
//
// It returns errno.InvalidNode when no node has been selected and
// errno.ConnFailed when dialing fails; in both cases the next handler is not
// called.
func ShortConnectMiddleware(next MiddlewareFunc) MiddlewareFunc {
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		// RPC metadata travels in ctx.
		rpcMeta := meta.GetRpcMeta(ctx)

		target := rpcMeta.CurNode
		if target == nil {
			logs.Error(ctx, "invalid instance")
			return nil, errno.InvalidNode
		}

		address := fmt.Sprintf("%s:%d", target.IP, target.Port)
		conn, dialErr := grpc.Dial(address, grpc.WithInsecure())
		if dialErr != nil {
			logs.Error(ctx, "connect %s failed, err:%v", address, dialErr)
			return nil, errno.ConnFailed
		}
		// Short-lived connection: it lives only for this one request.
		defer conn.Close()

		rpcMeta.Conn = conn
		return next(ctx, req)
	}
}

負載均衡中介軟體

配置

選擇

const (
	// DefaultNodeWeight is the weight given to a node whose Weight is zero.
	DefaultNodeWeight = 100
)

// LoadBalanceType identifies a load-balancing strategy.
type LoadBalanceType int

// Supported strategies.
// NOTE(review): these constants are declared without a type, so they are
// untyped integer constants rather than LoadBalanceType values; they are still
// assignable to LoadBalanceType. Confirm whether typing them is desired.
const (
	// LoadBalanceTypeRandom selects the weighted-random balancer.
	LoadBalanceTypeRandom = iota
	// LoadBalanceTypeRoundRobin selects the round-robin balancer.
	LoadBalanceTypeRoundRobin
)

// LoadBalance is implemented by every load-balancing strategy.
type LoadBalance interface {
	// Name returns the name of the strategy.
	Name() string
	// Select picks one node out of nodes, or returns an error when no node
	// can be chosen.
	Select(ctx context.Context, nodes []*registry.Node) (node *registry.Node, err error)
}

// GetLoadBalance returns a new balancer for the requested strategy.
// LoadBalanceTypeRoundRobin yields a round-robin balancer; LoadBalanceTypeRandom
// and every unrecognized value yield a random balancer.
func GetLoadBalance(balanceType LoadBalanceType) (balancer LoadBalance) {
	if balanceType == LoadBalanceTypeRoundRobin {
		return NewRoundRobinBalance()
	}
	// Random is both an explicit choice and the fallback.
	return NewRandomBalance()
}

randombalance

// RandomBalance is the weighted-random load-balancing strategy.
type RandomBalance struct {
	name string // strategy name reported by Name
}

// NewRandomBalance returns a RandomBalance named "random".
func NewRandomBalance() LoadBalance {
	b := new(RandomBalance)
	b.name = "random"
	return b
}

// Name returns the strategy name.
func (r *RandomBalance) Name() string {
	return r.name
}

// Select picks one node at random, with probability proportional to its
// weight, from the nodes that remain after filterNodes has been applied.
//
// A candidate whose Weight is zero is assigned DefaultNodeWeight; this is
// written back to the node itself. The chosen node is recorded with
// setSelected before it is returned.
//
// It returns errno.NotHaveInstance when nodes is empty and
// errno.AllNodeFailed when filtering leaves no candidate, when the total
// weight is not positive, or when no candidate could be picked.
func (r *RandomBalance) Select(ctx context.Context, nodes []*registry.Node) (node *registry.Node, err error) {
	if len(nodes) == 0 {
		return nil, errno.NotHaveInstance
	}

	candidates := filterNodes(ctx, nodes)
	if len(candidates) == 0 {
		return nil, errno.AllNodeFailed
	}

	totalWeight := 0
	for _, c := range candidates {
		if c.Weight == 0 {
			c.Weight = DefaultNodeWeight
		}
		totalWeight += c.Weight
	}
	// rand.Intn panics on a non-positive argument (possible with negative
	// weights), so treat that as "nothing selectable".
	if totalWeight <= 0 {
		return nil, errno.AllNodeFailed
	}

	// Walk the same candidate list the total was computed from; walking the
	// unfiltered input would allow excluded nodes to be returned.
	remaining := rand.Intn(totalWeight)
	for _, c := range candidates {
		remaining -= c.Weight
		if remaining < 0 {
			node = c
			break
		}
	}
	if node == nil {
		return nil, errno.AllNodeFailed
	}

	setSelected(ctx, node)
	return node, nil
}

roundrobin

// RoundRobinBalance is the round-robin load-balancing strategy.
type RoundRobinBalance struct {
	name  string // strategy name reported by Name
	index int    // position used by the most recent selection
}

// NewRoundRobinBalance returns a RoundRobinBalance named "roundrobin" whose
// index starts at zero.
func NewRoundRobinBalance() LoadBalance {
	b := new(RoundRobinBalance)
	b.name = "roundrobin"
	return b
}

// Name returns the strategy name.
func (r *RoundRobinBalance) Name() string {
	return r.name
}

// Select advances the round-robin position and returns the node at that
// position among the nodes that remain after filterNodes has been applied.
// The chosen node is recorded with setSelected before it is returned.
//
// It returns errno.NotHaveInstance when nodes is empty and
// errno.AllNodeFailed when filtering leaves no candidate.
//
// r.index is updated without synchronization.
func (r *RoundRobinBalance) Select(ctx context.Context, nodes []*registry.Node) (node *registry.Node, err error) {
	if len(nodes) == 0 {
		return nil, errno.NotHaveInstance
	}

	candidates := filterNodes(ctx, nodes)
	if len(candidates) == 0 {
		return nil, errno.AllNodeFailed
	}

	// Index into the filtered list; indexing the unfiltered input would ignore
	// the filter and could hand back a node that was excluded.
	r.index = (r.index + 1) % len(candidates)
	node = candidates[r.index]
	if node != nil {
		setSelected(ctx, node)
	}
	return node, nil
}

中介軟體實現

// NewLoadBalanceMiddleware builds a client middleware that picks a node with
// balancer, stores it in rpcMeta.CurNode, and calls the next handler.
//
// When the next handler fails with a connection error (errno.IsConnectError),
// another node is selected and the call is retried. The loop ends when the
// call succeeds, when it fails with any other error, or when balancer.Select
// itself returns an error. It returns errno.NotHaveInstance without calling
// the balancer when rpcMeta.AllNodes is empty.
func NewLoadBalanceMiddleware(balancer loadbalance.LoadBalance) Middleware {
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
			// RPC metadata travels in ctx.
			rpcMeta := meta.GetRpcMeta(ctx)
			if len(rpcMeta.AllNodes) == 0 {
				logs.Error(ctx, "not have instance")
				return nil, errno.NotHaveInstance
			}

			// The balance context is what lets the balancer filter out
			// nodes that were already selected for this request.
			ctx = loadbalance.WithBalanceContext(ctx)
			for {
				rpcMeta.CurNode, err = balancer.Select(ctx, rpcMeta.AllNodes)
				if err != nil {
					return resp, err
				}

				logs.Debug(ctx, "select node:%#v", rpcMeta.CurNode)
				rpcMeta.HistoryNodes = append(rpcMeta.HistoryNodes, rpcMeta.CurNode)

				resp, err = next(ctx, req)
				// Only connection errors are retried on another node.
				if err != nil && errno.IsConnectError(err) {
					continue
				}
				return resp, err
			}
		}
	}
}

監測中介軟體

  • 基於prometheus

配置

// Metrics groups the Prometheus collectors used to instrument calls: a request
// counter, a per-status-code counter, and a latency summary.
type Metrics struct {
	requestCounter *prom.CounterVec // incremented by IncrRequest
	codeCounter    *prom.CounterVec // incremented by IncrCode, labelled with the gRPC code
	latencySummary *prom.SummaryVec // fed by Latency
}

// NewServerMetrics creates the server-side Metrics. The three collectors are
// created through promauto under the names zhy_server_request_total,
// zhy_server_handled_code_total and zhy_proc_cost.
func NewServerMetrics() *Metrics {
	labels := []string{"service", "method"}

	requests := promauto.NewCounterVec(prom.CounterOpts{
		Name: "zhy_server_request_total",
		Help: "Total number of RPCs completed on the server, regardless of success or failure.",
	}, labels)

	byCode := promauto.NewCounterVec(prom.CounterOpts{
		Name: "zhy_server_handled_code_total",
		Help: "Total number of RPCs completed on the server, regardless of success or failure.",
	}, []string{"service", "method", "grpc_code"})

	latency := promauto.NewSummaryVec(prom.SummaryOpts{
		Name: "zhy_proc_cost",
		Help: "RPC latency distributions.",
		// Quantile -> allowed absolute error.
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	}, labels)

	return &Metrics{
		requestCounter: requests,
		codeCounter:    byCode,
		latencySummary: latency,
	}
}

// NewRpcMetrics creates the Metrics used for outgoing RPC calls (it backs
// DefaultRpcMetrics). The three collectors are created through promauto under
// the names zhy_rpc_call_total, zhy_rpc_code_total and zhy_rpc_cost.
func NewRpcMetrics() *Metrics {
	labels := []string{"service", "method"}

	requests := promauto.NewCounterVec(prom.CounterOpts{
		Name: "zhy_rpc_call_total",
		Help: "Total number of RPCs completed on the server, regardless of success or failure.",
	}, labels)

	byCode := promauto.NewCounterVec(prom.CounterOpts{
		Name: "zhy_rpc_code_total",
		Help: "Total number of RPCs completed on the server, regardless of success or failure.",
	}, []string{"service", "method", "grpc_code"})

	latency := promauto.NewSummaryVec(prom.SummaryOpts{
		Name: "zhy_rpc_cost",
		Help: "RPC latency distributions.",
		// Quantile -> allowed absolute error.
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	}, labels)

	return &Metrics{
		requestCounter: requests,
		codeCounter:    byCode,
		latencySummary: latency,
	}
}

// IncrRequest adds one to the request counter for the given service and
// method. ctx is accepted but not used.
func (m *Metrics) IncrRequest(ctx context.Context, serviceName, methodName string) {
	counter := m.requestCounter.WithLabelValues(serviceName, methodName)
	counter.Inc()
}

// IncrCode adds one to the status-code counter for the given service and
// method; the grpc_code label is the string form of the gRPC status code
// obtained from err via status.FromError. ctx is accepted but not used.
func (m *Metrics) IncrCode(ctx context.Context, serviceName, methodName string, err error) {
	st, _ := status.FromError(err)
	code := st.Code().String()
	m.codeCounter.WithLabelValues(serviceName, methodName, code).Inc()
}

// Latency records one observation of us in the latency summary for the given
// service and method. The callers in this article pass microseconds. ctx is
// accepted but not used.
func (m *Metrics) Latency(ctx context.Context, serviceName, methodName string, us int64) {
	observer := m.latencySummary.WithLabelValues(serviceName, methodName)
	observer.Observe(float64(us))
}

中介軟體實現

// Package-level metric sets shared by the Prometheus middlewares below.
var (
	// DefaultServerMetrics is used by PrometheusServerMiddleware.
	DefaultServerMetrics = prometheus.NewServerMetrics()
	// DefaultRpcMetrics is used by PrometheusClientMiddleware.
	DefaultRpcMetrics    = prometheus.NewRpcMetrics()
)

// init exposes the Prometheus metrics over HTTP: it registers promhttp's
// handler at /metrics on the default mux and serves it on 0.0.0.0:8888 from a
// background goroutine.
func init() {
	// Register before the server goroutine starts so /metrics is in place as
	// soon as the listener accepts connections.
	http.Handle("/metrics", promhttp.Handler())
	addr := fmt.Sprintf("0.0.0.0:%d", 8888)

	go func() {
		// ListenAndServe only returns on failure (for example when the port
		// is already in use); report it instead of dropping it silently.
		if err := http.ListenAndServe(addr, nil); err != nil {
			fmt.Printf("ERROR: metrics server on %s stopped: %v\n", addr, err)
		}
	}()
}

// PrometheusServerMiddleware is the server-side metrics middleware. For every
// request it increments the request counter, calls the next handler, then
// records the resulting status code and the elapsed time in microseconds on
// DefaultServerMetrics. The handler's response and error are returned as-is.
func PrometheusServerMiddleware(next MiddlewareFunc) MiddlewareFunc {
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		serverMeta := meta.GetServerMeta(ctx)
		DefaultServerMetrics.IncrRequest(ctx, serverMeta.ServiceName, serverMeta.Method)

		began := time.Now()
		resp, err := next(ctx, req)

		DefaultServerMetrics.IncrCode(ctx, serverMeta.ServiceName, serverMeta.Method, err)
		// Whole microseconds, truncated.
		costUs := int64(time.Since(began) / time.Microsecond)
		DefaultServerMetrics.Latency(ctx, serverMeta.ServiceName, serverMeta.Method, costUs)
		return resp, err
	}
}

// PrometheusClientMiddleware is the client-side metrics middleware. For every
// outgoing call it increments the request counter, calls the next handler,
// then records the resulting status code and the elapsed time in microseconds
// on DefaultRpcMetrics. The handler's response and error are returned as-is.
func PrometheusClientMiddleware(next MiddlewareFunc) MiddlewareFunc {
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		rpcMeta := meta.GetRpcMeta(ctx)
		DefaultRpcMetrics.IncrRequest(ctx, rpcMeta.ServiceName, rpcMeta.Method)

		began := time.Now()
		resp, err := next(ctx, req)

		DefaultRpcMetrics.IncrCode(ctx, rpcMeta.ServiceName, rpcMeta.Method, err)
		// Whole microseconds, truncated.
		costUs := int64(time.Since(began) / time.Microsecond)
		DefaultRpcMetrics.Latency(ctx, rpcMeta.ServiceName, rpcMeta.Method, costUs)
		return resp, err
	}
}

分散式追蹤中介軟體

配置

const (
	// binHdrSuffix is the key suffix that marks a metadata entry as binary;
	// encodeKeyValue base64-encodes the value of any key ending with it.
	binHdrSuffix = "-bin"
)

// metadataTextMap adapts a gRPC metadata.MD so it can be used as an
// opentracing text-map carrier.
type metadataTextMap metadata.MD

// Set is the writer side of the carrier (opentracing.TextMapWriter): it stores
// val under key after passing both through encodeKeyValue.
//
// metadata.MD is a multimap, but tracing headers are single-valued, so any
// values already stored under the key are replaced rather than appended to.
func (m metadataTextMap) Set(key, val string) {
	k, v := encodeKeyValue(key, val)
	m[k] = []string{v}
}

// ForeachKey is the reader side of the carrier (opentracing.TextMapReader): it
// decodes every key/value pair with metadata.DecodeKeyValue and passes it to
// callback.
//
// Iteration stops at the first failure: a decoding error is returned wrapped
// in a descriptive message, and an error from callback is returned unchanged.
func (m metadataTextMap) ForeachKey(callback func(key, val string) error) error {
	for rawKey, rawVals := range m {
		for _, rawVal := range rawVals {
			key, val, err := metadata.DecodeKeyValue(rawKey, rawVal)
			if err != nil {
				return fmt.Errorf("failed decoding opentracing from gRPC metadata: %v", err)
			}
			if err := callback(key, val); err != nil {
				return err
			}
		}
	}
	return nil
}

// encodeKeyValue prepares a key/value pair for transmission in gRPC metadata:
// the key is lower-cased, and when the lower-cased key ends with binHdrSuffix
// the value is base64-encoded (standard encoding). Other values are returned
// unchanged.
//
// Per the original note, this mirrors unexported logic in grpc's metadata
// package.
func encodeKeyValue(k, v string) (string, string) {
	lowered := strings.ToLower(k)
	if !strings.HasSuffix(lowered, binHdrSuffix) {
		return lowered, v
	}
	return lowered, base64.StdEncoding.EncodeToString([]byte(v))
}


// InitTrace creates a Jaeger tracer for serviceName and installs it as the
// opentracing global tracer.
//
// Spans are reported over HTTP to reportAddr with a batch size of 16.
// sampleType and rate are passed through as the sampler's Type and Param.
// When the tracer cannot be created the error is printed to stdout and
// returned, and the global tracer is left untouched.
//
// The io.Closer returned with the tracer is discarded, so the tracer is never
// closed by this package.
func InitTrace(serviceName, reportAddr, sampleType string, rate float64) (err error) {
	sender := transport.NewHTTPTransport(reportAddr, transport.HTTPBatchSize(16))
	reporter := jaeger.NewRemoteReporter(sender)

	cfg := &config.Configuration{
		Sampler:  &config.SamplerConfig{Type: sampleType, Param: rate},
		Reporter: &config.ReporterConfig{LogSpans: true},
	}

	tracer, _, err := cfg.New(
		serviceName,
		config.Logger(jaeger.StdLogger),
		config.Reporter(reporter),
	)
	if err != nil {
		fmt.Printf("ERROR: cannot init Jaeger: %v\n", err)
		return err
	}

	opentracing.SetGlobalTracer(tracer)
	return nil
}

中介軟體實現

// TraceServerMiddleware is the server-side tracing middleware. It extracts the
// caller's span context from the incoming gRPC metadata, starts a server span
// named after the method being served, tags it with the trace ID, and runs the
// next handler with the span stored in ctx.
//
// A failure to extract the parent context is only logged (and not at all when
// the metadata simply carries no span context); the request proceeds either
// way. When the handler returns an error, the span is marked as failed and the
// error message is logged on it. The span is finished before returning.
func TraceServerMiddleware(next MiddlewareFunc) MiddlewareFunc {
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		// Use the incoming metadata, or an empty set when there is none.
		incoming, found := metadata.FromIncomingContext(ctx)
		if !found {
			incoming = metadata.Pairs()
		}

		tracer := opentracing.GlobalTracer()
		parent, extractErr := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(incoming))
		if extractErr != nil && extractErr != opentracing.ErrSpanContextNotFound {
			logs.Warn(ctx, "trace extract failed, parsing trace information: %v", extractErr)
		}

		// Start tracing this method.
		span := tracer.StartSpan(
			meta.GetServerMeta(ctx).Method,
			ext.RPCServerOption(parent),
			ext.SpanKindRPCServer,
		)
		span.SetTag(util.TraceID, logs.GetTraceId(ctx))

		resp, err := next(opentracing.ContextWithSpan(ctx, span), req)
		if err != nil {
			// Record the failure on the span.
			ext.Error.Set(span, true)
			span.LogFields(log.String("event", "error"), log.String("message", err.Error()))
		}

		span.Finish()
		return resp, err
	}
}

// TraceClientMiddleware is the client-side tracing middleware. It starts a
// client span named after the target service as a child of the span found in
// ctx (if any), injects the span context and the trace ID into the outgoing
// gRPC metadata, and runs the next handler with the span stored in ctx.
//
// A failure to inject the span context is only logged. When the handler
// returns an error, the span is marked as failed and the error message is
// logged on it. The span is finished before returning.
func TraceClientMiddleware(next MiddlewareFunc) MiddlewareFunc {
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		tracer := opentracing.GlobalTracer()

		// parentCtx stays nil when ctx carries no active span.
		var parentCtx opentracing.SpanContext
		if active := opentracing.SpanFromContext(ctx); active != nil {
			parentCtx = active.Context()
		}

		span := tracer.StartSpan(
			meta.GetRpcMeta(ctx).ServiceName,
			opentracing.ChildOf(parentCtx),
			ext.SpanKindRPCClient,
			opentracing.Tag{Key: string(ext.Component), Value: "koala_rpc"},
			opentracing.Tag{Key: util.TraceID, Value: logs.GetTraceId(ctx)},
		)

		// Use the outgoing metadata, or an empty set when there is none.
		outgoing, found := metadata.FromOutgoingContext(ctx)
		if !found {
			outgoing = metadata.Pairs()
		}
		injectErr := tracer.Inject(span.Context(), opentracing.HTTPHeaders, metadataTextMap(outgoing))
		if injectErr != nil {
			logs.Debug(ctx, "grpc_opentracing: failed serializing trace information: %v", injectErr)
		}

		ctx = metadata.NewOutgoingContext(ctx, outgoing)
		ctx = metadata.AppendToOutgoingContext(ctx, util.TraceID, logs.GetTraceId(ctx))
		ctx = opentracing.ContextWithSpan(ctx, span)

		resp, err := next(ctx, req)
		if err != nil {
			// Record the failure on the span.
			ext.Error.Set(span, true)
			span.LogFields(log.String("event", "error"), log.String("message", err.Error()))
		}

		span.Finish()
		return resp, err
	}
}