Understanding Flink in Depth: The Internal Structure of Metrics
Starting with How Metrics Are Used
Flink provides four kinds of metrics: Counters, Gauges, Histograms, and Meters.
How are metrics used? Take Counter as an example:
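    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    public class MyMapper extends RichMapFunction<String, String> {
        // transient: the counter is created in open() rather than serialized with the function
        private transient Counter counter;

        @Override
        public void open(Configuration config) {
            this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("myCounter");
        }

        @Override
        public String map(String value) throws Exception {
            this.counter.inc();
            return value;
        }
    }
The call to getMetricGroup() obtains the MetricGroup.
The call to counter("myCounter") obtains a Metric instance from that MetricGroup.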
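So let's take a closer look at MetricGroup.
The Metric Container: MetricGroup
A MetricGroup is a container for Metric objects and metric subgroups.
Each of the following four methods registers the given Metric by calling addMetric() and then returns it.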
(AbstractMetricGroup.java)
    public <C extends Counter> C counter(String name, C counter) {
        addMetric(name, counter);
        return counter;
    }

    public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
        addMetric(name, gauge);
        return gauge;
    }

    public <H extends Histogram> H histogram(String name, H histogram) {
        addMetric(name, histogram);
        return histogram;
    }

    public <M extends Meter> M meter(String name, M meter) {
        addMetric(name, meter);
        return meter;
    }
Note 1: another implementation of the MetricGroup interface, UnregisteredMetricsGroup, only returns the Metric instance and does not register it.
Note 2: a third implementation, ProxyMetricGroup, holds a parent MetricGroup and forwards every call to that parent.
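The difference between the three implementations can be illustrated with a minimal sketch. The types below (SketchCounter, SketchGroup, and so on) are hypothetical, simplified stand-ins written for this illustration, not Flink's actual classes; only the "register", "do not register", and "forward to parent" behaviors are modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a counter metric.
class SketchCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Hypothetical stand-in for the MetricGroup interface (counters only).
interface SketchGroup {
    SketchCounter counter(String name);
}

// Registers every metric it hands out (the role AbstractMetricGroup plays).
class RegisteringGroup implements SketchGroup {
    final Map<String, SketchCounter> registered = new HashMap<>();

    public SketchCounter counter(String name) {
        SketchCounter c = new SketchCounter();
        registered.put(name, c);
        return c;
    }
}

// Hands out a working metric but never registers it anywhere.
class UnregisteredGroup implements SketchGroup {
    public SketchCounter counter(String name) {
        return new SketchCounter();
    }
}

// Forwards every call to its parent group.
class ProxyGroup implements SketchGroup {
    private final SketchGroup parent;

    ProxyGroup(SketchGroup parent) { this.parent = parent; }

    public SketchCounter counter(String name) {
        return parent.counter(name);
    }
}
```

In this sketch, a counter obtained through a ProxyGroup ends up in the parent's registered map, while a counter from an UnregisteredGroup still counts but is invisible to any registry.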
(AbstractMetricGroup.java) Important fields
    /** The registry that this metrics group belongs to. */
    protected final MetricRegistry registry;

    /** All metrics that are directly contained in this group. */
    private final Map<String, Metric> metrics = new HashMap<>();

    /** All metric subgroups of this group. */
    private final Map<String, AbstractMetricGroup> groups = new HashMap<>();

    /** Flag indicating whether this group has been closed. */
    private volatile boolean closed;
(AbstractMetricGroup.java)
    protected void addMetric(String name, Metric metric) {
        if (metric == null) {
            LOG.warn("Ignoring attempted registration of a metric due to being null for name {}.", name);
            return;
        }
        // add the metric only if the group is still open
        synchronized (this) {
            if (!closed) {
                // immediately put without a 'contains' check to optimize the common case (no collision)
                // collisions are resolved later
                Metric prior = metrics.put(name, metric);

                // check for collisions with other metric names
                if (prior == null) {
                    // no other metric with this name yet

                    if (groups.containsKey(name)) {
                        // we warn here, rather than failing, because metrics are tools that should not fail the
                        // program when used incorrectly
                        LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
                                name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
                    }

                    registry.register(metric, name, this);
                }
                else {
                    // we had a collision. put back the original value
                    metrics.put(name, prior);

                    // we warn here, rather than failing, because metrics are tools that should not fail the
                    // program when used incorrectly
                    LOG.warn("Name collision: Group already contains a Metric with the name '" +
                            name + "'. Metric will not be reported." + Arrays.toString(scopeComponents));
                }
            }
        }
    }
Let's walk through addMetric() in detail.
synchronized (this) acquires the mutex on the group.
if (!closed) checks whether the group has already been closed.
The body of that if block, from metrics.put(name, metric) to the final LOG.warn, adds the Metric object to the metrics map.
There is a small trick here: the code assumes there is no key collision and puts the metric into the map straight away, then checks whether an existing value was displaced. This optimizes the common case, because when there is no collision it saves one map lookup.
registry.register(metric, name, this) registers the Metric with the MetricRegistry; the next section covers this in detail.
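The "put first, check afterwards" trick can be isolated in a few lines. This is a minimal sketch using a plain HashMap; the class and method names (PutThenCheck, addIfAbsent) are made up for illustration. It relies on values never being null, which addMetric() guarantees by rejecting null metrics before it takes the lock.

```java
import java.util.HashMap;
import java.util.Map;

class PutThenCheck {
    // Returns true if the value was added, false if the name was already taken.
    static <V> boolean addIfAbsent(Map<String, V> map, String name, V value) {
        // Optimistic path: a single put() covers the common case of no collision.
        V prior = map.put(name, value);
        if (prior == null) {
            return true;
        }
        // Collision: restore the original entry so the first registration wins.
        map.put(name, prior);
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> metrics = new HashMap<>();
        System.out.println(addIfAbsent(metrics, "myCounter", "first"));  // true
        System.out.println(addIfAbsent(metrics, "myCounter", "second")); // false
        System.out.println(metrics.get("myCounter"));                    // first
    }
}
```

A containsKey() check followed by put() would cost two lookups on every successful registration; here the second put() is only paid in the rare collision case. Map.putIfAbsent() offers similar semantics in a single call.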
Calling addGroup() adds a subgroup.
(AbstractMetricGroup.java)
    private AbstractMetricGroup<?> addGroup(String name, ChildType childType) {
        synchronized (this) {
            if (!closed) {
                // adding a group with the same name as a metric creates problems in many reporters/dashboards
                // we warn here, rather than failing, because metrics are tools that should not fail the
                // program when used incorrectly
                if (metrics.containsKey(name)) {
                    LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
                            name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
                }

                AbstractMetricGroup newGroup = createChildGroup(name, childType);
                AbstractMetricGroup prior = groups.put(name, newGroup);
                if (prior == null) {
                    // no prior group with that name
                    return newGroup;
                } else {
                    // had a prior group with that name, add the prior group back
                    groups.put(name, prior);
                    return prior;
                }
            }
            else {
                // return a non-registered group that is immediately closed already
                GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name);
                closedGroup.close();
                return closedGroup;
            }
        }
    }

    protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
        switch (childType) {
            case KEY:
                return new GenericKeyMetricGroup(registry, this, name);
            default:
                return new GenericMetricGroup(registry, this, name);
        }
    }

    /**
     * Enum for indicating which child group should be created.
     * `KEY` is used to create {@link GenericKeyMetricGroup}.
     * `VALUE` is used to create {@link GenericValueMetricGroup}.
     * `GENERIC` is used to create {@link GenericMetricGroup}.
     */
    protected enum ChildType {
        KEY,
        VALUE,
        GENERIC
    }
synchronized (this) acquires the mutex.
The block from createChildGroup(name, childType) to return prior creates the new MetricGroup object, or returns the existing group if one with that name is already present.
Note that giving a subgroup the same name as a Metric object causes problems in many reporters and dashboards, which is why the code logs a warning.
Every constructor call (new GenericMetricGroup(registry, this, name) and new GenericKeyMetricGroup(registry, this, name)) passes the same registry, so all MetricGroup objects in one tree share a single MetricRegistry.
closedGroup.close() closes the MetricGroup; close() is implemented as follows:
    public void close() {
        synchronized (this) {
            if (!closed) {
                closed = true;

                // close all subgroups
                for (AbstractMetricGroup group : groups.values()) {
                    group.close();
                }
                groups.clear();

                // un-register all directly contained metrics
                for (Map.Entry<String, Metric> metric : metrics.entrySet()) {
                    registry.unregister(metric.getValue(), metric.getKey(), this);
                }
                metrics.clear();
            }
        }
    }
synchronized (this) acquires the mutex.
The method then recursively closes all subgroups and unregisters all metrics held directly by this group.
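In MetricGroup, the methods addMetric(), addGroup(), and close(), as well as getAllVariables() (not shown above), all need to acquire the mutex.
The reason: this prevents resource leaks that would occur if metrics or subgroups were added while the group is being closed.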
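The leak that the mutex prevents can be seen in a stripped-down model. The sketch below is an illustration only: ClosableGroup is a hypothetical class, and it stores plain objects where Flink would store Metric instances and notify the registry. Because the closed check and the map update happen under the same lock as close(), a metric can never slip into a group after close() has already cleared it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ClosableGroup {
    private final Map<String, Object> metrics = new HashMap<>();
    private final List<ClosableGroup> children = new ArrayList<>();
    private boolean closed; // guarded by synchronized (this)

    // Adds the metric only while the group is open; returns whether it was added.
    boolean addMetric(String name, Object metric) {
        synchronized (this) {
            if (closed) {
                return false;
            }
            metrics.put(name, metric);
            return true;
        }
    }

    // Returns a child group; if this group is already closed, the child is closed too.
    ClosableGroup addGroup() {
        synchronized (this) {
            ClosableGroup child = new ClosableGroup();
            if (closed) {
                child.close();
            } else {
                children.add(child);
            }
            return child;
        }
    }

    // Closes this group and, recursively, all of its children.
    void close() {
        synchronized (this) {
            if (closed) {
                return;
            }
            closed = true;
            for (ClosableGroup child : children) {
                child.close();
            }
            children.clear();
            metrics.clear();
        }
    }

    int size() {
        synchronized (this) { return metrics.size(); }
    }

    boolean isClosed() {
        synchronized (this) { return closed; }
    }
}
```

Without the lock, a thread could pass the closed check, lose the CPU while another thread runs close(), and then insert a metric that nobody will ever unregister.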
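Another important method of MetricGroup is public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex).
It returns the unique name of a Metric, which serves as its identifier.
An identifier has three parts: system scope, user scope, and metric name.
It takes the form A.B.C, where A is the system scope, B is the user scope, C is the metric name, and '.' is the delimiter.
The system scope can be defined in conf/flink-conf.yaml.
The user scope is the group tree; it is defined by calling addGroup(String), and groups can be nested to multiple levels.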
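How the three parts combine can be sketched as follows. This is not Flink's implementation: MetricIdentifierSketch is a hypothetical helper, a UnaryOperator<String> stands in for CharacterFilter, and the scope values (localhost, taskmanager, tm-1, MyMetrics, Latency) are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

class MetricIdentifierSketch {
    // Joins system scope, user scope and metric name with '.', filtering each component.
    static String identifier(List<String> systemScope, List<String> userScope,
                             String metricName, UnaryOperator<String> filter) {
        List<String> parts = new ArrayList<>();
        for (String s : systemScope) {
            parts.add(filter.apply(s));
        }
        for (String s : userScope) {
            parts.add(filter.apply(s));
        }
        parts.add(filter.apply(metricName));
        return String.join(".", parts);
    }

    public static void main(String[] args) {
        // Replace '.' inside a component so it cannot be confused with the delimiter.
        UnaryOperator<String> filter = s -> s.replace('.', '_');
        String id = identifier(
                Arrays.asList("localhost", "taskmanager", "tm-1"), // system scope (assumed values)
                Arrays.asList("MyMetrics", "Latency"),             // user scope: two nested groups
                "myCounter",
                filter);
        System.out.println(id); // localhost.taskmanager.tm-1.MyMetrics.Latency.myCounter
    }
}
```

The filter matters because a component that itself contains the delimiter would otherwise make the identifier ambiguous for the backend.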
The Bridge Between MetricGroup and MetricReporter: MetricRegistry
MetricRegistry keeps track of all registered Metrics and acts as the bridge between MetricGroup and MetricReporter.
MetricGroup's addMetric() method calls MetricRegistry's register() method:
registry.register(metric, name, this);
MetricGroup's close() method calls MetricRegistry's unregister() method:
registry.unregister(metric.getValue(), metric.getKey(), this);
    // ------------------------------------------------------------------------
    //  Metrics (de)registration
    // ------------------------------------------------------------------------

    @Override
    public void register(Metric metric, String metricName, AbstractMetricGroup group) {
        synchronized (lock) {
            if (isShutdown()) {
                LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
            } else {
                if (reporters != null) {
                    for (int i = 0; i < reporters.size(); i++) {
                        MetricReporter reporter = reporters.get(i);
                        try {
                            if (reporter != null) {
                                FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
                                reporter.notifyOfAddedMetric(metric, metricName, front);
                            }
                        } catch (Exception e) {
                            LOG.warn("Error while registering metric.", e);
                        }
                    }
                }
                try {
                    if (queryService != null) {
                        MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
                    }
                } catch (Exception e) {
                    LOG.warn("Error while registering metric.", e);
                }
                try {
                    if (metric instanceof View) {
                        if (viewUpdater == null) {
                            viewUpdater = new ViewUpdater(executor);
                        }
                        viewUpdater.notifyOfAddedView((View) metric);
                    }
                } catch (Exception e) {
                    LOG.warn("Error while registering metric.", e);
                }
            }
        }
    }

    @Override
    public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
        synchronized (lock) {
            if (isShutdown()) {
                LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
            } else {
                if (reporters != null) {
                    for (int i = 0; i < reporters.size(); i++) {
                        try {
                            MetricReporter reporter = reporters.get(i);
                            if (reporter != null) {
                                FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
                                reporter.notifyOfRemovedMetric(metric, metricName, front);
                            }
                        } catch (Exception e) {
                            LOG.warn("Error while registering metric.", e);
                        }
                    }
                }
                try {
                    if (queryService != null) {
                        MetricQueryService.notifyOfRemovedMetric(queryService, metric);
                    }
                } catch (Exception e) {
                    LOG.warn("Error while registering metric.", e);
                }
                try {
                    if (metric instanceof View) {
                        if (viewUpdater != null) {
                            viewUpdater.notifyOfRemovedView((View) metric);
                        }
                    }
                } catch (Exception e) {
                    LOG.warn("Error while registering metric.", e);
                }
            }
        }
    }
The register() and unregister() methods are largely symmetric; the notes below follow register().
synchronized (lock) acquires the lock. The lock object is no longer this but a dedicated new Object(), which makes it easy to introduce a second lock later.
The loop over reporters adds the Metric to every MetricReporter attached to the registry.
The first try block after the loop adds the Metric to the MetricQueryService.
MetricQueryService is an actor; it serializes Metrics and writes them to an output stream.
The last try block registers the Metric with the viewUpdater if the Metric implements the View interface.
A Metric class that implements View is updated at a fixed time interval, with the viewUpdater performing the update.
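The View mechanism can be approximated with a scheduled executor. The sketch below is an assumption-laden illustration, not Flink's ViewUpdater: SketchView, RateView, and SketchViewUpdater are hypothetical names, and the interval is passed in explicitly rather than taken from Flink's configuration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the View interface: something refreshed periodically.
interface SketchView {
    void update();
}

// A meter-like metric whose rate is only recomputed when update() is called.
class RateView implements SketchView {
    private final int intervalSeconds;
    private long count;
    private long lastCount;
    private double rate;

    RateView(int intervalSeconds) { this.intervalSeconds = intervalSeconds; }

    synchronized void markEvent() { count++; }

    public synchronized void update() {
        rate = (count - lastCount) / (double) intervalSeconds;
        lastCount = count;
    }

    synchronized double getRate() { return rate; }
}

// Keeps a set of views and refreshes all of them at a fixed interval.
class SketchViewUpdater {
    private final Set<SketchView> views = ConcurrentHashMap.newKeySet();

    SketchViewUpdater(ScheduledExecutorService executor, long intervalSeconds) {
        executor.scheduleWithFixedDelay(
                this::updateAll, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    void notifyOfAddedView(SketchView view) { views.add(view); }

    void notifyOfRemovedView(SketchView view) { views.remove(view); }

    void updateAll() {
        for (SketchView view : views) {
            view.update();
        }
    }
}
```

The design keeps the hot path cheap: markEvent() only increments a counter, while the more expensive rate computation runs on the updater's schedule.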
MetricReporter
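A MetricReporter exports Metrics to an external backend.
The parameters of the external backend can be set in conf/flink-conf.yaml.
Several external backends can be configured at the same time.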
The MetricReporter interface
    public interface MetricReporter {

        // ------------------------------------------------------------------------
        //  life cycle
        // ------------------------------------------------------------------------

        void open(MetricConfig config);

        void close();

        // ------------------------------------------------------------------------
        //  adding / removing metrics
        // ------------------------------------------------------------------------

        void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);

        void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
    }
open(MetricConfig config) configures the reporter.
Because a reporter's constructor takes no arguments, this method is used to initialize the reporter's fields.
It is always called after the object has been constructed.
close() shuts the reporter down.
This method should close channels and streams and release any other resources.
notifyOfAddedMetric() and notifyOfRemovedMetric() add and remove metrics.
A typical reporter class also implements the Scheduled interface, which is used to report the current measurements.
    public interface Scheduled {

        void report();
    }
The metric registry calls report() periodically so that the reporter reports the current measurements.
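Putting the two interfaces together, a reporter's life cycle can be sketched as below. This is a simplified model under stated assumptions: SketchReporter and SketchScheduled are hypothetical stand-ins for MetricReporter and Scheduled, a Map<String, String> replaces MetricConfig, a LongSupplier replaces Metric, and the "prefix" configuration key is invented for the example. The "backend" is just an in-memory buffer.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongSupplier;

// Hypothetical, simplified stand-in for MetricReporter.
interface SketchReporter {
    void open(Map<String, String> config);
    void close();
    void notifyOfAddedMetric(LongSupplier metric, String metricName);
    void notifyOfRemovedMetric(LongSupplier metric, String metricName);
}

// Hypothetical stand-in for Scheduled.
interface SketchScheduled {
    void report();
}

// Collects metrics by name and writes "name=value" lines on every report().
class BufferReporter implements SketchReporter, SketchScheduled {
    private final Map<String, LongSupplier> metrics = new TreeMap<>();
    private final StringBuilder out = new StringBuilder();
    private String prefix = "";

    // No-arg construction; all configuration arrives through open().
    public synchronized void open(Map<String, String> config) {
        prefix = config.getOrDefault("prefix", "");
    }

    // Release whatever the reporter holds; here that is only the metric map.
    public synchronized void close() {
        metrics.clear();
    }

    public synchronized void notifyOfAddedMetric(LongSupplier metric, String metricName) {
        metrics.put(metricName, metric);
    }

    public synchronized void notifyOfRemovedMetric(LongSupplier metric, String metricName) {
        metrics.remove(metricName);
    }

    // Called periodically by the owner; reads the current value of every metric.
    public synchronized void report() {
        for (Map.Entry<String, LongSupplier> e : metrics.entrySet()) {
            out.append(prefix).append(e.getKey()).append('=')
               .append(e.getValue().getAsLong()).append('\n');
        }
    }

    synchronized String output() {
        return out.toString();
    }
}
```

Note that the reporter stores a reference to the metric rather than a value, so each report() sees whatever the metric holds at that moment.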