1. 程式人生 > >Spring Boot 自定義kafka 消費者配置 ContainerFactory最佳實踐

Spring Boot 自定義kafka 消費者配置 ContainerFactory最佳實踐

# Spring Boot 自定義kafka 消費者配置 ContainerFactory最佳實踐 > 本篇博文主要提供一個在 SpringBoot 中自定義 kafka配置的實踐,想象這樣一個場景:你的系統需要監聽多個不同叢集的訊息,在不同的叢集中topic衝突了,所以你需要分別定義kafka訊息配置。 此篇文章會在SpringBoot 提供的預設模板上提供擴充套件,不會因為你自定義了消費者配置,而導致原生SpringBoot的Kakfa模板配置失效。 ## 引入 MAVEN 依賴 > 版本需要你自己指定 ``` com.alibaba fastjson xxx org.springframework.kafka spring-kafka xxx
org.apache.kafka kafka-clients xxx ``` ## 引入Java配置類 ```java /** * 手動自定義 kafka 消費者 ContainerFactory 配置demo */ @Configuration @EnableConfigurationProperties(KafkaProperties.class) public class KafkaConsumerConfig { @Autowired private KafkaProperties properties; @Value("${監聽服務地址}") private List myServers; @Bean("myKafkaContainerFactory") @ConditionalOnBean(ConcurrentKafkaListenerContainerFactoryConfigurer.class) public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, consumerFactory()); return factory; } //獲得建立消費者工廠 public ConsumerFactory consumerFactory() { KafkaProperties myKafkaProperties = JSON.parseObject(JSON.toJSONString(this.properties), KafkaProperties.class); //對模板 properties 進行定製化 //.... //例如:定製servers myKafkaProperties.setBootstrapServers(myServers); return new DefaultKafkaConsumerFactory<>(myKafkaProperties.buildConsumerProperties()); } } ``` ## yml模板 ``` #kafka配置,更多配置請參考:KafkaProperties spring.kafka: #公共引數,其他的timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms保持預設值 properties: #這個引數指定producer在傳送批量訊息前等待的時間,當設定此引數後,即便沒有達到批量訊息的指定大小(batch-size),到達時間後生產者也會發送批量訊息到broker。預設情況下,生產者的傳送訊息執行緒只要空閒了就會發送訊息,即便只有一條訊息。設定這個引數後,傳送執行緒會等待一定的時間,這樣可以批量傳送訊息增加吞吐量,但同時也會增加延遲。 linger.ms: 50 #預設值:0毫秒,當訊息傳送比較頻繁時,增加一些延遲可增加吞吐量和效能。 #這個引數指定producer在一個TCP connection可同時傳送多少條訊息到broker並且等待broker響應,設定此引數較高的值可以提高吞吐量,但同時也會增加記憶體消耗。另外,如果設定過高反而會降低吞吐量,因為批量訊息效率降低。設定為1,可以保證傳送到broker的順序和呼叫send方法順序一致,即便出現失敗重試的情況也是如此。 #注意:當前訊息符合at-least-once,自kafka1.0.0以後,為保證訊息有序以及exactly once,這個配置可適當調大為5。 max.in.flight.requests.per.connection: 1 #預設值:5,設定為1即表示producer在connection上傳送一條訊息,至少要等到這條訊息被broker確認收到才繼續傳送下一條,因此是有序的。 #生產者的配置,可參考org.apache.kafka.clients.producer.ProducerConfig producer: #這個引數可以是任意字串,它是broker用來識別訊息是來自哪個客戶端的。在broker進行列印日誌、衡量指標或者配額限制時會用到。 clientId: ${spring.application.name} #方便kafkaserver列印日誌定位請求來源 bootstrap-servers: 127.0.0.1:8080 #kafka伺服器地址,多個以逗號隔開 #acks=0:生產者把訊息傳送到broker即認為成功,不等待broker的處理結果。這種方式的吞吐最高,但也是最容易丟失訊息的。 #acks=1:生產者會在該分割槽的leader寫入訊息並返回成功後,認為訊息傳送成功。如果群首寫入訊息失敗,生產者會收到錯誤響應並進行重試。這種方式能夠一定程度避免訊息丟失,但如果leader宕機時該訊息沒有複製到其他副本,那麼該訊息還是會丟失。另外,如果我們使用同步方式來發送,延遲會比前一種方式大大增加(至少增加一個網路往返時間);如果使用非同步方式,應用感知不到延遲,吞吐量則會受非同步正在傳送中的數量限制。 #acks=all:生產者會等待所有副本成功寫入該訊息,這種方式是最安全的,能夠保證訊息不丟失,但是延遲也是最大的。 #如果是傳送日誌之類的,允許部分丟失,可指定acks=0,如果想不丟失訊息,可配置為all,但需密切關注效能和吞吐量。 acks: all #預設值:1 #當生產者傳送訊息收到一個可恢復異常時,會進行重試,這個引數指定了重試的次數。在實際情況中,這個引數需要結合retry.backoff.ms(重試等待間隔)來使用,建議總的重試時間比叢集重新選舉leader的時間長,這樣可以避免生產者過早結束重試導致失敗。 #另外需注意,當開啟重試時,若未設定max.in.flight.requests.per.connection=1,則可能出現發往同一個分割槽的兩批訊息的順序出錯,比如,第一批發送失敗了,第二批成功了,然後第一批重試成功了,此時兩者的順序就顛倒了。 retries: 2 #傳送失敗時重試多少次,0=禁用重試(預設值) #預設情況下訊息是不壓縮的,此引數可指定採用何種演算法壓縮訊息,可取值:none,snappy,gzip,lz4。snappy壓縮演算法由Google研發,這種演算法在效能和壓縮比取得比較好的平衡;相比之下,gzip消耗更多的CPU資源,但是壓縮效果也是最好的。通過使用壓縮,我們可以節省網路頻寬和Kafka儲存成本。 compressionType: "none" #如果不開啟壓縮,可設定為none(預設值),比較大的訊息可開啟。 #當多條訊息傳送到一個分割槽時,Producer會進行批量傳送,這個引數指定了批量訊息大小的上限(以位元組為單位)。當批量訊息達到這個大小時,Producer會一起傳送到broker;但即使沒有達到這個大小,生產者也會有定時機制來發送訊息,避免訊息延遲過大。 batch-size: 16384 #預設16K,值越小延遲越低,但是吞吐量和效能會降低。0=禁用批量傳送 #這個引數設定Producer暫存待發送訊息的緩衝區記憶體的大小,如果應用呼叫send方法的速度大於Producer傳送的速度,那麼呼叫會阻塞一定(max.block.ms)時間後丟擲異常。 buffer-memory: 33554432 #緩衝區預設大小32M #消費者的配置,可參考:org.apache.kafka.clients.consumer.ConsumerConfig consumer: #這個引數可以為任意值,用來指明訊息從哪個客戶端發出,一般會在列印日誌、衡量指標、分配配額時使用。 #暫不用提供clientId,2.x版本可放出來,1.x有多個topic且concurrency>1會出現JMX註冊時異常 #clientId: ${spring.application.name} #方便kafkaserver列印日誌定位請求來源 # 籤中kafka叢集 bootstrap-servers: 127.0.0.1:8080 #kafka伺服器地址,多個以逗號隔開 #這個引數指定了當消費者第一次讀取分割槽或者無offset時拉取那個位置的訊息,可以取值為latest(從最新的訊息開始消費),earliest(從最老的訊息開始消費),none(如果無offset就丟擲異常) autoOffsetReset: latest #預設值:latest #這個引數指定了消費者是否自動提交消費位移,預設為true。如果需要減少重複消費或者資料丟失,你可以設定為false,然後手動提交。如果為true,你可能需要關注自動提交的時間間隔,該間隔由auto.commit.interval.ms設定。 enable-auto-commit: false #週期性自動提交的間隔,單位毫秒 auto-commit-interval: 2000 #預設值:5000 #這個引數允許消費者指定從broker讀取訊息時最小的Payload的位元組數。當消費者從broker讀取訊息時,如果資料位元組數小於這個閾值,broker會等待直到有足夠的資料,然後才返回給消費者。對於寫入量不高的主題來說,這個引數可以減少broker和消費者的壓力,因為減少了往返的時間。而對於有大量消費者的主題來說,則可以明顯減輕broker壓力。 fetchMinSize: 1 #預設值: 1 #上面的fetch.min.bytes引數指定了消費者讀取的最小資料量,而這個引數則指定了消費者讀取時最長等待時間,從而避免長時間阻塞。這個引數預設為500ms。 fetchMaxWait: 500 #預設值:500毫秒 #這個引數控制一個poll()呼叫返回的記錄數,即consumer每次批量拉多少條資料。 maxPollRecords: 500 #預設值:500 listener: #建立多少個consumer,值必須小於等於Kafk Topic的分割槽數。 ack-mode: MANUAL_IMMEDIATE concurrency: 1 #推薦設定為topic的分割槽數 ``` ## 配置釋義 點開 KafkaProperties 這個類,可以看到這個是SpringBoot 自動配置kafka的配置類,引入這個例項,就相當於你拿到了SpringBoot kafka配置模板的引數,就是上述貼的配置,然後再此基礎上重新定義你需要改變的配置,這裡主要講消費者配置。 程式碼中舉了個重寫監聽servers的例子: ``` //例如:定製servers myKafkaProperties.setBootstrapServers(myServers); ``` ## @KafkaListener 使用 containerFactory ```Java @Slf4j @Component public class ConsumerDemo { //宣告consumerID為demo,監聽topicName為topic.quick.demo的Topic //這個消費者的 containerFactory 是SpringBoot 提供的 kafkaListenerContainerFactory 這個bean @KafkaListener(id = "demo", topics = "topic.quick.demo") public void listen(String msgData) { log.info("demo receive : " + msgData); } @KafkaListener(topics = "k010", containerFactory = "myKafkaContainerFactory") public void listen(String msgData, Acknowledgment ack) { log.info("demo receive : " + msgData); //手動提交 //enable.auto.commit引數設定成false。那麼就是Spring來替為我們做人工提交,從而簡化了人工提交的方式。 //所以kafka和springboot結合中的enable.auto.commit為false為spring的人工提交模式。 //enable.auto.commit為true是採用kafka的預設提交模式。 ack.acknowledge(); } } ``` 如果在@KafkaListener屬性中沒有指定 containerFactory 那麼Spring Boot 會預設注入 name 為“kafkaListenerContainerFactory” 的 containerFactory。具體原始碼可跟蹤:**KafkaListenerAnnotationBeanPostProcessor**中的常量: ``` public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactor