1. 程式人生 > >spring-kafka之KafkaListener註解深入解讀

spring-kafka之KafkaListener註解深入解讀

簡介

Kafka目前主要作為一個分散式的釋出訂閱式的訊息系統使用,也是目前最流行的訊息佇列系統之一。因此,也越來越多的框架對kafka做了整合,比如本文將要說到的spring-kafka。

Kafka既然作為一個訊息釋出訂閱系統,就包括訊息生成者和訊息消費者。本文主要講述的spring-kafka框架的kafkaListener註解的深入解讀和使用案例。

解讀

原始碼解讀

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })

@Retention(RetentionPolicy.RUNTIME)

@MessageMapping

@Documented

@Repeatable(KafkaListeners.class)

public @interface KafkaListener {



   /**

    * 消費者的id,當GroupId沒有被配置的時候,預設id為GroupId

    */

   String id() default "";



   /**

    * 監聽容器工廠,當監聽時需要區分單資料還是多資料消費需要配置containerFactory      屬性

    */

   String containerFactory() default "";



   /**

    * 需要監聽的Topic,可監聽多個,和 topicPattern 屬性互斥
*/

   String[] topics() default {};




   /**

    * 需要監聽的Topic的正則表達。和 topics,topicPartitions屬性互斥
    */

   String topicPattern() default "";


   /**

    * 可配置更加詳細的監聽資訊,必須監聽某個Topic中的指定分割槽,或者從offset為200的偏移量開始監聽,可配置該引數, 和 topicPattern 屬性互斥
    */

   TopicPartition[] topicPartitions() default {};



   /**

    *偵聽器容器組 

    */

   String containerGroup() default "";



   /**

    * 監聽異常處理器,配置BeanName

    */

   String errorHandler() default "";



   /**

    * 消費組ID 

    */

   String groupId() default "";



   /**

    * id是否為GroupId

    */

   boolean idIsGroup() default true;



   /**

    * 消費者Id字首

    */

   String clientIdPrefix() default "";



   /**

    * 真實監聽容器的BeanName,需要在 BeanName前加 "__"

    */

   String beanRef() default "__listener";



}
View Code

使用案例

ConsumerRecord類消費

使用ConsumerRecord類接收有一定的好處,ConsumerRecord類裡面包含分割槽資訊、訊息頭、訊息體等內容,如果業務需要獲取這些引數時,使用ConsumerRecord會是個不錯的選擇。如果使用具體的型別接收訊息體則更加方便,比如說用String型別去接收訊息體。

這裡我們編寫一個Listener方法,監聽"topic1"Topic,並把ConsumerRecord裡面所包含的內容列印到控制檯中:

@Component

public class Listener {



    private static final Logger log = LoggerFactory.getLogger(Listener.class);



    @KafkaListener(id = "consumer", topics = "topic1")

    public void consumerListener(ConsumerRecord<Integer, String> record) {

        log.info("topic.quick.consumer receive : " + record.toString());

    }



}
View Code

批量消費

批量消費在現實業務場景中是很有實用性的。因為批量消費可以增大kafka消費吞吐量,提高效能。

批量消費實現步驟:

1、重新建立一份新的消費者配置,配置為一次拉取10條訊息

2、建立一個監聽容器工廠,命名為:batchContainerFactory,設定其為批量消費並設定併發量為5,這個併發量根據分割槽數決定,必須小於等於分割槽數,否則會有執行緒一直處於空閒狀態。

3、建立一個分割槽數為8的Topic。

4、建立監聽方法,設定消費id為“batchConsumer”,clientID字首為“batch”,監聽“batch”,使用“batchContainerFactory”工廠建立該監聽容器。

@Component

public class BatchListener {



    private static final Logger log= LoggerFactory.getLogger(BatchListener.class);



    private Map<String, Object> consumerProps() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        //一次拉取訊息數量

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

                NumberDeserializers.IntegerDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class);

        return props;

    }



    @Bean("batchContainerFactory")

    public ConcurrentKafkaListenerContainerFactory listenerContainer() {

        ConcurrentKafkaListenerContainerFactory container

                = new ConcurrentKafkaListenerContainerFactory();

        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));

        //設定併發量,小於或等於Topic的分割槽數

        container.setConcurrency(5);

        //必須 設定為批量監聽

        container.setBatchListener(true);

        return container;

    }



    @Bean

    public NewTopic batchTopic() {

        return new NewTopic("topic.batch", 8, (short) 1);

    }



    @KafkaListener(id = "batchConsumer",clientIdPrefix = "batch"

            ,topics = {"topic.batch"},containerFactory = "batchContainerFactory")

    public void batchListener(List<String> data) {

        log.info("topic.batch  receive : ");

        for (String s : data) {

            log.info(  s);

        }

    }

}
View Code

 

監聽Topic中指定的分割槽

使用@KafkaListener註解的topicPartitions屬性監聽不同的partition分割槽。

