
Kafka Study Notes 017 --- Consumer Configuration Parameter max.poll.interval.ms

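The max.poll.interval.ms parameter specifies the maximum interval allowed between two consecutive poll() calls of a consumer (default: 5 minutes, i.e. 300000 ms). If this interval is exceeded, the consumer client proactively sends a LeaveGroup request to the coordinator, which triggers a rebalance. The consumer then sends a new JoinGroup request to rejoin the group.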
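
The check described above can be modelled in a few lines of plain Java. This is only a simplified sketch and not the Kafka client's actual code: the class PollIntervalMonitor and its method names are hypothetical, and it assumes the check amounts to comparing the time elapsed since the last poll() against max.poll.interval.ms.

```java
// Simplified model of the poll-interval check; NOT the Kafka client's real implementation.
// PollIntervalMonitor and its method names are hypothetical, chosen for illustration only.
final class PollIntervalMonitor {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollIntervalMonitor(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Called whenever the application thread calls poll().
    void recordPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    // True once more than maxPollIntervalMs has elapsed since the last poll().
    boolean pollTimeoutExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        PollIntervalMonitor monitor = new PollIntervalMonitor(10_000L, 0L);
        // 9 seconds after the last poll: still within the 10-second interval
        System.out.println(monitor.pollTimeoutExpired(9_000L));
        // 11 seconds after the last poll: the interval has been exceeded
        System.out.println(monitor.pollTimeoutExpired(11_000L));
    }
}
```

In this model, a consumer that finds pollTimeoutExpired to be true would leave the group, and calling recordPoll again corresponds to the next poll() that starts the rejoin.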

An example follows:

1. Set max.poll.interval.ms to 10000 ms

public static KafkaConsumer<String, String> createConsumer() {
    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Allow at most 10 seconds between two consecutive poll() calls
    properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
    return new KafkaConsumer<>(properties);
}

2. The consumer thread sleeps for 11000 ms to simulate an interval between two poll() calls that exceeds max.poll.interval.ms

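import java.time.Duration;
import java.util.Collections;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

@Slf4j
public class MyConsumer {

    public static void main(String[] args) throws InterruptedException {
        KafkaConsumer<String, String> consumer = KafkaTestUtil.createConsumer();
        consumer.subscribe(Collections.singleton(KafkaTestContants.FIRST_TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                log.warn("consume msg: topic={}, partition={}, offset={}, msg={}",
                        record.topic(), record.partition(), record.offset(), record.value());
            }
            // Sleep longer than max.poll.interval.ms (10000 ms) before the next poll()
            Thread.sleep(11000);
        }
    }
}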

3. Run result

09:34:46,858 [kafka-coordinator-heartbeat-thread | group1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-da40b0d7-9dda-484d-9722-b512bf351c56 sending LeaveGroup request to coordinator 192.168.1.8:9092 (id: 2147483645 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
09:34:47,979 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Failing OffsetCommit request since the consumer is not part of an active group
09:34:47,979 [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Synchronous auto-commit of offsets {topic1-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}, topic1-0=OffsetAndMetadata{offset=4, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
09:34:47,979 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
09:34:47,979 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Lost previously assigned partitions topic1-1, topic1-0
09:34:47,979 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group
09:34:48,058 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
09:34:48,058 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group
09:34:48,058 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Finished assignment for group at generation 22: {consumer-group1-1-32cfa301-a26c-4f3a-b270-9bb1bbc51fef=Assignment(partitions=[topic1-0, topic1-1])}
09:34:48,058 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully joined group with generation 22

4. Usage recommendations

(1) In practice, the time spent processing the records returned by one poll() should not exceed max.poll.interval.ms; otherwise a rebalance is triggered.

(2) If processing the consumed records is time-consuming, try lowering max.poll.records to reduce the number of records fetched per poll (the default is 500).
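
The two recommendations above can be tied together with a rough budget calculation: if one record takes at most a certain number of milliseconds to process, the batch size should be small enough that a full batch finishes within max.poll.interval.ms. The following is a minimal sketch under stated assumptions: PollBudget is a hypothetical helper (not part of the Kafka client), the worst-case per-record time is something you have to measure yourself, and the 50% headroom is an arbitrary choice. The property keys are the real consumer configuration names, passed as plain strings so the example needs nothing beyond the JDK.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class PollBudget {

    // Largest batch size whose worst-case processing time stays within
    // safetyFactor * maxPollIntervalMs (safetyFactor in (0, 1], e.g. 0.5 for 50% headroom).
    static int maxRecordsFor(long maxPollIntervalMs, long worstCaseMsPerRecord, double safetyFactor) {
        if (maxPollIntervalMs <= 0 || worstCaseMsPerRecord <= 0
                || safetyFactor <= 0 || safetyFactor > 1) {
            throw new IllegalArgumentException("arguments out of range");
        }
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        long records = budgetMs / worstCaseMsPerRecord;
        // Always allow at least one record per poll
        return (int) Math.max(1L, Math.min(records, Integer.MAX_VALUE));
    }

    // Builds the two consumer settings discussed above as plain string properties,
    // using an assumed 50% headroom.
    static Properties consumerOverrides(long maxPollIntervalMs, long worstCaseMsPerRecord) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records",
                Integer.toString(maxRecordsFor(maxPollIntervalMs, worstCaseMsPerRecord, 0.5)));
        return props;
    }
}
```

For instance, with the default interval of 300000 ms and a worst case of 1000 ms per record, maxRecordsFor(300_000, 1_000, 0.5) gives 150, which is well below the default of 500 records per poll.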