1. 程式人生 > >spring boot kafka 支援批量操作

spring boot kafka 支援批量操作

前言

最近有個專案存在kafka積壓情況,上去看了下,的確積壓挺厲害。
看了下程式碼,spring boot 是1.5.13.RELEASE版本,kafka使用的是spring boot的自動配置,@KafkaListener每次處理一條資料,每次邏輯中存在多次資料庫操作。
準備修改下邏輯,@KafkaListener批量處理資料,合併邏輯,並且批量操作資料庫,提高處理速度。

原有邏輯

1.pom新增配置

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.application.yml新增配置

spring:
    kafka:
        bootstrap-servers: 127.0.0.1:9092
        #生產者的配置,大部分我們可以使用預設的,這裡列出幾個比較重要的屬性
        producer:
            #每批次傳送訊息的數量
            batch-size: 16
            #設定大於0的值將使客戶端重新發送任何資料,一旦這些資料傳送失敗。注意,這些重試與客戶端接收到傳送錯誤時的重試沒有什麼不同。允許重試將潛在的改變資料的順序,如果這兩個訊息記錄都是傳送到同一個partition,則第一個訊息失敗第二個傳送成功,則第二條訊息會比第一條訊息出現要早。
            retries: 0
            #producer可以用來快取資料的記憶體大小。如果資料產生速度大於向broker傳送的速度,producer會阻塞或者丟擲異常,以“block.on.buffer.full”來表明。這項設定將和producer能夠使用的總記憶體相關,但並不是一個硬性的限制,因為不是producer使用的所有記憶體都是用於快取。一些額外的記憶體會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。
            buffer-memory: 33554432
            #key序列化方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        #消費者的配置
        consumer:
            concurrency: 10
            #Kafka中沒有初始偏移或如果當前偏移在伺服器上不再存在時,預設區最新 ,有三個選項 【latest, earliest, none】
            auto-offset-reset: earliest
            #是否開啟自動提交
            enable-auto-commit: false
            ack-mode: MANUAL_IMMEDIATE
            #自動提交的時間間隔
            #            auto-commit-interval: 100
            #key的解碼方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            #value的解碼方式
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            group-id: test-consumer-group
          

3.java監聽邏輯

@KafkaListener(topics = { "topic" })
	public void listenPlayEnt(ConsumerRecord<?, ?> record) {
   // 處理邏輯
   ....
}

簡單思路

spring kafka實現

由於spring boot 自動配置KafkaAutoConfiguration和KafkaProperties中沒有找到批量操作相關的,轉向檢視依賴實現spring-kafka。

從 官方文件 https://docs.spring.io/spring-kafka/reference/html/_reference.html

中我們可以看出來批量需要以下程式碼實現。

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        # 這個就是用來控制是否批量處理
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

但是,我人比較懶,不想大幅度修改kafka相關的配置,想要沿用spring boot kafka自動配置邏輯。

具體的過程不說了,簡單分析下KafkaAutoConfiguration邏輯,可以看到差別主要是在ConcurrentKafkaListenerContainerFactory這個bean。

spring boot kafka實現

1.重寫KafkaProperties物件

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@ConfigurationProperties(prefix = "spring.kafka")
@Configuration("KafkaPropertiesExtra")
@Primary
public class KafkaPropertiesExtra extends KafkaProperties {

    private final ConsumerExtra consumerExtra = new ConsumerExtra();

    public static class ConsumerExtra {

        private Boolean batchListener = false;

        public Boolean getBatchListener() {
            return batchListener;
        }

        public void setBatchListener(Boolean batchListener) {
            this.batchListener = batchListener;
        }
    }


    public ConsumerExtra getConsumerExtra() {
        return this.consumerExtra;
    }



}

2.自定義ConcurrentKafkaListenerContainerFactory這個bean

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaConfig {

    @Autowired KafkaPropertiesExtra kafkaProperties;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);

        // 補充批量處理引數
        factory.setBatchListener(kafkaProperties.getConsumerExtra().getBatchListener());

        return factory;
    }

}

3.修改application.yml,加上批量相關的配置

spring:
    kafka:
        ......
        consumer:
            ......
            # 一次性最多抓取多少條資料
            max-poll-records: 100
            # 超時時間 毫秒
            poll-timeout: 3000
            max-poll-interval-ms: 5000
        consumer-extra:
            # 是否批量處理
            batch-listener: true

4.修改java監聽器邏輯,改成list

@KafkaListener(topics = { "topic" })
	public void listenPlayEnt(List<ConsumerRecord<?, ?>> records) {
	// 處理邏輯
   ....
}

至此,就基於spring boot的KafkaAutoConfiguration實現了kafka批量消費邏輯。