spring boot kafka 支援批量操作
阿新 • • 發佈:2018-11-08
前言
最近有個專案存在kafka積壓情況,上去看了下,的確積壓挺厲害。
看了下程式碼,spring boot 是1.5.13.RELEASE版本,kafka使用的是spring boot的自動配置,@KafkaListener每次處理一條資料,每次邏輯中存在多次資料庫操作。
準備修改下邏輯,@KafkaListener批量處理資料,合併邏輯,並且批量操作資料庫,提高處理速度。
原有邏輯
1.pom新增配置
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
2.application.yml新增配置
spring: kafka: bootstrap-servers: 127.0.0.1:9092 #生產者的配置,大部分我們可以使用預設的,這裡列出幾個比較重要的屬性 producer: #每批次傳送訊息的數量 batch-size: 16 #設定大於0的值將使客戶端重新發送任何資料,一旦這些資料傳送失敗。注意,這些重試與客戶端接收到傳送錯誤時的重試沒有什麼不同。允許重試將潛在的改變資料的順序,如果這兩個訊息記錄都是傳送到同一個partition,則第一個訊息失敗第二個傳送成功,則第二條訊息會比第一條訊息出現要早。 retries: 0 #producer可以用來快取資料的記憶體大小。如果資料產生速度大於向broker傳送的速度,producer會阻塞或者丟擲異常,以“block.on.buffer.full”來表明。這項設定將和producer能夠使用的總記憶體相關,但並不是一個硬性的限制,因為不是producer使用的所有記憶體都是用於快取。一些額外的記憶體會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。 buffer-memory: 33554432 #key序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer #消費者的配置 consumer: concurrency: 10 #Kafka中沒有初始偏移或如果當前偏移在伺服器上不再存在時,預設區最新 ,有三個選項 【latest, earliest, none】 auto-offset-reset: earliest #是否開啟自動提交 enable-auto-commit: false ack-mode: MANUAL_IMMEDIATE #自動提交的時間間隔 # auto-commit-interval: 100 #key的解碼方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value的解碼方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: test-consumer-group
3.java監聽邏輯
@KafkaListener(topics = { "topic" })
public void listenPlayEnt(ConsumerRecord<?, ?> record) {
// 處理邏輯
....
}
簡單思路
spring kafka實現
由於spring boot 自動配置KafkaAutoConfiguration和KafkaProperties中沒有找到批量操作相關的,轉向檢視依賴實現spring-kafka。
從 官方文件 https://docs.spring.io/spring-kafka/reference/html/_reference.html
中我們可以看出來批量需要以下程式碼實現。
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
# 這個就是用來控制是否批量處理
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
但是,我人比較懶,不想大幅度修改kafka相關的配置,想要沿用spring boot kafka自動配置邏輯。
具體的過程不說了,簡單分析下KafkaAutoConfiguration邏輯,可以看到差別主要是在ConcurrentKafkaListenerContainerFactory這個bean。
spring boot kafka實現
1.重寫KafkaProperties物件
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@ConfigurationProperties(prefix = "spring.kafka")
@Configuration("KafkaPropertiesExtra")
@Primary
public class KafkaPropertiesExtra extends KafkaProperties {
private final ConsumerExtra consumerExtra = new ConsumerExtra();
public static class ConsumerExtra {
private Boolean batchListener = false;
public Boolean getBatchListener() {
return batchListener;
}
public void setBatchListener(Boolean batchListener) {
this.batchListener = batchListener;
}
}
public ConsumerExtra getConsumerExtra() {
return this.consumerExtra;
}
}
2.自定義ConcurrentKafkaListenerContainerFactory這個bean
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
@Configuration
public class KafkaConfig {
@Autowired KafkaPropertiesExtra kafkaProperties;
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
// 補充批量處理引數
factory.setBatchListener(kafkaProperties.getConsumerExtra().getBatchListener());
return factory;
}
}
3.修改application.yml,加上批量相關的配置
spring:
kafka:
......
consumer:
......
# 一次性最多抓取多少條資料
max-poll-records: 100
# 超時時間 毫秒
poll-timeout: 3000
max-poll-interval-ms: 5000
consumer-extra:
# 是否批量處理
batch-listener: true
4.修改java監聽器邏輯,改成list
@KafkaListener(topics = { "topic" })
public void listenPlayEnt(List<ConsumerRecord<?, ?>> records) {
// 處理邏輯
....
}
至此,就基於spring boot的KafkaAutoConfiguration實現了kafka批量消費邏輯。