springboot整合整合kafka-消費過濾器、ConcurrentKafkaListenerContainerFactory
阿新 • • 發佈:2020-12-14
技術標籤:springbootkafka
寫在前面:各位看到此部落格的小夥伴,如有不對的地方請及時通過私信我或者評論此部落格的方式指出,以免誤人子弟。多謝!
今天記錄一下訊息過濾器的使用,訊息過濾器可以讓訊息在抵達監聽容器前被攔截,過濾器根據系統業務邏輯去篩選出需要的資料交由 KafkaListener 處理,不需要的訊息則會過濾掉。使用訊息過濾器也很簡單,只要兩步:1.定義一個過濾器;[email protected]註解中通過containerFactory屬性引用即可。直接貼一下程式碼:
一、定義過濾器
/** * 消費者訊息過濾器 */ @Configuration public class CustomKafkaSendFilter { // 監聽器工廠 @Autowired private ConsumerFactory consumerFactory; @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(3); factory.setBatchListener(true); // 被過濾的訊息將被丟棄 factory.setAckDiscarded(true); // 設定記錄篩選策略 factory.setRecordFilterStrategy(new RecordFilterStrategy() { @Override public boolean filter(ConsumerRecord consumerRecord) { String msg = consumerRecord.value().toString(); if(Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0){ return false; } // 返回true訊息將會被丟棄 return true; } }); return factory; } }
注意上面需要設定factory.setConcurrency(3)和factory.setBatchListener(true),否則即使在yml檔案中配置了這兩項,批量消費依然不起作用。
一、引用過濾器
/** * 測試訊息過濾器 */ @KafkaListener(topics = {"mytopic"},containerFactory = "kafkaListenerContainerFactory") public void test(List<String> message){ System.out.println("接收到的訊息:" + message); }
三、測試下
新增測試方法:
@Transactional
@GetMapping("/send15")
public void send15(){
for (int i = 0; i < 20; i++) {
kafkaTemplate.send(topic,i+"");
}
}
我們期望的是過濾器將傳送訊息為奇數的都過濾掉,訊息監聽器只處理偶數訊息。
啟動專案,訪問http://localhost:8080/send15結果如下:
如上:從列印結果來看確實只接收處理了偶數部分訊息,但咱之前批量消費設定的是每次拉取五條訊息,為啥列印結果不是五條五條的呢,最開始的時候就說到:“訊息過濾器可以讓訊息在抵達監聽容器前被攔截”;也就是說訊息都正常傳送到kafka伺服器,只是只監聽消費滿足過濾後剩下的訊息。看下啟動日誌:
再次重啟看下啟動日誌:
其實20條訊息都提交到kafka伺服器上了,並且也是五條五條的拉取的,只是在抵達監聽器前過濾器給過濾掉了部分資料。