kafka效能監控之KafkaMetrics Sensor
package kafka.metrics
package org.apache.kafka.common.metrics
metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true) quotaManagers = QuotaFactory.instantiate(config, metrics, time)初始化了一個Metrics,並將這個例項傳到quotaManagers的建構函式中,這裡簡單介紹一下quotaManagers.這是kafka中用來限制kafka,producer的傳輸速度的,比如在config檔案下設定producer不能以超過5MB/S的速度傳輸資料,那麼這個限制就是通過quotaManager來實現的.
public class Metrics implements Closeable { .... .... private final ConcurrentMap<MetricName, KafkaMetric> metrics; private final ConcurrentMap<String, Sensor> sensors;metrics與sensors這兩個concurrentMap是Metrics中兩個重要的成員屬性.那麼什麼是KafkaMetric,什麼是Sensor呢?
public interface Metric { /** * A name for this metric */ public MetricName metricName(); /** * The value of the metric */ public double value(); }那麼KafkaMetric又是如何實現value()方法的呢?
@Override public double value() { synchronized (this.lock) { return value(time.milliseconds()); } } double value(long timeMs) { return this.measurable.measure(config, timeMs); }原來value()是通過kafkaMetric中的另一個成員屬性measurable完成
public interface Measurable { /** * Measure this quantity and return the result as a double * @param config The configuration for this metric * @param now The POSIX time in milliseconds the measurement is being taken * @return The measured value */ public double measure(MetricConfig config, long now); }其實這邊挺繞的,Metrics有kafkaMetric的成員變數,而kafkaMetric又通過Measurable返回要檢測的值.打個比方,Metrics好比是汽車的儀表盤,kafkaMetric就是儀表盤上的一個儀表,Measurable就是對真正要檢測的元件的一個封裝.來看看一個Measrable的簡單實現,在sender.java類中.
metrics.addMetric(m, new Measurable() { public double measure(MetricConfig config, long now) { return (now - metadata.lastSuccessfulUpdate()) / 1000.0; } });可以看到measure的實現就是簡單地返回要返回的值,因為是直接在目標類中定義的,所以可以直接獲得相應變數的引用.
private final ConcurrentMap<String, Sensor> sensors;
/** * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set * of metrics about request sizes such as the average or max. */ public final class Sensor { //一個kafka就只有一個Metrics例項,這個registry就是對這個Metrics的引用 private final Metrics registry; private final String name; private final Sensor[] parents; private final List<Stat> stats; private final List<KafkaMetric> metrics;這一段的註釋很有意義,從註釋中可以看到Sensor的作用不同KafkaMetric. KafkaMetric僅僅是返回某一個引數的值,而Sensor有基於某一引數時間序列進行統計的功能,比如平均值,最大值,最小值.那這些統計又是如何實現的呢?答案是List<Stat> stats這個屬性成員.
public interface Stat { /** * Record the given value * @param config The configuration to use for this metric * @param value The value to record * @param timeMs The POSIX time in milliseconds this value occurred */ public void record(MetricConfig config, double value, long timeMs); }可以看到Stat是一個介面,其中有一個record方法可以記錄一個取樣數值,下面看一個例子,max這個功能如何用Stat來實現?
public final class Max extends SampledStat { public Max() { super(Double.NEGATIVE_INFINITY); } @Override protected void update(Sample sample, MetricConfig config, double value, long now) { sample.value = Math.max(sample.value, value); } @Override public double combine(List<Sample> samples, MetricConfig config, long now) { double max = Double.NEGATIVE_INFINITY; for (int i = 0; i < samples.size(); i++) max = Math.max(max, samples.get(i).value); return max; } }是不是很簡單,update相當於冒一次泡,把當前的值與歷史的最大值比較.combine相當於用一次完整的氣泡排序找出最大值,需要注意的是,max是繼承SampleStat的,而SampleStat是Stat介面的實現類.那我們回到Sensor類上來.
public void record(double value, long timeMs) { this.lastRecordTime = timeMs; synchronized (this) { // increment all the stats for (int i = 0; i < this.stats.size(); i++) this.stats.get(i).record(config, value, timeMs); checkQuotas(timeMs); } for (int i = 0; i < parents.length; i++) parents[i].record(value, timeMs); }record方法,每個註冊於其中的stats提交值,同時如果自己有父sensor的話,向父sensor提交.
public void checkQuotas(long timeMs) { for (int i = 0; i < this.metrics.size(); i++) { KafkaMetric metric = this.metrics.get(i); MetricConfig config = metric.config(); if (config != null) { Quota quota = config.quota(); if (quota != null) { double value = metric.value(timeMs); if (!quota.acceptable(value)) { throw new QuotaViolationException( metric.metricName(), value, quota.bound()); } } } } }checkQuotas,通過這裡其實是遍歷註冊在sensor上的每一個KafkaMetric來檢查他們的值有沒有超過config檔案中設定的配額.注意這裡的QuotaVioLationException,是不是很熟悉.在QuatoManager中,如果有一個client的上傳/下載速度超過指定配額.那麼就會丟擲這個警告.
try { clientSensors.quotaSensor.record(value) // trigger the callback immediately if quota is not violated callback(0) } catch { case qve: QuotaViolationException => // Compute the delay val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId)) throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)) clientSensors.throttleTimeSensor.record(throttleTimeMs) // If delayed, add the element to the delayQueue delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback)) delayQueueSensor.record() logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs)) }這裡就很好理解了,向clientSensor提交上傳,下載的值,如果成功了,就掉用相應的callback,如果失敗了catch的就是QuotaViolationException.
class ExpireSensorTask implements Runnable { public void run() { for (Map.Entry<String, Sensor> sensorEntry : sensors.entrySet()) { // removeSensor also locks the sensor object. This is fine because synchronized is reentrant // There is however a minor race condition here. Assume we have a parent sensor P and child sensor C. // Calling record on C would cause a record on P as well. // So expiration time for P == expiration time for C. If the record on P happens via C just after P is removed, // that will cause C to also get removed. // Since the expiration time is typically high it is not expected to be a significant concern // and thus not necessary to optimize synchronized (sensorEntry.getValue()) { if (sensorEntry.getValue().hasExpired()) { log.debug("Removing expired sensor {}", sensorEntry.getKey()); removeSensor(sensorEntry.getKey()); } } } }