1. 程式人生 > >go監控方案(7) -- 實現

go監控方案(7) -- 實現

metrics 客戶端

資料採集使用go-metrics

傳輸使用UDP, 仿StatsD上傳採集資料, InfluxDB進行資料儲存, Grafana進行展示。

實現github 地址

https://github.com/solate/metrics

該地址有已經改好的配置檔案可以直接使用。

使用的all-in-one :

git docker-statsd-influxdb-grafana

docker hub 地址

資料封裝

//掛載配置檔案,已修改statsd模版
docker run --ulimit nofile=66000:66000  -v /root/telegraf.conf:/etc/telegraf/telegraf.conf   -d   --name docker-statsd-influxdb-grafana   -p 3003:3003   -p 3004:8888   -p 8086:8086   -p 8125:8125/udp   samuelebistoletti/docker-statsd-influxdb-grafana:latest

register

register 使用的name 必須是不同的

telegraf 配置修改

[[inputs.statsd]] 部分配置開啟, 修改templates為:

   templates = [
      "* measurement.measurement.field"
   ]

表示傳入的指標名稱 prefix.name.field 會被解析為 measurement「prefix_name」與 field「field」(兩段 measurement 以 telegraf 預設的分隔符 `_` 連接)。

程式碼實現

package client

import (
	"bufio"
	"bytes"
	"github.com/rcrowley/go-metrics"
	"log"
	"net"
	"strconv"
	"strings"
	"time"
)

// UDP_MAX_PACKET_SIZE is the 64 KiB (64 * 1024 bytes) UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_MAX_PACKET_SIZE int = 64 * 1024

// Config provides a container with configuration parameters for
// the StatsD exporter.
type Config struct {
	Network       string           // Network handed to net.Dial by StatsD: "tcp" or "udp".
	Addr          string           // Network address to connect to.
	Registry      metrics.Registry // Registry whose metrics are exported.
	FlushInterval time.Duration    // Interval between two exports.
	Prefix        string           // Prefix prepended to metric names; statsdLine uses "statsd" when it is empty.
	Rate          float32          // Sample rate; statsdLine appends "|@<rate>" only when it is non-zero and below 1.
	Tags          string           // Tags. TODO: not referenced by the exporter code visible here.

	conn net.Conn // Connection the exporter writes to; StatsD sets it after dialing.
}

// StatsD dials addr over network and then hands a Config built from its
// arguments to StatsDWithConfig, which exports the metrics in r every d.
//
// The call blocks for as long as StatsDWithConfig runs. It panics when the
// initial dial fails; the panic message carries the underlying dial error so
// the failure can be diagnosed.
func StatsD(r metrics.Registry, d time.Duration, prefix string, network string, addr string, rate float32) {
	conn, err := net.Dial(network, addr)
	if err != nil {
		// Keep the historical message as a prefix and append the cause,
		// which used to be thrown away.
		panic("conn remote err! " + err.Error())
	}

	StatsDWithConfig(Config{
		Network:       network,
		Addr:          addr,
		Registry:      r,
		FlushInterval: d,
		Prefix:        prefix,
		Rate:          rate,
		conn:          conn,
	})
}

// StatsDWithConfig is a blocking exporter function: once per c.FlushInterval
// it sends a snapshot of every metric in c.Registry to the StatsD endpoint.
// It does not return.
//
// When no connection is set it dials one from c.Network and c.Addr (the conn
// field is unexported, so callers outside this package cannot provide one).
// After a failed export the connection is closed and dialed again on the
// next tick. Errors are logged, not returned.
//
// NOTE: time.Tick returns a nil channel for a non-positive duration, so a
// FlushInterval <= 0 makes this function block forever without exporting.
func StatsDWithConfig(c Config) {
	for range time.Tick(c.FlushInterval) {
		if c.conn == nil {
			conn, err := net.Dial(c.Network, c.Addr)
			if err != nil {
				log.Println(err)
				continue
			}
			c.conn = conn
		}

		if err := statsd(&c); err != nil {
			log.Println(err)
			c.conn.Close()
			// Drop the closed connection so the next tick dials a fresh one
			// instead of writing to a closed socket forever.
			c.conn = nil
		}
	}
}

