Understanding Flink in Depth: The Internal Structure of Metrics

Starting with how Metrics are used

Flink provides four kinds of metrics: Counters, Gauges, Histograms, and Meters.

How are Metrics used? Take Counter as an example:

 1 public class MyMapper extends RichMapFunction<String, String> {
 2   private transient Counter counter;
 3 
 4   @Override
 5   public void open(Configuration config) {
 6     this.counter = getRuntimeContext()
 7       .getMetricGroup()
 8       .counter("myCounter");
 9   }
10 
11   @Override
12   public String map(String value) throws Exception {
13     this.counter.inc();
14     return value;
15   }
16 }

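Line 7: getMetricGroup() obtains the MetricGroup.

Line 8: counter("myCounter") obtains the Metric instance from the MetricGroup.

So let's take a closer look at MetricGroup.

The Metric container: MetricGroup

A MetricGroup is a container for Metric objects and metric subgroups.

Each of the following four methods registers the given Metric by calling addMetric() and then returns it.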

(AbstractMetricGroup.java)

 1 public <C extends Counter> C counter(String name, C counter)     
 2 {
 3     addMetric(name, counter);
 4     return counter;
 5 }
 6 
 7 public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
 8     addMetric(name, gauge);
 9     return gauge;
10 }
11 
12 public <H extends Histogram> H histogram(String name, H histogram) {
13     addMetric(name, histogram);
14     return histogram;
15 }
16 
17 public <M extends Meter> M meter(String name, M meter) {
18     addMetric(name, meter);
19     return meter;
20 }

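Note 1: UnregisteredMetricsGroup, another implementation of the MetricGroup interface, simply returns the Metric instance without registering it.

Note 2: ProxyMetricGroup, a third implementation of the MetricGroup interface, holds a parent MetricGroup and forwards every call to that parent.

(AbstractMetricGroup.java) Important fields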

    /** The registry that this metrics group belongs to. */
    protected final MetricRegistry registry; 

    /** All metrics that are directly contained in this group. */
    private final Map<String, Metric> metrics = new HashMap<>();

    /** All metric subgroups of this group. */
    private final Map<String, AbstractMetricGroup> groups = new HashMap<>();

    /** Flag indicating whether this group has been closed. */
    private volatile boolean closed;

(AbstractMetricGroup.java)

 1     protected void addMetric(String name, Metric metric) {
 2         if (metric == null) {
 3             LOG.warn("Ignoring attempted registration of a metric due to being null for name {}.", name);
 4             return;
 5         }
 6         // add the metric only if the group is still open
 7         synchronized (this) {
 8             if (!closed) {
 9                 // immediately put without a 'contains' check to optimize the common case (no collision)
10                 // collisions are resolved later
11                 Metric prior = metrics.put(name, metric);
12 
13                 // check for collisions with other metric names
14                 if (prior == null) {
15                     // no other metric with this name yet
16 
17                     if (groups.containsKey(name)) {
18                         // we warn here, rather than failing, because metrics are tools that should not fail the
19                         // program when used incorrectly
20                         LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
21                                 name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
22                     }
23 
24                     registry.register(metric, name, this);
25                 }
26                 else {
27                     // we had a collision. put back the original value
28                     metrics.put(name, prior);
29 
30                     // we warn here, rather than failing, because metrics are tools that should not fail the
31                     // program when used incorrectly
32                     LOG.warn("Name collision: Group already contains a Metric with the name '" +
33                             name + "'. Metric will not be reported." + Arrays.toString(scopeComponents));
34                 }
35             }
36         }
37     }

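Let's walk through the addMetric() code above in more detail.

Line 7: acquire the mutex.

Line 8: check whether the current group has been closed.

Lines 11~34: add the Metric object being registered to the metrics map.

There is a small trick here: the code assumes by default that there is no key collision and puts the metric straight into the map, then checks afterwards whether a previous value was displaced. This improves performance, because when there is no collision it saves one map lookup. When there is a collision, the original value is put back and the new metric is not reported.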
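
The put-first pattern described above can be isolated from Flink entirely. The following is a minimal sketch, not Flink code: PutFirstRegistry and its methods are hypothetical names, and the map stores plain objects instead of Metric instances. It assumes, as addMetric() does, that null values have already been rejected, since a null prior value is what signals "no collision".

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a group's name -> metric map (not a Flink class). */
class PutFirstRegistry {
    private final Map<String, Object> metrics = new HashMap<>();

    /**
     * Registers a non-null value under the given name and keeps the first
     * registration when names collide. Returns true if the value was registered.
     */
    synchronized boolean register(String name, Object metric) {
        // Optimistic put: a single map operation in the common, collision-free case.
        Object prior = metrics.put(name, metric);
        if (prior == null) {
            return true;
        }
        // Collision: restore the original entry and reject the newcomer.
        metrics.put(name, prior);
        return false;
    }

    synchronized Object get(String name) {
        return metrics.get(name);
    }
}

class PutFirstDemo {
    public static void main(String[] args) {
        PutFirstRegistry registry = new PutFirstRegistry();
        System.out.println(registry.register("myCounter", "first"));  // true
        System.out.println(registry.register("myCounter", "second")); // false
        System.out.println(registry.get("myCounter"));                // first
    }
}
```

The cost of the trick is a second put() on the rare collision path, in exchange for skipping a containsKey() call on every successful registration.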

Line 24: register the Metric with the MetricRegistry; this is covered in detail in the next section.

Calling addGroup() adds a subgroup.

(AbstractMetricGroup.java)

 1     private AbstractMetricGroup<?> addGroup(String name, ChildType childType) {
 2         synchronized (this) {
 3             if (!closed) {
 4                 // adding a group with the same name as a metric creates problems in many reporters/dashboards
 5                 // we warn here, rather than failing, because metrics are tools that should not fail the
 6                 // program when used incorrectly
 7                 if (metrics.containsKey(name)) {
 8                     LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
 9                             name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
10                 }
11 
12                 AbstractMetricGroup newGroup = createChildGroup(name, childType);
13                 AbstractMetricGroup prior = groups.put(name, newGroup);
14                 if (prior == null) {
15                     // no prior group with that name
16                     return newGroup;
17                 } else {
18                     // had a prior group with that name, add the prior group back
19                     groups.put(name, prior);
20                     return prior;
21                 }
22             }
23             else {
24                 // return a non-registered group that is immediately closed already
25                 GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name);
26                 closedGroup.close();
27                 return closedGroup;
28             }
29         }
30     }
31 
32     protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
33         switch (childType) {
34             case KEY:
35                 return new GenericKeyMetricGroup(registry, this, name);
36             default:
37                 return new GenericMetricGroup(registry, this, name);
38         }
39     }
40 
41     /**
42      * Enum for indicating which child group should be created.
43      * `KEY` is used to create {@link GenericKeyMetricGroup}.
44      * `VALUE` is used to create {@link GenericValueMetricGroup}.
45      * `GENERIC` is used to create {@link GenericMetricGroup}.
46      */
47     protected enum ChildType {
48         KEY,
49         VALUE,
50         GENERIC
51     }

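Line 2: acquire the mutex.

Lines 12~21: create the new MetricGroup object. If a subgroup with the same name already exists, the existing one is put back and returned.

Note that giving a subgroup the same name as a Metric object causes problems in many reporters and dashboards, so the code logs a warning.

Lines 25, 35, 37: all MetricGroup objects in the same tree share the same MetricRegistry.

Line 26: close the MetricGroup. The close() method looks like this: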

 1     public void close() {
 2         synchronized (this) {
 3             if (!closed) {
 4                 closed = true;
 5 
 6                 // close all subgroups
 7                 for (AbstractMetricGroup group : groups.values()) {
 8                     group.close();
 9                 }
10                 groups.clear();
11 
12                 // un-register all directly contained metrics
13                 for (Map.Entry<String, Metric> metric : metrics.entrySet()) {
14                     registry.unregister(metric.getValue(), metric.getKey(), this);
15                 }
16                 metrics.clear();
17             }
18         }
19     }

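Line 2: acquire the mutex.

The method recursively closes all subgroups and unregisters all metrics.

In MetricGroup, the methods addMetric(), addGroup(), and close(), as well as getAllVariables() (not shown above), all need to acquire the mutex.

The reason: to prevent resource leaks caused by adding metrics or subgroups while the group is being closed.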
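
To make the locking argument concrete, here is a minimal sketch of a group whose add and close operations share one lock. It is an illustration under simplifying assumptions, not Flink's implementation: SketchGroup is a hypothetical class, it stores plain objects, it has no registry, and it uses putIfAbsent() and computeIfAbsent() rather than the put-first pattern shown earlier.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified group (not a Flink class). */
class SketchGroup {
    private final Map<String, Object> metrics = new HashMap<>();
    private final Map<String, SketchGroup> groups = new HashMap<>();
    private boolean closed; // guarded by the monitor of "this"

    /** Returns false once the group is closed, so nothing is added after cleanup. */
    synchronized boolean addMetric(String name, Object metric) {
        if (closed) {
            return false;
        }
        metrics.putIfAbsent(name, metric);
        return true;
    }

    /** Returns the existing subgroup if present; a closed group hands out closed children. */
    synchronized SketchGroup addGroup(String name) {
        if (closed) {
            SketchGroup closedGroup = new SketchGroup();
            closedGroup.close();
            return closedGroup;
        }
        return groups.computeIfAbsent(name, key -> new SketchGroup());
    }

    /** Closes all subgroups recursively, then drops the directly contained metrics. */
    synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (SketchGroup group : groups.values()) {
            group.close();
        }
        groups.clear();
        metrics.clear();
    }

    synchronized boolean isClosed() {
        return closed;
    }

    synchronized int metricCount() {
        return metrics.size();
    }
}

class SketchGroupDemo {
    public static void main(String[] args) {
        SketchGroup root = new SketchGroup();
        SketchGroup child = root.addGroup("child");
        root.addMetric("myCounter", 1);
        root.close();
        System.out.println(child.isClosed());          // true
        System.out.println(root.addMetric("late", 2)); // false
    }
}
```

Because addMetric() and close() synchronize on the same object, a metric is either added before close() starts (and is then cleaned up by it) or rejected after close() has finished; it can never slip in between the cleanup and the flag being set.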

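Another important method of MetricGroup is public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex).

It returns the unique name of a Metric, which serves as its identifier.

The identifier has three parts: System scope, User scope, and Metric name.

It has the form A.B.C, where A is the System scope, B is the User scope, C is the Metric name, and '.' is the delimiter.

The System scope can be defined in conf/flink-conf.yaml.

The User scope is the groups tree, which is defined by calling addGroup(String) (groups can be nested to several levels).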
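
As an illustration of the A.B.C layout, the sketch below joins the three parts with a delimiter. It is not Flink's getMetricIdentifier(): IdentifierSketch is a hypothetical class, a UnaryOperator<String> stands in for the CharacterFilter, and the scope values are made-up examples. The filter in the demo replaces dots inside a component so that they cannot be confused with the delimiter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical helper that composes an identifier (not a Flink class). */
class IdentifierSketch {
    static String identifier(
            String[] systemScope,
            String[] userScope,
            String metricName,
            char delimiter,
            UnaryOperator<String> filter) {
        List<String> parts = new ArrayList<>();
        // A: system scope components
        for (String component : systemScope) {
            parts.add(filter.apply(component));
        }
        // B: user scope components, one per nested group
        for (String component : userScope) {
            parts.add(filter.apply(component));
        }
        // C: the metric name itself
        parts.add(filter.apply(metricName));
        return String.join(String.valueOf(delimiter), parts);
    }
}

class IdentifierDemo {
    public static void main(String[] args) {
        String id = IdentifierSketch.identifier(
                new String[] {"10.0.0.1", "taskmanager"}, // example system scope
                new String[] {"myGroup"},                 // example user scope
                "myCounter",
                '.',
                component -> component.replace('.', '_'));
        System.out.println(id); // 10_0_0_1.taskmanager.myGroup.myCounter
    }
}
```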

The bridge between MetricGroup and MetricReporter: MetricRegistry

MetricRegistry keeps track of all registered Metrics. It acts as the bridge between MetricGroup and MetricReporter.

The addMetric() method of MetricGroup calls the register() method of MetricRegistry:

registry.register(metric, name, this);

The close() method of MetricGroup calls the unregister() method of MetricRegistry:

registry.unregister(metric.getValue(), metric.getKey(), this);
 1     // ------------------------------------------------------------------------
 2     //  Metrics (de)registration
 3     // ------------------------------------------------------------------------
 4 
 5     @Override
 6     public void register(Metric metric, String metricName, AbstractMetricGroup group) {
 7         synchronized (lock) {
 8             if (isShutdown()) {
 9                 LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
10             } else {
11                 if (reporters != null) {
12                     for (int i = 0; i < reporters.size(); i++) {
13                         MetricReporter reporter = reporters.get(i);
14                         try {
15                             if (reporter != null) {
16                                 FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
17                                 reporter.notifyOfAddedMetric(metric, metricName, front);
18                             }
19                         } catch (Exception e) {
20                             LOG.warn("Error while registering metric.", e);
21                         }
22                     }
23                 }
24                 try {
25                     if (queryService != null) {
26                         MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
27                     }
28                 } catch (Exception e) {
29                     LOG.warn("Error while registering metric.", e);
30                 }
31                 try {
32                     if (metric instanceof View) {
33                         if (viewUpdater == null) {
34                             viewUpdater = new ViewUpdater(executor);
35                         }
36                         viewUpdater.notifyOfAddedView((View) metric);
37                     }
38                 } catch (Exception e) {
39                     LOG.warn("Error while registering metric.", e);
40                 }
41             }
42         }
43     }
44 
45     @Override
46     public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
47         synchronized (lock) {
48             if (isShutdown()) {
49                 LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
50             } else {
51                 if (reporters != null) {
52                     for (int i = 0; i < reporters.size(); i++) {
53                         try {
54                             MetricReporter reporter = reporters.get(i);
55                             if (reporter != null) {
56                                 FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
57                                 reporter.notifyOfRemovedMetric(metric, metricName, front);
58                             }
59                         } catch (Exception e) {
60                             LOG.warn("Error while registering metric.", e);
61                         }
62                     }
63                 }
64                 try {
65                     if (queryService != null) {
66                         MetricQueryService.notifyOfRemovedMetric(queryService, metric);
67                     }
68                 } catch (Exception e) {
69                     LOG.warn("Error while registering metric.", e);
70                 }
71                 try {
72                     if (metric instanceof View) {
73                         if (viewUpdater != null) {
74                             viewUpdater.notifyOfRemovedView((View) metric);
75                         }
76                     }
77                 } catch (Exception e) {
78                     LOG.warn("Error while registering metric.", e);
79                 }
80             }
81         }
82     }

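The register() and unregister() methods are largely similar.

Line 7: acquire the lock. The lock object is no longer this but a dedicated new Object(), which makes it easy to introduce a second lock later.

Lines 11~23: notify every MetricReporter attached to the registry of the added Metric.

Lines 24~30: notify the MetricQueryService of the added Metric.

MetricQueryService is an actor; it serializes Metrics and writes them to an output stream.

Lines 31~40: if the Metric implements the View interface, register it with the viewUpdater.

Once a Metric class implements the View interface, it can be updated at a fixed time interval (the viewUpdater performs the update).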
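
The idea of a periodically updated metric can be sketched without Flink. In the example below, SketchView is a hypothetical stand-in for the View interface with a single update() method, and PerIntervalRate is a hypothetical meter-like metric that recomputes its rate each time update() is called. The demo calls update() by hand where an updater would normally call it on a timer, and the 5-second interval is an assumption of this sketch.

```java
/** Hypothetical stand-in for a periodically updated metric (not a Flink interface). */
interface SketchView {
    void update();
}

/** Meter-like metric: events per second over the last update interval. */
class PerIntervalRate implements SketchView {
    private final long intervalSeconds;
    private long count;
    private long countAtLastUpdate;
    private double rate;

    PerIntervalRate(long intervalSeconds) {
        this.intervalSeconds = intervalSeconds;
    }

    synchronized void markEvent() {
        count++;
    }

    /** Called once per interval by whoever drives the updates. */
    @Override
    public synchronized void update() {
        rate = (count - countAtLastUpdate) / (double) intervalSeconds;
        countAtLastUpdate = count;
    }

    synchronized double getRate() {
        return rate;
    }
}

class ViewDemo {
    public static void main(String[] args) {
        PerIntervalRate meter = new PerIntervalRate(5);
        for (int i = 0; i < 10; i++) {
            meter.markEvent();
        }
        meter.update(); // an updater would call this every 5 seconds
        System.out.println(meter.getRate()); // 2.0
    }
}
```

Keeping the rate computation in update() means that reading the metric is cheap and always returns a value that is consistent for the whole interval.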

MetricReporter

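A MetricReporter exports Metrics to an external backend.

The parameters of the external backend can be set in conf/flink-conf.yaml.

Several external backends can be configured at the same time.

The MetricReporter interface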

 1 public interface MetricReporter {
 2 
 3     // ------------------------------------------------------------------------
 4     //  life cycle
 5     // ------------------------------------------------------------------------
 6 
 8     void open(MetricConfig config);
 9 
10     void close();
11 
12     // ------------------------------------------------------------------------
13     //  adding / removing metrics
14     // ------------------------------------------------------------------------
15 
16     void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
17 
18 
19     void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
20 }

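Line 8: configure this Reporter.

Because the reporter's constructor takes no arguments, this method is used to initialize the reporter's fields.

This method is always called after the object has been constructed.

Line 10: close this Reporter.

This method should close channels and streams and release resources.

Lines 16, 19: add and remove metrics.

A typical reporter class also implements the Scheduled interface so that it can report the current measurements.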

1 public interface Scheduled {
2 
3     void report();
4 }

Line 3: the metric registry calls report() periodically to report the current measurements.
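
The reporter life cycle described above (open, notifications, periodic report, close) can be sketched as follows. This is a simplified illustration, not an implementation of Flink's interfaces: SketchReporter is a hypothetical class, a plain Map replaces MetricConfig, a LongSupplier replaces Metric, the "prefix" key is invented for the example, and the notification methods drop the MetricGroup parameter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical reporter that prints metrics to stdout (not a Flink class). */
class SketchReporter {
    private final Map<String, LongSupplier> metrics = new LinkedHashMap<>();
    private String prefix = "";

    /** Configures the reporter after no-argument construction. */
    synchronized void open(Map<String, String> config) {
        prefix = config.getOrDefault("prefix", "");
    }

    /** Releases everything the reporter holds. */
    synchronized void close() {
        metrics.clear();
    }

    synchronized void notifyOfAddedMetric(LongSupplier metric, String metricName) {
        metrics.put(metricName, metric);
    }

    synchronized void notifyOfRemovedMetric(String metricName) {
        metrics.remove(metricName);
    }

    /** Builds one "name=value" line per tracked metric, reading current values. */
    synchronized List<String> format() {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, LongSupplier> entry : metrics.entrySet()) {
            String name = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            lines.add(name + "=" + entry.getValue().getAsLong());
        }
        return lines;
    }

    /** What a scheduler would call periodically. */
    void report() {
        for (String line : format()) {
            System.out.println(line);
        }
    }
}

class ReporterDemo {
    public static void main(String[] args) {
        SketchReporter reporter = new SketchReporter();
        reporter.open(Map.of("prefix", "flink"));

        AtomicLong counter = new AtomicLong();
        reporter.notifyOfAddedMetric(counter::get, "myCounter");
        counter.addAndGet(3);

        reporter.report(); // prints flink.myCounter=3
        reporter.close();
    }
}
```

The reporter stores a reference to each metric rather than a copy of its value, so every call to report() sees the measurement as it stands at that moment.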
