1. 程式人生 > 實用技巧 >Flink實戰(七十一):監控(三)自定義metrics相關指標(一)

Flink實戰(七十一):監控(三)自定義metrics相關指標(一)

0 簡介

User-defined Metrics

除了系統的 Metrics 之外,Flink 支援自定義 Metrics ,即 User-defined Metrics。上文說的都是系統框架方面,對於自己的業務邏輯也可以用 Metrics 來暴露一些指標,以便進行監控。

User-defined Metrics 現在提及的都是 datastream 的 API,table、sql 可能需要 context 協助,但如果寫 UDF,它們其實是大同小異的。

Datastream 的 API 是繼承 RichFunction ,繼承 RichFunction 才可以有 Metrics 的介面。然後通過 RichFunction 會帶來一個 getRuntimeContext().getMetricGroup().addGroup(…) 的方法,這裡就是 User-defined Metrics 的入口。通過這種方式,可以自定義 user-defined Metric Group。如果想定義具體的 Metrics,同樣需要用getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…) 方法,它會有相應的建構函式,可以定義到自己的 Metrics 型別中。

繼承 RichFunction
    •Register user-defined Metric Group: getRuntimeContext().getMetricGroup().addGroup(…)
    •Register user-defined Metric: getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…)

User-defined Metrics Example

下面通過一段簡單的例子說明如何使用 Metrics。比如,定義了一個 Counter 傳一個 name,Counter 預設的型別是 single counter(Flink 內建的一個實現),可以對 Counter 進行 inc()操作,並在程式碼裡面直接獲取。

Meter 也是這樣,Flink 有一個內建的實現是 Meterview,因為 Meter 是多長時間內發生事件的記錄,所以它是要有一個多長時間的視窗。平常用 Meter 時直接 markEvent(),相當於加一個事件不停地打點,最後用 getrate() 的方法直接把這一段時間發生的事件除一下給算出來。

Gauge 就比較簡單了,把當前的時間打出來,用 Lambda 表示式直接把 System::currentTimeMillis 打進去就可以,相當於每次呼叫的時候都會去真正調一下系統當天時間進行計算。

Histogram 稍微複雜一點,Flink 中程式碼提供了兩種實現,在此取一其中個實現,仍然需要一個視窗大小,更新的時候可以給它一個值。

這些 Metrics 一般都不是執行緒安全的。如果想要用多執行緒,就需要加同步,更多詳情請參考下面連結。

•Counter processedCount = getRuntimeContext().getMetricGroup().counter("processed_count");
  processedCount.inc();
•Meter processRate = getRuntimeContext().getMetricGroup().meter("rate", new MeterView(60));
  processRate.markEvent();
•getRuntimeContext().getMetricGroup().gauge("current_timestamp", System::currentTimeMillis);
•Histogram histogram = getRuntimeContext().getMetricGroup().histogram("histogram", new DescriptiveStatisticsHistogram(1000));
  histogram.update(1024);
•[https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#metric-types]

1 例項

例項一:

Counter:
用與儲存數值型別,比如統計資料輸入、輸出總數量。

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}

例項二:

Gauge:
可以用來儲存任何型別,前提要實現org.apache.flink.metrics.Gauge介面,重寫getValue方法,如果返回型別為Object則該類需要重寫toString方法。

有些場景下,需要根據業務計算出指標,則Gauge使用起來更靈活。

public class MyMapper extends RichMapFunction<String, String> {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }

  @Override
  public String map(String value) throws Exception {
    valueToExpose++;
    return value;
  }
}