1. 程式人生 > 其它 >如何監控 Kafka 消費者組消費進度?

如何監控 Kafka 消費者組消費進度?

技術標籤:Kafka

如何監控 Kafka 消費者組消費進度?

對於 Kafka 消費者來說,最重要的事情就是監控它們的消費進度了,或者說是監控它們消費的滯後程度。這個滯後程度有個專門的名稱:消費者 Lag 或 Consumer Lag。

所謂滯後程度,就是指消費者當前落後於生產者的程度。比方說,Kafka 生產者向某主題成功生產了 100 萬條訊息,你的消費者當前消費了 80 萬條訊息,那麼我們就說你的消費者滯後了 20 萬條訊息,即 Lag 等於 20 萬。

通常來說,Lag 的單位是訊息數,而且我們一般是在主題這個級別上討論 Lag 的,但實際上,Kafka 監控 Lag 的層級是在分割槽上的。如果要計算主題級別的,你需要手動彙總所有主題分割槽的 Lag,將它們累加起來,合併成最終的 Lag 值。

對消費者而言,Lag 應該算是最最重要的監控指標了。它直接反映了一個消費者的執行情況。一個正常工作的消費者,它的 Lag 值應該很小,甚至是接近於 0 的,這表示該消費者能夠及時地消費生產者生產出來的訊息,滯後程度很小。反之,如果一個消費者 Lag 值很大,通常就表明它無法跟上生產者的速度,最終 Lag 會越來越大,從而拖慢下游訊息的處理速度。

更可怕的是,由於消費者的速度無法匹及生產者的速度,極有可能導致它消費的資料已經不在作業系統的頁快取中了,那麼這些資料就會失去享有 Zero Copy 技術的資格。這樣的話,消費者就不得不從磁碟上讀取它們,這就進一步拉大了與生產者的差距,進而出現馬太效應,即那些 Lag 原本就很大的消費者會越來越慢,Lag 也會越來越大。

鑑於這些原因,你在實際業務場景中必須時刻關注消費者的消費進度。一旦出現 Lag 逐步增加的趨勢,一定要定位問題,及時處理,避免造成業務損失。

既然消費進度這麼重要,我們應該怎麼監控它呢?簡單來說,有 3 種方法。

  1. 使用 Kafka 自帶的命令列工具 kafka-consumer-groups 指令碼。
  2. 使用 Kafka Java Consumer API 程式設計。
  3. 使用 Kafka 自帶的 JMX 監控指標。

Kafka 自帶命令

使用 Kafka 自帶的命令列工具 bin/kafka-consumer-groups.sh 。**kafka-consumer-groups 指令碼是 Kafka 為我們提供的最直接的監控消費者消費進度的工具。**它不僅能監控管理消費者組,也可以監控管理獨立消費者。獨立消費者就是沒有使用消費者組機制的消費者程式

。和消費者組相同的是,它們也要配置 group.id 引數值,但和消費者組呼叫 KafkaConsumer.subscribe() 不同的是,獨立消費者呼叫 KafkaConsumer.assign() 方法直接消費指定分割槽。

使用 kafka-consumer-groups 指令碼很簡單。該指令碼位於 Kafka 安裝目錄的 bin 子目錄下,我們可以通過下面的命令來檢視某個給定消費者的 Lag 值:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group  消費組名稱

輸出如下:

它會按照消費者組訂閱主題的分割槽進行展示,每個分割槽一行資料;

其次,除了主題、分割槽等資訊外,它會彙報每個分割槽當前最新生產的訊息的位移值(即 LOG-END-OFFSET 列值)、該消費者組當前最新消費訊息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前兩者的差值)、消費者例項 ID、消費者連線 Broker 的主機名以及消費者的 CLIENT-ID 資訊。

圖中每個分割槽的 LAG 值大約都是 60 多萬,這表明,在我的這個測試中,消費者組遠遠落後於生產者的進度。理想情況下,我們希望該列所有值都是 0,因為這才表明我的消費者完全沒有任何滯後。

Kafka Java Consumer API

Java Consumer API 分別提供了查詢當前分割槽最新訊息位移和消費者組最新消費訊息位移兩組方法,我們使用它們就能計算出對應的 Lag。程式碼如下:

public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (AdminClient client = AdminClient.create(props)) {
        ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
        try {
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                                                                               entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // 處理中斷異常
            // ...
            return Collections.emptyMap();
        } catch (ExecutionException e) {
            // 處理 ExecutionException
            // ...
            return Collections.emptyMap();
        } catch (TimeoutException e) {
            throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
        }
    }
}

程式碼中,只需要關注這三行程式碼即可:

  • client.listConsumerGroupOffsets(groupID);

    呼叫 AdminClient.listConsumerGroupOffsets 方法獲取給定消費者組的最新消費訊息的位移;

  • consumer.endOffsets(consumedOffsets.keySet());

    獲取訂閱分割槽的最新訊息位移;

  • entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));

    執行相應的減法操作,獲取 Lag 值並封裝進一個 Map 物件。

Kafka JMX監控指標

上面這兩種方式,都可以很方便地查詢到給定消費者組的 Lag 資訊。但在很多實際監控場景中,我們藉助的往往是現成的監控框架。如果是這種情況,以上這兩種辦法就不怎麼管用了,因為它們都不能整合進已有的監控框架中,如 Zabbix 或 Grafana。下面我們就來看第三種方法,使用 Kafka 預設提供的 JMX 監控指標來監控消費者的 Lag 值。

當前,Kafka 消費者提供了一個名為 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指標,裡面有很多屬性。和我們今天所講內容相關的有兩組屬性:records-lag-max 和 records-lead-min,它們分別表示此消費者在測試視窗時間內曾經達到的最大的 Lag 值和最小的 Lead 值。

Lag 值的含義我們已經反覆講過了,我就不再重複了。這裡的 Lead 值是指消費者最新消費訊息的位移與分割槽當前第一條訊息位移的差值。很顯然,Lag 和 Lead 是一體的兩個方面:Lag 越大的話,Lead 就越小,反之也是同理

試想一下,監控到 Lag 越來越大,可能只會給你一個感受,那就是消費者程式變得越來越慢了,至少是追不上生產者程式了,除此之外,你可能什麼都不會做。畢竟,有時候這也是能夠接受的。但反過來,一旦你監測到 Lead 越來越小,甚至是快接近於 0 了,你就一定要小心了,這可能預示著消費者端要丟訊息了。

為什麼?我們知道 Kafka 的訊息是有留存時間設定的,預設是 1 周,也就是說 Kafka 預設刪除 1 周前的資料。倘若你的消費者程式足夠慢,慢到它要消費的資料快被 Kafka 刪除了,這時你就必須立即處理,否則一定會出現訊息被刪除,從而導致消費者程式重新調整位移值的情形。這可能產生兩個後果:一個是消費者從頭消費一遍資料,另一個是消費者從最新的訊息位移處開始消費,之前沒來得及消費的訊息全部被跳過了,從而造成丟訊息的假象。

這兩種情形都是不可忍受的,因此必須有一個 JMX 指標,清晰地表徵這種情形,這就是引入 Lead 指標的原因。所以,Lag 值從 100 萬增加到 200 萬這件事情,遠不如 Lead 值從 200 減少到 100 這件事來得重要。在實際生產環境中,請你一定要同時監控 Lag 值和 Lead 值