Prometheus 實戰於原始碼分析之collector
阿新 • • 發佈:2019-02-12
在prometheus裡面有很多的exporter,每個exporter裡面的都有一個collector,我在這裡先寫分析一下prometheus自身的監控系統,採集自己的監控資料。
先看介面
type Collector interface {
Describe(chan<- *Desc)
Collect(chan<- Metric)
}
有很多資料型別實現了這個介面
Gauge
type Gauge interface {
Metric
Collector
// Set sets the Gauge to an arbitrary value.
Set(float64)
// Inc increments the Gauge by 1.
Inc()
// Dec decrements the Gauge by 1.
Dec()
// Add adds the given value to the Gauge. (The value can be
// negative, resulting in a decrease of the Gauge.)
Add(float64)
// Sub subtracts the given value from the Gauge. (The value can be
// negative, resulting in an increase of the Gauge.)
Sub(float64)
}
Histogram
type Histogram interface {
Metric
Collector
// Observe adds a single observation to the histogram.
Observe(float64)
}
Counter
type Counter interface {
Metric
Collector
// Set is used to set the Counter to an arbitrary value. It is only used
// if you have to transfer a value from an external counter into this
// Prometheus metric. Do not use it for regular handling of a
// Prometheus counter (as it can be used to break the contract of
// monotonically increasing values).
//
// Deprecated: Use NewConstMetric to create a counter for an external
// value. A Counter should never be set.
Set(float64)
// Inc increments the counter by 1.
Inc()
// Add adds the given value to the counter. It panics if the value is <
// 0.
Add(float64)
}
Summary
type Summary interface {
Metric
Collector
// Observe adds a single observation to the summary.
Observe(float64)
}
這是Collector介面還有一個prometheus自己的一個實現selfCollector
type selfCollector struct {
self Metric
}
// init provides the selfCollector with a reference to the metric it is supposed
// to collect. It is usually called within the factory function to create a
// metric. See example.
func (c *selfCollector) init(self Metric) {
c.self = self
}
// Describe implements Collector.
func (c *selfCollector) Describe(ch chan<- *Desc) {
ch <- c.self.Desc()
}
// Collect implements Collector.
func (c *selfCollector) Collect(ch chan<- Metric) {
ch <- c.self
}
當執行selfCollector的Collect方法就是返回本身的Metric。還記得第一篇說的註冊嗎?prometheus.MustRegister(configSuccess)註冊這個configSuccess
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "prometheus",
Name: "config_last_reload_successful",
Help: "Whether the last configuration reload attempt was successful.",
})
在NewGauge裡面,本質上就建立一個value。這個value裡面有selfCollector,就是上面的selfCollector
type value struct {
valBits uint64
selfCollector
desc *Desc
valType ValueType
labelPairs []*dto.LabelPair
}
建立完Gauge後就可以註冊MustRegister(…Collector),具體看
func (r *Registry) MustRegister(cs ...Collector) {
for _, c := range cs {
if err := r.Register(c); err != nil {
panic(err)
}
}
}
再深入看一下Register方法
if len(newDescIDs) == 0 {
return errors.New("collector has no descriptors")
}
if existing, exists := r.collectorsByID[collectorID]; exists {
return AlreadyRegisteredError{
ExistingCollector: existing,
NewCollector: c,
}
}
// If the collectorID is new, but at least one of the descs existed
// before, we are in trouble.
if duplicateDescErr != nil {
return duplicateDescErr
}
// Only after all tests have passed, actually register.
r.collectorsByID[collectorID] = c
for hash := range newDescIDs {
r.descIDs[hash] = struct{}{}
}
for name, dimHash := range newDimHashesByName {
r.dimHashesByName[name] = dimHash
}
就是註冊到collectorsByID這map裡面,collectorsByID map[uint64]Collector 它的key是descID,值就是我們註冊的collector。
通過這個map去維護collector。取消註冊的方法是刪除
r.mtx.RLock()
if _, exists := r.collectorsByID[collectorID]; !exists {
r.mtx.RUnlock()
return false
}
r.mtx.RUnlock()
r.mtx.Lock()
defer r.mtx.Unlock()
delete(r.collectorsByID, collectorID)
for id := range descIDs {
delete(r.descIDs, id)
}
現在已經把collector的結構和註冊講完了,那麼採集就變的順理成章了,Gather()方法採集資料
wg.Add(len(r.collectorsByID))
go func() {
wg.Wait()
close(metricChan)
}()
for _, collector := range r.collectorsByID {
go func(collector Collector) {
defer wg.Done()
collector.Collect(metricChan)
}(collector)
}
迴圈遍歷執行collecto去採集,把結果放到metricChan,然後就引數解析封裝了,這裡涉及到了資料型別,和上面介面組合是對應的
dtoMetric := &dto.Metric{}
if err := metric.Write(dtoMetric); err != nil {
errs = append(errs, fmt.Errorf(
"error collecting metric %v: %s", desc, err,
))
continue
}
...
metricFamily.Metric = append(metricFamily.Metric, dtoMetric)
上面的write方法在需要解釋一下,如果是value型別
func (v *value) Write(out *dto.Metric) error {
val := math.Float64frombits(atomic.LoadUint64(&v.valBits))
return populateMetric(v.valType, val, v.labelPairs, out)
}
func populateMetric(
t ValueType,
v float64,
labelPairs []*dto.LabelPair,
m *dto.Metric,
) error {
m.Label = labelPairs
switch t {
case CounterValue:
m.Counter = &dto.Counter{Value: proto.Float64(v)}
case GaugeValue:
m.Gauge = &dto.Gauge{Value: proto.Float64(v)}
case UntypedValue:
m.Untyped = &dto.Untyped{Value: proto.Float64(v)}
default:
return fmt.Errorf("encountered unknown type %v", t)
}
return nil
}
如果是其它型別,在自己的
這裡還有補充一下對於指標的定義
type Metric struct {
Label []*LabelPair `protobuf:"bytes,1,rep,name=label" json:"label,omitempty"`
Gauge *Gauge `protobuf:"bytes,2,opt,name=gauge" json:"gauge,omitempty"`
Counter *Counter `protobuf:"bytes,3,opt,name=counter" json:"counter,omitempty"`
Summary *Summary `protobuf:"bytes,4,opt,name=summary" json:"summary,omitempty"`
Untyped *Untyped `protobuf:"bytes,5,opt,name=untyped" json:"untyped,omitempty"`
Histogram *Histogram `protobuf:"bytes,7,opt,name=histogram" json:"histogram,omitempty"`
TimestampMs *int64 `protobuf:"varint,6,opt,name=timestamp_ms" json:"timestamp_ms,omitempty"`
XXX_unrecognized []byte `json:"-"`
}