微服務常用中介軟體
阿新 • • 發佈:2020-09-09
前置中介軟體
func PrepareMiddleware(next MiddlewareFunc) MiddlewareFunc { return func(ctx context.Context, req interface{}) (resp interface{}, err error) { //處理traceId var traceId string //從ctx獲取grpc的metadata md, ok := metadata.FromIncomingContext(ctx) if ok { vals, ok := md[util.TraceID] if ok && len(vals) > 0 { traceId = vals[0] } } if len(traceId) == 0 { traceId = logs.GenTraceId() } ctx = logs.WithFieldContext(ctx) // 從ctx拿出field 沒有新建一個 ctx = logs.WithTraceId(ctx, traceId) // 從traceId放入ctx resp, err = next(ctx, req) return } }
限流中介軟體
計數限流
type CounterLimit struct { counter int64 //計數器 limit int64 //指定時間視窗內允許的最大請求數 intervalNano int64 //指定的時間視窗 unixNano int64 //unix時間戳,單位為納秒 } func NewCounterLimit(interval time.Duration, limit int64) *CounterLimit { return &CounterLimit{ counter: 0, limit: limit, intervalNano: int64(interval), unixNano: time.Now().UnixNano(), } } func (c *CounterLimit) Allow() bool { now := time.Now().UnixNano() if now-c.unixNano > c.intervalNano { //如果當前過了當前的時間視窗,則重新進行計數 atomic.StoreInt64(&c.counter, 0) atomic.StoreInt64(&c.unixNano, now) return true } atomic.AddInt64(&c.counter, 1) return c.counter < c.limit //判斷是否要進行限流 }
桶限流
type BucketLimit struct { rate float64 //漏桶中水的漏出速率 bucketSize float64 //漏桶最多能裝的水大小 unixNano int64 //unix時間戳 curWater float64 //當前桶裡面的水 } func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit { return &BucketLimit{ bucketSize: float64(bucketSize), rate: rate, unixNano: time.Now().UnixNano(), curWater: 0, } } func (b *BucketLimit) reflesh() { now := time.Now().UnixNano() //時間差, 把納秒換成秒 diffSec := float64(now-b.unixNano) / 1000 / 1000 / 1000 b.curWater = math.Max(0, b.curWater-diffSec*b.rate) b.unixNano = now return } func (b *BucketLimit) Allow() bool { b.reflesh() if b.curWater < b.bucketSize { b.curWater = b.curWater + 1 return true } return false }
令牌桶限流
- 基於
golang.org/x/time/rate
包
import "golang.org/x/time/rate"
var limiter = rate.NewLimiter(50,100)
中介軟體實現
type Limiter interface {
Allow() bool
}
func NewRateLimitMiddleware(l Limiter) Middleware {
return func(next MiddlewareFunc) MiddlewareFunc {
return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
allow := l.Allow()
if !allow {
err = status.Error(codes.ResourceExhausted, "rate limited")
return
}
return next(ctx, req)
}
}
}
熔斷器中介軟體
配置
- 基於
github.com/afex/hystrix-go/hystrix
包
hystrix.ConfigureCommand("服務名", hystrix.CommandConfig{
Timeout: 1000, // 超時配置,預設1000ms
MaxConcurrentRequests: 100, // 併發控制,預設是10
RequestVolumeThreshold: 500, // 熔斷器開啟之後,冷卻的時間,預設是500ms
SleepWindow:30, // 一個統計視窗的請求數量,預設是20
ErrorPercentThreshold:50, // 失敗百分比,預設是50%
})
觸發條件
- 一個統計視窗內,請求數量大於
RequestVolumeThreshold
,且失敗率大於ErrorPercentThreshold
, 才會觸發熔斷
中介軟體實現
func HystrixMiddleware(next MiddlewareFunc) MiddlewareFunc {
return func(ctx context.Context, req interface{}) (resp interface{},err error) {
rpcMeta := meta.GetRpcMeta(ctx)
hystrixErr := hystrix.Do(rpcMeta.ServiceName, func() (err error) {
resp, err = next(ctx, req)
return err
}, nil)
if hystrixErr != nil {
err = hystrixErr
return
}
return
}
}
服務註冊與發現中介軟體
配置
https://www.cnblogs.com/zhaohaiyu/p/13566315.html
中介軟體實現
func NewDiscoveryMiddleware(discovery registry.Registry) Middleware {
return func(next MiddlewareFunc) MiddlewareFunc {
return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
//從ctx獲取rpc的metadata
rpcMeta := meta.GetRpcMeta(ctx)
if len(rpcMeta.AllNodes) > 0 {
return next(ctx, req)
}
service, err := discovery.GetService(ctx, rpcMeta.ServiceName)
if err != nil {
logs.Error(ctx, "discovery service:%s failed, err:%v", rpcMeta.ServiceName, err)
return
}
rpcMeta.AllNodes = service.Nodes
resp, err = next(ctx, req)
return
}
}
}
短連線中介軟體
配置
type RpcMeta struct {
//呼叫方名字
Caller string
//服務提供方
ServiceName string
//呼叫的方法
Method string
//呼叫方叢集
CallerCluster string
//服務提供方叢集
ServiceCluster string
//TraceID
TraceID string
//環境
Env string
//呼叫方IDC
CallerIDC string
//服務提供方IDC
ServiceIDC string
//當前節點
CurNode *registry.Node
//歷史選擇節點
HistoryNodes []*registry.Node
//服務提供方的節點列表
AllNodes []*registry.Node
//當前請求使用的連線
Conn *grpc.ClientConn
}
type rpcMetaContextKey struct{}
func GetRpcMeta(ctx context.Context) *RpcMeta {
meta, ok := ctx.Value(rpcMetaContextKey{}).(*RpcMeta)
if !ok {
meta = &RpcMeta{}
}
return meta
}
中介軟體實現
func ShortConnectMiddleware(next MiddlewareFunc) MiddlewareFunc {
return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
//從ctx獲取rpc的metadata
rpcMeta := meta.GetRpcMeta(ctx)
if rpcMeta.CurNode == nil{
err = errno.InvalidNode
logs.Error(ctx, "invalid instance")
return
}
address := fmt.Sprintf("%s:%d", rpcMeta.CurNode.IP, rpcMeta.CurNode.Port)
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
logs.Error(ctx, "connect %s failed, err:%v", address, err)
return nil, errno.ConnFailed
}
rpcMeta.Conn = conn
defer conn.Close()
resp, err = next(ctx, req)
return
}
}
負載均衡中介軟體
配置
選擇
const (
DefaultNodeWeight = 100
)
type LoadBalanceType int
const (
LoadBalanceTypeRandom = iota
LoadBalanceTypeRoundRobin
)
type LoadBalance interface {
Name() string
Select(ctx context.Context, nodes []*registry.Node) (node *registry.Node, err error)
}
func GetLoadBalance(balanceType LoadBalanceType) (balancer LoadBalance) {
switch (balanceType) {
case LoadBalanceTypeRandom:
balancer = NewRandomBalance()
case LoadBalanceTypeRoundRobin:
balancer = NewRoundRobinBalance()
default:
balancer = NewRandomBalance()
}
return
}
randombalance
type RandomBalance struct {
name string
}
func NewRandomBalance() LoadBalance {
return &RandomBalance{
name: "random",
}
}
func (r *RandomBalance) Name() string {
return r.name
}
func (r *RandomBalance) Select(ctx context.Context, nodes []*registry.Node) (node *registry.Node, err error) {
if len(nodes) == 0 {
err = errno.NotHaveInstance
return
}
defer func() {
if node != nil {
setSelected(ctx, node)
}
}()
var newNodes = filterNodes(ctx, nodes)
if len(newNodes) == 0 {
err = errno.AllNodeFailed
return
}
var totalWeight int
for _, val := range newNodes {
if val.Weight == 0 {
val.Weight = DefaultNodeWeight
}
totalWeight += val.Weight
}
curWeight := rand.Intn(totalWeight)
curIndex := -1
for index, node := range nodes {
curWeight -= node.Weight
if curWeight < 0 {
curIndex = index
break
}
}
if curIndex == -1 {
err = errno.AllNodeFailed
return
}
node = nodes[curIndex]
return
}
roundrobin
type RoundRobinBalance struct {
name string
index int
}
func NewRoundRobinBalance() LoadBalance {
return &RoundRobinBalance{
name: "roundrobin",
}
}
func (r *RoundRobinBalance) Name() string {
return r.name
}
func (r *RoundRobinBalance) Select(ctx context.Context, nodes []*registry.Node) (node *registry.Node, err error) {
if len(nodes) == 0 {
err = errno.NotHaveInstance
return
}
defer func() {
if node != nil {
setSelected(ctx, node)
}
}()
var newNodes = filterNodes(ctx, nodes)
if len(newNodes) == 0 {
err = errno.AllNodeFailed
return
}
r.index = (r.index + 1) % len(nodes)
node = nodes[r.index]
return
}
中介軟體實現
func NewLoadBalanceMiddleware(balancer loadbalance.LoadBalance) Middleware {
return func(next MiddlewareFunc) MiddlewareFunc {
return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
//從ctx獲取rpc的metadata
rpcMeta := meta.GetRpcMeta(ctx)
if len(rpcMeta.AllNodes) == 0 {
err = errno.NotHaveInstance
logs.Error(ctx, "not have instance")
return
}
//生成loadbalance的上下文,用來過濾已經選擇的節點
ctx = loadbalance.WithBalanceContext(ctx)
for {
rpcMeta.CurNode, err = balancer.Select(ctx, rpcMeta.AllNodes)
if err != nil {
return
}
logs.Debug(ctx, "select node:%#v", rpcMeta.CurNode)
rpcMeta.HistoryNodes = append(rpcMeta.HistoryNodes, rpcMeta.CurNode)
resp, err = next(ctx, req)
if err != nil {
//連線錯誤的話,進行重試
if errno.IsConnectError(err) {
continue
}
return
}
break
}
return
}
}
}
監測中介軟體
- 基於
prometheus
配置
// 服務端取樣打點
type Metrics struct {
requestCounter *prom.CounterVec
codeCounter *prom.CounterVec
latencySummary *prom.SummaryVec
}
//生成server metrics例項
func NewServerMetrics() *Metrics {
return &Metrics{
requestCounter: promauto.NewCounterVec(
prom.CounterOpts{
Name: "zhy_server_request_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}, []string{"service", "method"}),
codeCounter: promauto.NewCounterVec(
prom.CounterOpts{
Name: "zhy_server_handled_code_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}, []string{"service", "method", "grpc_code"}),
latencySummary: promauto.NewSummaryVec(
prom.SummaryOpts{
Name: "zhy_proc_cost",
Help: "RPC latency distributions.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"service", "method"},
),
}
}
//生成server metrics例項
func NewRpcMetrics() *Metrics {
return &Metrics{
requestCounter: promauto.NewCounterVec(
prom.CounterOpts{
Name: "zhy_rpc_call_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}, []string{"service", "method"}),
codeCounter: promauto.NewCounterVec(
prom.CounterOpts{
Name: "zhy_rpc_code_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}, []string{"service", "method", "grpc_code"}),
latencySummary: promauto.NewSummaryVec(
prom.SummaryOpts{
Name: "zhy_rpc_cost",
Help: "RPC latency distributions.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"service", "method"},
),
}
}
func (m *Metrics) IncrRequest(ctx context.Context, serviceName, methodName string) {
m.requestCounter.WithLabelValues(serviceName, methodName).Inc()
}
func (m *Metrics) IncrCode(ctx context.Context, serviceName, methodName string, err error) {
st, _ := status.FromError(err)
m.codeCounter.WithLabelValues(serviceName, methodName, st.Code().String()).Inc()
}
func (m *Metrics) Latency(ctx context.Context, serviceName, methodName string, us int64) {
m.latencySummary.WithLabelValues(serviceName, methodName).Observe(float64(us))
}
中介軟體實現
var (
DefaultServerMetrics = prometheus.NewServerMetrics()
DefaultRpcMetrics = prometheus.NewRpcMetrics()
)
func init() {
go func() {
http.Handle("/metrics", promhttp.Handler())
addr := fmt.Sprintf("0.0.0.0:%d", 8888)
http.ListenAndServe(addr, nil)
}()
}
// 服務端中介軟體
func PrometheusServerMiddleware(next MiddlewareFunc) MiddlewareFunc {
return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
serverMeta := meta.GetServerMeta(ctx)
DefaultServerMetrics.IncrRequest(ctx, serverMeta.ServiceName, serverMeta.Method)
startTime := time.Now()
resp, err = next(ctx, req)
DefaultServerMetrics.IncrCode(ctx, serverMeta.ServiceName, serverMeta.Method, err)
DefaultServerMetrics.Latency(ctx, serverMeta.ServiceName,
serverMeta.Method, time.Since(startTime).Nanoseconds()/1000)
return
}
}
// 客戶端的中介軟體
func PrometheusClientMiddleware(next MiddlewareFunc) MiddlewareFunc {
return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
rpcMeta := meta.GetRpcMeta(ctx)
DefaultRpcMetrics.IncrRequest(ctx, rpcMeta.ServiceName, rpcMeta.Method)
startTime := time.Now()
resp, err = next(ctx, req)
DefaultRpcMetrics.IncrCode(ctx, rpcMeta.ServiceName, rpcMeta.Method, err)
DefaultRpcMetrics.Latency(ctx, rpcMeta.ServiceName,
rpcMeta.Method, time.Since(startTime).Nanoseconds()/1000)
return
}
}
分散式追蹤中介軟體
- 基於opentracing
- https://www.cnblogs.com/zhaohaiyu/p/13613650.html
配置
const (
binHdrSuffix = "-bin"
)
// metadataTextMap extends a metadata.MD to be an opentracing textmap
type metadataTextMap metadata.MD
// Set is a opentracing.TextMapReader interface that extracts values.
func (m metadataTextMap) Set(key, val string) {
// gRPC allows for complex binary values to be written.
encodedKey, encodedVal := encodeKeyValue(key, val)
// The metadata object is a multimap, and previous values may exist, but for opentracing headers, we do not append
// we just override.
m[encodedKey] = []string{encodedVal}
}
// ForeachKey is a opentracing.TextMapReader interface that extracts values.
func (m metadataTextMap) ForeachKey(callback func(key, val string) error) error {
for k, vv := range m {
for _, v := range vv {
if decodedKey, decodedVal, err := metadata.DecodeKeyValue(k, v); err == nil {
if err = callback(decodedKey, decodedVal); err != nil {
return err
}
} else {
return fmt.Errorf("failed decoding opentracing from gRPC metadata: %v", err)
}
}
}
return nil
}
// encodeKeyValue encodes key and value qualified for transmission via gRPC.
// note: copy pasted from private values of grpc.metadata
func encodeKeyValue(k, v string) (string, string) {
k = strings.ToLower(k)
if strings.HasSuffix(k, binHdrSuffix) {
val := base64.StdEncoding.EncodeToString([]byte(v))
v = string(val)
}
return k, v
}
func InitTrace(serviceName, reportAddr, sampleType string, rate float64) (err error) {
transport := transport.NewHTTPTransport(
reportAddr,
transport.HTTPBatchSize(16),
)
cfg := &config.Configuration{
Sampler: &config.SamplerConfig{
Type: sampleType,
Param: rate,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
},
}
r := jaeger.NewRemoteReporter(transport)
tracer, closer, err := cfg.New(serviceName,
config.Logger(jaeger.StdLogger),
config.Reporter(r))
if err != nil {
fmt.Printf("ERROR: cannot init Jaeger: %v\n", err)
return
}
_ = closer
opentracing.SetGlobalTracer(tracer)
return
}
中介軟體實現
func TraceServerMiddleware(next MiddlewareFunc) MiddlewareFunc {
return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
//從ctx獲取grpc的metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
//沒有的話,新建一個
md = metadata.Pairs()
}
tracer := opentracing.GlobalTracer()
parentSpanContext, err := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md))
if err != nil && err != opentracing.ErrSpanContextNotFound {
logs.Warn(ctx, "trace extract failed, parsing trace information: %v", err)
}
serverMeta := meta.GetServerMeta(ctx)
//開始追蹤該方法
serverSpan := tracer.StartSpan(
serverMeta.Method,
ext.RPCServerOption(parentSpanContext),
ext.SpanKindRPCServer,
)
serverSpan.SetTag(util.TraceID, logs.GetTraceId(ctx))
ctx = opentracing.ContextWithSpan(ctx, serverSpan)
resp, err = next(ctx, req)
//記錄錯誤
if err != nil {
ext.Error.Set(serverSpan, true)
serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
}
serverSpan.Finish()
return
}
}
func TraceClientMiddleware(next MiddlewareFunc) MiddlewareFunc {
return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
tracer := opentracing.GlobalTracer()
var parentSpanCtx opentracing.SpanContext
if parent := opentracing.SpanFromContext(ctx); parent != nil {
parentSpanCtx = parent.Context()
}
opts := []opentracing.StartSpanOption{
opentracing.ChildOf(parentSpanCtx),
ext.SpanKindRPCClient,
opentracing.Tag{Key: string(ext.Component), Value: "koala_rpc"},
opentracing.Tag{Key: util.TraceID, Value: logs.GetTraceId(ctx)},
}
rpcMeta := meta.GetRpcMeta(ctx)
clientSpan := tracer.StartSpan(rpcMeta.ServiceName, opts...)
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.Pairs()
}
if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil {
logs.Debug(ctx, "grpc_opentracing: failed serializing trace information: %v", err)
}
ctx = metadata.NewOutgoingContext(ctx, md)
ctx = metadata.AppendToOutgoingContext(ctx, util.TraceID, logs.GetTraceId(ctx))
ctx = opentracing.ContextWithSpan(ctx, clientSpan)
resp, err = next(ctx, req)
//記錄錯誤
if err != nil {
ext.Error.Set(clientSpan, true)
clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
}
clientSpan.Finish()
return
}
}