@TopicPartition:topic--需要監聽的Topic的名稱,partitions --需要監聽Topic的分割槽id。

partitionOffsets --可以設定從某個偏移量開始監聽,@PartitionOffset:partition --分割槽Id,非陣列,initialOffset --初始偏移量。

@Bean

public NewTopic batchWithPartitionTopic() {

    return new NewTopic("topic.batch.partition", 8, (short) 1);

}



@KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",

        topicPartitions = {

                @TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}),

                @TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"},

                        partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100"))

        }

)

public void batchListenerWithPartition(List<String> data) {

    log.info("topic.batch.partition  receive : ");

    for (String s : data) {

        log.info(s);

    }

}
View Code

註解方式獲取訊息頭及訊息體

當你接收的訊息包含請求頭,以及你監聽方法需要獲取該訊息非常多的欄位時可以通過這種方式。。這裡使用的是預設的監聽容器工廠建立的,如果你想使用批量消費,把對應的型別改為List即可,比如List<String> data , List<Integer> key。

@Payload:獲取的是訊息的訊息體,也就是傳送內容

@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):獲取傳送訊息的key

@Header(KafkaHeaders.RECEIVED_PARTITION_ID):獲取當前訊息是從哪個分割槽中監聽到的

@Header(KafkaHeaders.RECEIVED_TOPIC):獲取監聽的TopicName

@Header(KafkaHeaders.RECEIVED_TIMESTAMP):獲取時間戳

@KafkaListener(id = "params", topics = "topic.params")

public void otherListener(@Payload String data,

                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,

                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,

                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,

                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {

    log.info("topic.params receive : \n"+

            "data : "+data+"\n"+

            "key : "+key+"\n"+

            "partitionId : "+partition+"\n"+

            "topic : "+topic+"\n"+

            "timestamp : "+ts+"\n"

    );

}
View Code

使用Ack機制確認消費

Kafka是通過最新儲存偏移量進行訊息消費的,而且確認消費的訊息並不會立刻刪除,所以我們可以重複的消費未被刪除的資料,當第一條訊息未被確認,而第二條訊息被確認的時候,Kafka會儲存第二條訊息的偏移量,也就是說第一條訊息再也不會被監聽器所獲取,除非是根據第一條訊息的偏移量手動獲取。Kafka的ack 機制可以有效的確保消費不被丟失。因為自動提交是在kafka拉取到資料之後就直接提交,這樣很容易丟失資料,尤其是在需要事物控制的時候。

使用Kafka的Ack機制比較簡單,只需簡單的三步即可:

  1. 設定ENABLE_AUTO_COMMIT_CONFIG=false,禁止自動提交
  2. 設定AckMode=MANUAL_IMMEDIATE
  3. 監聽方法加入Acknowledgment ack 引數

4.使用Consumer.seek方法,可以指定到某個偏移量的位置

@Component

public class AckListener {

    private static final Logger log = LoggerFactory.getLogger(AckListener.class);



    private Map<String, Object> consumerProps() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return props;

    }



    @Bean("ackContainerFactory")

    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {

        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));

        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));

        return factory;

    }





    @KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")

    public void ackListener(ConsumerRecord record, Acknowledgment ack) {

        log.info("topic.quick.ack receive : " + record.value());

        ack.acknowledge();

    }

}
View Code

解決重複消費

上一節中使用ack手動提交偏移量時,假如consumer掛了重啟,那它將從committed offset位置開始重新消費,而不是consume offset位置。這也就意味著有可能重複消費。

在0.9客戶端中,有3種ack策略: 

策略1: 自動的,週期性的ack。

策略2:consumer.commitSync(),呼叫commitSync,手動同步ack。每處理完1條訊息,commitSync 1次。

策略3:consumer. commitASync(),手動非同步ack。、

那麼使用策略2,提交每處理完1條訊息,就傳送一次commitSync。那這樣是不是就可以解決“重複消費”了呢?如下程式碼:

while (true) {

        List<ConsumerRecord> buffer = new ArrayList<>();

        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records) {

            buffer.add(record);

        }

        insertIntoDb(buffer);    //消除處理,存到db

        consumer.commitSync();   //同步傳送ack

        buffer.clear();

    }

}
View Code

答案是否定的!因為上面的insertIntoDb和commitSync做不到原子操作:如果在資料處理完成,commitSync的時候掛了,伺服器再次重啟,訊息仍然會重複消費。

         那麼如何解決重複消費的問題呢?答案是自己儲存committed offset,而不是依賴kafka的叢集儲存committed offset,把訊息的處理和儲存offset做成一個原子操作,並且對訊息加入唯一id,進行判重。

依照官方文件,要自己儲存偏移量,需要:

  1. enable.auto.commit=false, 禁用自動ack。
  2. 每次取到訊息,把對應的offset存下來。
  3. 下次重啟,通過consumer.seek函式,定位到自己儲存的offset,從那開始消費。
  4. 更進一步處理可以對訊息加入唯一id,進行判重。

&n