// statsd writes one snapshot of every metric in c.Registry to c.conn and
// returns the first write error it encounters, or nil.
//
// All lines belonging to one metric are joined and flushed together, so each
// metric is handed to the connection on its own rather than being cut at an
// arbitrary buffer boundary. c.conn must be non-nil.
func statsd(c *Config) (err error) {
	w := bufio.NewWriter(c.conn)

	// gauge renders one "|g" line for the given metric name and field.
	gauge := func(name, field string, value interface{}) []byte {
		return statsdLine(c.Prefix, name, field, value, "|g", c.Rate)
	}

	// send writes the lines of a single metric and flushes them at once.
	// bufio.Writer errors are sticky, so Flush also reports a failed Write;
	// only the first error is kept.
	send := func(lines ...[]byte) {
		w.Write(bytes.Join(lines, nil))
		if ferr := w.Flush(); ferr != nil && err == nil {
			err = ferr
		}
	}

	quantiles := []float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}

	c.Registry.Each(func(name string, i interface{}) {
		switch metric := i.(type) {
		case metrics.Counter:
			ms := metric.Snapshot()
			send(statsdLine(c.Prefix, name, "", ms.Count(), "|c", c.Rate))

		case metrics.Gauge:
			ms := metric.Snapshot()
			send(gauge(name, "", ms.Value()))

		case metrics.GaugeFloat64:
			ms := metric.Snapshot()
			send(gauge(name, "", ms.Value()))

		case metrics.Histogram:
			ms := metric.Snapshot()
			ps := ms.Percentiles(quantiles)
			send(
				gauge(name, "count", ms.Count()),
				gauge(name, "max", ms.Max()),
				gauge(name, "mean", ms.Mean()),
				gauge(name, "min", ms.Min()),
				gauge(name, "stddev", ms.StdDev()),
				gauge(name, "variance", ms.Variance()),
				gauge(name, "p50", ps[0]),
				gauge(name, "p75", ps[1]),
				gauge(name, "p95", ps[2]),
				gauge(name, "p99", ps[3]),
				gauge(name, "p999", ps[4]),
				gauge(name, "p9999", ps[5]),
			)

		case metrics.Meter:
			ms := metric.Snapshot()
			send(
				gauge(name, "count", ms.Count()),
				gauge(name, "m1", ms.Rate1()),
				gauge(name, "m5", ms.Rate5()),
				gauge(name, "m15", ms.Rate15()),
				gauge(name, "mean", ms.RateMean()),
			)

		case metrics.Timer:
			ms := metric.Snapshot()
			ps := ms.Percentiles(quantiles)
			send(
				gauge(name, "count", ms.Count()),
				gauge(name, "max", ms.Max()),
				gauge(name, "mean", ms.Mean()),
				gauge(name, "min", ms.Min()),
				gauge(name, "stddev", ms.StdDev()),
				gauge(name, "variance", ms.Variance()),
				gauge(name, "p50", ps[0]),
				gauge(name, "p75", ps[1]),
				gauge(name, "p95", ps[2]),
				gauge(name, "p99", ps[3]),
				gauge(name, "p999", ps[4]),
				gauge(name, "p9999", ps[5]),
				gauge(name, "m1", ms.Rate1()),
				gauge(name, "m5", ms.Rate5()),
				gauge(name, "m15", ms.Rate15()),
				// The mean rate used to be sent as "mean" as well, which
				// collided with the mean duration emitted above under the
				// same key; it now has its own field name.
				gauge(name, "mean_rate", ms.RateMean()),
			)

			// Healthcheck, EWMA and Sample metrics are not exported.
		}
	})

	return err
}

// statsdLine renders a single StatsD protocol line of the form
//
//	<prefix>.<name>[.<field>]:<value><suffix>[|@<rate>]\n
//
// An empty prefix is replaced by "statsd". Every '.' in name becomes '_' so
// that, together with the modified telegraf template, the dotted parts of a
// registered name are not split off into separate attributes. value may be a
// string, an int64 or a float64; for any other type nil is returned. The
// sample rate is appended only when it is non-zero and below 1.
func statsdLine(prefix, name, field string, value interface{}, suffix string, rate float32) []byte {
	if prefix == "" {
		prefix = "statsd"
	}

	var line bytes.Buffer

	// Metric key: prefix, sanitized name and optional field, dot-separated.
	line.WriteString(prefix)
	line.WriteByte('.')
	line.WriteString(strings.ReplaceAll(name, ".", "_"))
	if field != "" {
		line.WriteByte('.')
		line.WriteString(field)
	}
	line.WriteByte(':')

	// Value: only the three supported dynamic types are rendered.
	var scratch [32]byte
	switch val := value.(type) {
	case int64:
		line.Write(strconv.AppendInt(scratch[:0], val, 10))
	case float64:
		line.Write(strconv.AppendFloat(scratch[:0], val, 'f', -1, 64))
	case string:
		line.WriteString(val)
	default:
		return nil
	}

	// Metric type marker such as "|c" or "|g".
	line.WriteString(suffix)

	if rate != 0 && rate < 1 {
		line.WriteString("|@")
		line.Write(strconv.AppendFloat(scratch[:0], float64(rate), 'f', 6, 32))
	}

	// Each line ends with a newline because telegraf reads the payload line
	// by line.
	line.WriteByte('\n')
	return line.Bytes()
}

參考

multiple fi