1. 程式人生 > 其它 >springboot整合整合kafka-消費過濾器、ConcurrentKafkaListenerContainerFactory

springboot整合整合kafka-消費過濾器、ConcurrentKafkaListenerContainerFactory

技術標籤:springbootkafka

寫在前面:各位看到此部落格的小夥伴,如有不對的地方請及時通過私信我或者評論此部落格的方式指出,以免誤人子弟。多謝!

今天記錄一下訊息過濾器的使用,訊息過濾器可以讓訊息在抵達監聽容器前被攔截,過濾器根據系統業務邏輯去篩選出需要的資料交由 KafkaListener 處理,不需要的訊息則會過濾掉。使用訊息過濾器也很簡單,只要兩步:1.定義一個過濾器;[email protected]註解中通過containerFactory屬性引用即可。直接貼一下程式碼:

一、定義過濾器

/**
 * 消費者訊息過濾器
 */
@Configuration
public class CustomKafkaSendFilter {
    // 監聽器工廠
    @Autowired
    private ConsumerFactory consumerFactory;
    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        // 被過濾的訊息將被丟棄
        factory.setAckDiscarded(true);
        // 設定記錄篩選策略
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                String msg = consumerRecord.value().toString();
                if(Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0){
                    return false;
                }
                // 返回true訊息將會被丟棄
                return true;
            }
        });
        return factory;
    }
}

注意上面需要設定factory.setConcurrency(3)和factory.setBatchListener(true),否則即使在yml檔案中配置了這兩項,批量消費依然不起作用。

一、引用過濾器

    /**
     * 測試訊息過濾器
     */
    @KafkaListener(topics = {"mytopic"},containerFactory = "kafkaListenerContainerFactory")
    public void test(List<String> message){
        System.out.println("接收到的訊息:" + message);

    }

三、測試下

新增測試方法:

    @Transactional
    @GetMapping("/send15")
    public void send15(){
        for (int i = 0; i < 20; i++) {
            kafkaTemplate.send(topic,i+"");
        }
    }

我們期望的是過濾器將傳送訊息為奇數的都過濾掉,訊息監聽器只處理偶數訊息。

啟動專案,訪問http://localhost:8080/send15結果如下:

如上:從列印結果來看確實只接收處理了偶數部分訊息,但咱之前批量消費設定的是每次拉取五條訊息,為啥列印結果不是五條五條的呢,最開始的時候就說到:“訊息過濾器可以讓訊息在抵達監聽容器前被攔截”;也就是說訊息都正常傳送到kafka伺服器,只是只監聽消費滿足過濾後剩下的訊息。看下啟動日誌:

再次重啟看下啟動日誌:

其實20條訊息都提交到kafka伺服器上了,並且也是五條五條的拉取的,只是在抵達監聽器前過濾器給過濾掉了部分資料。