go監控方案(7) -- 實現
阿新 • • 發佈:2019-08-19
metrics 客戶端
資料採集使用go-metrics
傳輸使用UDP, 仿StatsD上傳採集資料, InfluxDB進行資料儲存, Grafana進行展示。
實現github 地址
https://github.com/solate/metrics
該地址有已經改好的配置檔案可以直接使用。
使用的all-in-one :
git docker-statsd-influxdb-grafana
資料封裝
//掛載配置檔案,已修改statsd模版 docker run --ulimit nofile=66000:66000 -v /root/telegraf.conf:/etc/telegraf/telegraf.conf -d --name docker-statsd-influxdb-grafana -p 3003:3003 -p 3004:8888 -p 8086:8086 -p 8125:8125/udp samuelebistoletti/docker-statsd-influxdb-grafana:latest
register
register 使用的name 必須是不同的
telegraf 配置修改
將 [[inputs.statsd]]
部分配置開啟, 修改templates為:
templates = [
"* measurement.measurement.field"
]
表示傳值prefix.name.field 最好表示為prefix_name field
程式碼實現
package client import ( "bufio" "bytes" "github.com/rcrowley/go-metrics" "log" "net" "strconv" "strings" "time" ) const ( // UDP packet limit, see // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure UDP_MAX_PACKET_SIZE int = 64 * 1024 ) // Config provides a container with configuration parameters for // the StatsD exporter type Config struct { Network string // Network: tcp, udp. Addr string // Network address to connect to | 地址 Registry metrics.Registry // Registry to be exported | metrics註冊 FlushInterval time.Duration // Flush interval | 重新整理間隔時間 Prefix string // Prefix to be prepended to metric names | 字首名字 Rate float32 // Rate Tags string // tag //TODO conn net.Conn } func StatsD(r metrics.Registry, d time.Duration, prefix string, network string, addr string, rate float32) { conn, err := net.Dial(network, addr) if err != nil { panic("conn remote err!") } StatsDWithConfig(Config{ Network: network, Addr: addr, Registry: r, FlushInterval: d, Prefix: prefix, Rate: rate, conn: conn, }) } // WithConfig is a blocking exporter function func StatsDWithConfig(c Config) { for _ = range time.Tick(c.FlushInterval) { if err := statsd(&c); err != nil { log.Println(err) c.conn.Close() } } } func statsd(c *Config) (err error) { w := bufio.NewWriter(c.conn) c.Registry.Each(func(name string, i interface{}) { switch metric := i.(type) { case metrics.Counter: ms := metric.Snapshot() w.Write(statsdLine(c.Prefix, name, "", ms.Count(), "|c", c.Rate)) case metrics.Gauge: ms := metric.Snapshot() w.Write(statsdLine(c.Prefix, name, "", ms.Value(), "|g", c.Rate)) case metrics.GaugeFloat64: ms := metric.Snapshot() w.Write(statsdLine(c.Prefix, name, "", ms.Value(), "|g", c.Rate)) case metrics.Histogram: ms := metric.Snapshot() ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) fields := make([][]byte, 12) fields = append(fields, statsdLine(c.Prefix, name, "count", ms.Count(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "max", ms.Max(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.Mean(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "min", ms.Min(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "stddev", ms.StdDev(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "variance", ms.Variance(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p50", ps[0], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p75", ps[1], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p95", ps[2], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p99", ps[3], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p999", ps[4], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p9999", ps[5], "|g", c.Rate)) buf := bytes.Join(fields, []byte{}) w.Write(buf) case metrics.Meter: ms := metric.Snapshot() fields := make([][]byte, 5) fields = append(fields, statsdLine(c.Prefix, name, "count", ms.Count(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "m1", ms.Rate1(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "m5", ms.Rate5(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "m15", ms.Rate15(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.RateMean(), "|g", c.Rate)) buf := bytes.Join(fields, []byte{}) w.Write(buf) case metrics.Timer: ms := metric.Snapshot() ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) fields := make([][]byte, 12) fields = append(fields, statsdLine(c.Prefix, name, "count", ms.Count(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "max", ms.Max(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.Mean(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "min", ms.Min(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "stddev", ms.StdDev(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "variance", ms.Variance(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p50", ps[0], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p75", ps[1], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p95", ps[2], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p99", ps[3], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p999", ps[4], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "p9999", ps[5], "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "m1", ms.Rate1(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "m5", ms.Rate5(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "m15", ms.Rate15(), "|g", c.Rate)) fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.RateMean(), "|g", c.Rate)) buf := bytes.Join(fields, []byte{}) w.Write(buf) //case metrics.Healthcheck: // metric.Check() // log.Printf("healthcheck %s\n", name) // log.Printf(" error: %v\n", metric.Error()) // //case metrics.EWMA: //case metrics.Sample: } w.Flush() }) return } //構造傳送line func statsdLine(prefix, name, field string, value interface{}, suffix string, rate float32) []byte { //<metricname>:<value>|<type>|@<rate> var buffer bytes.Buffer //buf := make([]byte, UDP_MAX_PACKET_SIZE) //新增字首 if prefix != "" { //buf = append(buf, prefix...) //buf = append(buf, '.') buffer.WriteString(prefix) buffer.WriteString(".") } else { buffer.WriteString("statsd") buffer.WriteString(".") } ////新增名稱 //buf = append(buf, name...) //buf = append(buf, ':') //將name註冊中的'.'替換成'_', 配合telegraf修改模版,防止將資料庫名字改為屬性 if strings.Contains(name, ".") { name = strings.ReplaceAll(name, ".", "_") } //新增名稱 buffer.WriteString(name) if field != "" { buffer.WriteString(".") buffer.WriteString(field) } buffer.WriteString(":") buf := buffer.Bytes() switch v := value.(type) { case string: buf = append(buf, v...) case int64: buf = strconv.AppendInt(buf, v, 10) case float64: buf = strconv.AppendFloat(buf, v, 'f', -1, 64) default: return nil } if suffix != "" { buf = append(buf, suffix...) } if rate != 0 && rate < 1 { buf = append(buf, "|@"...) buf = strconv.AppendFloat(buf, float64(rate), 'f', 6, 32) } buf = append(buf, "\n"...) //每一行打一個回車,telegraf 是使用回車進行讀取的 return buf }