
Kafka Consumer Exception Handling

The Exception

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:775)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:726)

The key information in the exception:

a) CommitFailedException

b) Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

In fact, if you understand the parameters involved, or the consumption mechanism itself, this problem is easy to solve. When I saw this exception I was actually happy, because I knew that through it I could get a feel for how a Kafka Consumer consumes messages. Attitude matters~~~

Read plainly, the exception is saying: this Consumer can no longer commit its offsets, because it has already been kicked out of the group, and that happened because your processing time exceeded the interval within which you were supposed to report back to the server. It also tells us what to do about it: either increase the timeout, or reduce the number of records each poll brings back.

The real questions, though: what is the session timeout? What is the maximum size of batches? And what does the timeout in poll(timeout) mean?

Troubleshooting

Version: 1.1.0

The useful information from the official docs, put into plain language:

The poll() API is essentially a liveness check on the consumer. As long as we keep calling poll(), the consumer stays alive in its group and keeps consuming messages from its assigned partitions. Under the hood it works like this: the consumer keeps sending heartbeats to the server; if the consumer dies, or fails to send a heartbeat within a certain window (session.timeout.ms), it is considered dead, and its partitions are reassigned to other consumers in the group.
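To make that concrete, here is a minimal sketch of the contract. It is only an illustration: the session.timeout.ms value is arbitrary, and the topic name simply reuses the one from this article.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LivenessSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        // If the group coordinator gets no heartbeat for this long, the consumer
        // is declared dead and its partitions are reassigned (illustrative value).
        props.put("session.timeout.ms", "10000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("user_behavior"));
        while (true) {
            // Calling poll() regularly is what keeps this consumer "alive" in the
            // group; stop calling it for too long and a rebalance follows.
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}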

If the example below does not make sense on a first read, finish the article and then come back to it.

------------------------------------ divider ------------------------------------

Background: you haul bricks for a living, you are also a bricklayer, and, er... you have a son. The error above amounts to this: the foreman orders every worker (consumer) to take a load of 100 bricks to the site, lay them into the house, and come back for the next load, all within 10 minutes. The problem is: you left with your 100 bricks, 10 minutes have passed, and you are not back. How is the foreman supposed to know you are not off napping somewhere? So he hands your hauling-and-laying job to someone else on the same crew (same group, different consumer). In reality you may not have been slacking at all; perhaps you are just a perfectionist (a Virgo, or you are building your own house) and laying the bricks takes you a long time (spending too much time message processing), so you failed to report back to the foreman within 10 minutes.

Now you have to negotiate with him, and there are two options: 1) let me come back within 15 minutes instead; 2) keep the 10-minute deadline, but I will only carry 80 bricks per trip. If the boss is the controlling type, he will prefer the second option: for the same total workload you simply make a few more trips, and he keeps good control over you. The first option he will not like, because the others hauling bricks alongside you come back every 5 minutes and he can reassign (rebalance) your work to them; telling him 15 minutes makes things comparatively unpredictable for him. Of course, there is still one more option: send your son to haul the bricks (start another thread) while you lay them. When you finish laying a load of 100 bricks, your son reports to the foreman and brings back the next 100. But note one problem: your son must not report to the foreman and fetch the next batch of 100 before you have finished laying the previous one, or you will never keep up.

------------------------------------ divider ------------------------------------


Through the example above we now have a rough idea of max.poll.interval.ms and the maximum size of batches:

max.poll.interval.ms: the maximum allowed interval between two consecutive calls to poll()

maximum size of batches: the maximum number of records a single poll() returns, configured via max.poll.records

So when are heartbeats sent? Are they sent when poll() is called? And what does the timeout in poll(timeout) mean?

The official javadoc explains the timeout parameter of poll(timeout) as follows:

Parameters:
timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.

I tried very hard and still could not translate this into plain human language…

So what now? Read the source? I skimmed it, but my skills are limited... I really could not tell when heartbeats are sent. That leaves the last resort (the trump card): write a test and verify.

Verification

1. Producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

public class ProducerTest {
    @Test
    public void TestPro() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Send 30 small records to the test topic.
        for (int i = 0; i < 30; i++)
            producer.send(new ProducerRecord<String, String>("user_behavior", Integer.toString(i), "hello-" + i));

        producer.close();
    }
}

2. Consumer

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

public class ConsumerTest {
    @Test
    public void TestCon() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 5); // at most 5 records per poll
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("user_behavior"));
        int i = 0;
        while (true) {
            // Block for up to 3 s if no data is buffered.
            ConsumerRecords<String, String> records = consumer.poll(3000);
            System.out.println("polls out: " + ++i + "time: " + KafkaHelper.timestmp2date(System.currentTimeMillis()));

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("time = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                        KafkaHelper.timestmp2date(record.timestamp()),
                        record.partition(),
                        record.offset(),
                        record.key(),
                        record.value());
            }
            consumer.commitSync();
        }
    }
}

a) Testing what poll()'s timeout parameter does

Starting just the Consumer, it prints:

polls out: 1time: 2018-06-13 15:25:19
polls out: 2time: 2018-06-13 15:25:22
polls out: 3time: 2018-06-13 15:25:25
polls out: 4time: 2018-06-13 15:25:28

At first I wrongly assumed that this timeout was the interval between the Consumer's successive fetches.

But after I started the Producer, it printed:

polls out: 1time: 2018-06-13 15:27:40
polls out: 2time: 2018-06-13 15:27:43
polls out: 3time: 2018-06-13 15:27:46
polls out: 4time: 2018-06-13 15:27:49
polls out: 5time: 2018-06-13 15:27:52
time = 2018-06-13 15:27:52, partition = 0, offset = 503, key = 1, value = hello-1
time = 2018-06-13 15:27:52, partition = 0, offset = 504, key = 5, value = hello-5
polls out: 6time: 2018-06-13 15:27:52
time = 2018-06-13 15:27:52, partition = 1, offset = 157, key = 4, value = hello-4
time = 2018-06-13 15:27:52, partition = 2, offset = 129, key = 0, value = hello-0
time = 2018-06-13 15:27:52, partition = 2, offset = 130, key = 2, value = hello-2
time = 2018-06-13 15:27:52, partition = 2, offset = 131, key = 3, value = hello-3
polls out: 7time: 2018-06-13 15:27:55
polls out: 8time: 2018-06-13 15:27:58
polls out: 9time: 2018-06-13 15:28:01
polls out: 10time: 2018-06-13 15:28:04

As you can see, the 5th and 6th calls to poll() happened at the same time: once messages were available, poll() returned immediately instead of waiting out the full timeout.

Combining this with the official description, my understanding of the timeout parameter of poll(timeout) is as follows:

If there are no messages in the buffer, poll() blocks for up to timeout milliseconds before returning (empty). If messages are available, it returns them immediately; how many records come back per call is capped by max.poll.records.

b) Testing max.poll.interval.ms

public class ConsumerTest {
    @Test
    public void TestCon() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 5);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1000"); // the only change: allow at most 1 s between two polls

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("user_behavior"));
        int i = 0;

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(3000);
            System.out.println("polls out: " + ++i + "time: " + KafkaHelper.timestmp2date(System.currentTimeMillis()));

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("time = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                        KafkaHelper.timestmp2date(record.timestamp()),
                        record.partition(),
                        record.offset(),
                        record.key(),
                        record.value());
            }
            consumer.commitSync();
        }
    }
}

Starting both the Consumer and the Producer, everything runs normally with no errors:

polls out: 1time: 2018-06-13 15:53:07
polls out: 2time: 2018-06-13 15:53:07
time = 2018-06-13 15:53:07, partition = 1, offset = 158, key = 4, value = hello-4
time = 2018-06-13 15:53:07, partition = 0, offset = 505, key = 1, value = hello-1
time = 2018-06-13 15:53:07, partition = 0, offset = 506, key = 5, value = hello-5
time = 2018-06-13 15:53:07, partition = 2, offset = 132, key = 0, value = hello-0
time = 2018-06-13 15:53:07, partition = 2, offset = 133, key = 2, value = hello-2
polls out: 3time: 2018-06-13 15:53:07
time = 2018-06-13 15:53:07, partition = 2, offset = 134, key = 3, value = hello-3
polls out: 4time: 2018-06-13 15:53:10
polls out: 5time: 2018-06-13 15:53:13

Remembering the exception's hint about processing taking too long (spending too much time message processing),

I added some processing time to the Consumer code:

public class ConsumerTest {
    @Test
    public void TestCon() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 5);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1000"); // still: at most 1 s between two polls

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("user_behavior"));
        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(3000);
            System.out.println("polls out: " + ++i + "time: " + KafkaHelper.timestmp2date(System.currentTimeMillis()));

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("time = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                        KafkaHelper.timestmp2date(record.timestamp()),
                        record.partition(),
                        record.offset(),
                        record.key(),
                        record.value());
                TimeUnit.SECONDS.sleep(2); // simulate slow processing: 5 records per poll at 2 s each
            }
            consumer.commitSync();
        }
    }
}

The output:
polls out: 1time: 2018-06-13 15:59:13
polls out: 2time: 2018-06-13 15:59:16
polls out: 3time: 2018-06-13 15:59:19
polls out: 4time: 2018-06-13 15:59:22
polls out: 5time: 2018-06-13 15:59:22
time = 2018-06-13 15:59:22, partition = 2, offset = 135, key = 0, value = hello-0
time = 2018-06-13 15:59:22, partition = 2, offset = 136, key = 2, value = hello-2
time = 2018-06-13 15:59:22, partition = 2, offset = 137, key = 3, value = hello-3

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596)
	...

The long-awaited exception finally appeared.

So max.poll.interval.ms bounds how long you may spend processing the records returned by one poll; in other words, it is the maximum allowed interval between two consecutive calls to poll().

As for when heartbeats are sent, my guess was that they too happen inside poll(timeout), since once max.poll.interval.ms is exceeded the consumer is treated as dead. (For the record: since Kafka 0.10.1 / KIP-62, heartbeats are actually sent by a background thread, governed by heartbeat.interval.ms and session.timeout.ms; what a call to poll() really resets is the max.poll.interval.ms deadline, after which the consumer leaves the group on its own.)

PS: even with the sleep reduced to 0.5 s the exception is still thrown, because each poll returns 5 records, so the batch takes 5 × 0.5 s = 2.5 s > max.poll.interval.ms = 1000 ms. As a rule of thumb, max.poll.records times the worst-case per-record processing time must stay below max.poll.interval.ms.

Fixing the Exception

a) Increase max.poll.interval.ms (default 300000 ms, i.e. 300 s); see the sketch after this list

b) Decrease max.poll.records (default 500)

c) Move processing to a separate thread (see the follow-up below)
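Fixes a) and b) are pure configuration. A minimal sketch of what they look like in code; the concrete values here are illustrative, not recommendations:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TunedConsumerSketch {
    public static KafkaConsumer<String, String> build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Fix a): give each batch more time before the consumer is evicted
        // (default 300000 ms; 600000 ms here is just an example).
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
        // Fix b): shrink the batch so it can be processed within the deadline
        // (default 500; 100 here is just an example).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        return new KafkaConsumer<String, String>(props);
    }
}

Which knob to turn depends on whether you would rather make more trips with fewer bricks (b) or promise the foreman a longer deadline (a), exactly as in the analogy.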

Follow-up:

Write a separate thread to process the messages, so that consuming and processing are decoupled and this exception no longer occurs. Just be careful to commit the offsets only after a whole batch has been processed, and only then poll for the next batch (a CountDownLatch comes in handy here); see the sketch below.
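Here is one way that idea could look. This is only a sketch of mine, not tested production code: the pause()/resume() calls are my addition, so that the poll thread can keep calling poll() purely as a liveness signal (getting no records back) while the worker chews through the batch, and the CountDownLatch tells the poll thread when the batch is done so it can commit and resume fetching.

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DecoupledConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", 5);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("user_behavior"));
        ExecutorService worker = Executors.newSingleThreadExecutor();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            if (records.isEmpty()) continue;

            // Hand the batch to the worker ("you" lay the bricks) while this
            // thread ("your son") keeps checking in with the foreman.
            CountDownLatch batchDone = new CountDownLatch(1);
            worker.submit(() -> {
                try {
                    for (ConsumerRecord<String, String> record : records) {
                        process(record); // the slow part runs off the poll thread
                    }
                } finally {
                    batchDone.countDown();
                }
            });

            // Pause fetching: poll() now returns no records but still counts as
            // "reporting in", so max.poll.interval.ms is never exceeded.
            consumer.pause(consumer.assignment());
            while (batchDone.getCount() > 0) {
                consumer.poll(100);
            }
            consumer.resume(consumer.paused());

            // Only now is the whole batch processed, so it is safe to commit
            // before fetching the next batch.
            consumer.commitSync();
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("partition = %d, offset = %d, value = %s%n",
                record.partition(), record.offset(), record.value());
    }
}

Note this still handles one batch at a time; the gain is that slow processing can no longer get the consumer kicked out of the group.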

Summary:

Stay calm when you hit an Exception; getting to the bottom of each one is how you improve.
