Spring中手動開啟kafka監聽.md
0. 背景重現
最近搭建一個新專案,基於SpringBoot框架,使用Kafka做訊息中介軟體。
使用@KafkaListener註解來建立一個消費者,實現對Kafka訊息的消費。我計劃的執行順序是這樣的:服務啟動之後,建立Consumer例項,執行loadResourceConfig初始化方法,之後才開始消費Kafka的訊息。
但是出現了一個問題:沒有等loadResourceConfig方式執行完畢,@KafkaListener就開始消費訊息了。
這顯然不是我們期望的,下面是大概的程式碼:
@Component public class Consumer{ @PostConstruct private void loadResourceConfig () {//載入資料 // 載入資源配置 } /** * 接收資料處理 * @param record */ @KafkaListener(id = "device-data",topics = {"${DataTopic}"}) public void listen(ConsumerRecord<String, ?> record) { Optional kafkaMessage = Optional.ofNullable(record.value()); Optional<String> kafkaKey = Optional.ofNullable(record.key()); if (kafkaKey.isPresent()) { Object value = kafkaMessage.get(); String gatewayId = kafkaKey.get(); //使用 載入的資源資訊對資料進行處理 } } }
1.原因分析
@KafkaListener這個註解所標註的方法並沒有在IOC容器中註冊為Bean,而是會被註冊在KafkaListenerEndpointRegistry中,KafkaListenerEndpointRegistry在SpringIOC中已經被註冊為Bean,具體可以看一下該類的原始碼,當然不是使用註解方式註冊。
KafkaListenerEndpointRegistry註冊完Kafka中的topic之後,就會自動啟動監聽容器,如此KafkaListener註解的方法就開始消費訊息了。這個過程可能在自定義Bean建立完成之前執行。
知道了問題,以及原因,解決方法就比較簡單了,我們只需要完成2點:
1.禁止KafkaListener自啟動(AutoStartup)
2.手動啟動單個Kafka的topic的監聽
2.解決方法
@Component public class Consumer{ @Autowired KafkaManager kafkaManager; @PostConstruct private void loadResourceConfig () {//載入資料 // 載入資源配置 kafkaManager.startListener();//開啟topic的監聽 } } @Component public class KafkaManager { @Autowired private KafkaListenerEndpointRegistry registry; @Autowired private ConsumerFactory consumerFactory; @Bean public ConcurrentKafkaListenerContainerFactory delayContainerFactory() { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(consumerFactory); //禁止自動啟動 container.setAutoStartup(false); return container; } /** * 開啟kafka監聽 */ public void startListener() { if (!registry.getListenerContainer("device-data").isRunning()) { registry.getListenerContainer("device-data").start(); } registry.getListenerContainer("device-data").resume(); }
上面的程式碼做了幾件事:
1.使用ConsumerFactory 構建Kafka監聽容器工廠ConcurrentKafkaListenerContainerFactory
2.Kafka監聽容器工廠註冊為Bean
3.禁止Kafka監聽容器自動啟動
4.在loadResourceConfig方法載入完成資源之後,呼叫startListener方法,手動啟動Kafka容器監聽。注意registry.getListenerContainer(“device-data”)的引數,就是 @KafkaListener註解中的id引數。
5.startListener中我們先判斷容器是否執行(isRunning),如果沒有則呼叫start方法啟動。 resume方法是恢復執行。這樣寫的目的是,即便startListener多次執行,也沒有問題。