Kafka 之producer攔截器(interceptor)
阿新 • • 發佈:2021-11-03
Kafka 之producer攔截器(interceptor)
一、攔截器原理
Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用於實現clients端的定製化控制邏輯。
對於producer而言,interceptor使得使用者在訊息傳送前以及producer回撥邏輯前有機會對訊息做一些定製化需求,比如修改訊息等。同時,producer允許使用者指定多個interceptor按序作用於同一條訊息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現介面是org.apache.kafka.clients.producer.ProducerInterceptor
(1)configure(configs)
獲取配置資訊和初始化資料時呼叫。
(2)onSend(ProducerRecord):
該方法封裝進KafkaProducer.send方法中,即它執行在使用者主執行緒中。Producer確保在訊息被序列化以及計算分割槽前呼叫該方法。使用者可以在該方法中對訊息做任何操作,但最好保證不要修改訊息所屬的topic和分割槽,否則會影響目標分割槽的計算
(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在訊息被應答或訊息傳送失敗時呼叫,並且通常都是在producer回撥邏輯觸發之前。onAcknowledgement執行在producer的IO執行緒中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的訊息傳送效率
(4)close:
關閉interceptor,主要用於執行一些資源清理工作
如前所述,interceptor可能被執行在多個執行緒中,因此在具體實現時使用者需要自行確保執行緒安全。另外倘若指定了多個interceptor,則producer將按照指定順序呼叫它們,並僅僅是捕獲每個interceptor可能丟擲的異常記錄到錯誤日誌中而非在向上傳遞。這在使用過程中要特別留意。
二、攔截器案例
1)需求:
實現一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在訊息傳送前將時間戳資訊加到訊息value的最前部;第二個interceptor會在訊息傳送後更新成功傳送訊息數或失敗傳送訊息數。
2)案例實操
(1)增加時間戳攔截器
packagecom.libt.kafka.interceptor; import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TimeInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 建立一個新的record,把時間戳寫入訊息體的最前部 return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } }
(2)統計傳送訊息成功和傳送失敗訊息數,並在producer關閉時列印這兩個計數器
package com.libt.kafka.interceptor; import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class CounterInterceptor implements ProducerInterceptor<String, String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統計成功和失敗的次數 if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 儲存結果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } }
(3)producer主程式
package com.libt.kafka.interceptor; import java.util.ArrayList; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; public class InterceptorProducer { public static void main(String[] args) throws Exception { // 1 設定配置資訊 Properties props = new Properties(); props.put("bootstrap.servers", "hadoop1:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2 構建攔截鏈 List<String> interceptors = new ArrayList<>(); interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor"); interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); String topic = "first"; Producer<String, String> producer = new KafkaProducer<>(props); // 3 傳送訊息 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i); producer.send(record); } // 4 一定要關閉producer,這樣才會呼叫interceptor的close方法 producer.close(); } }
3)測試
(1)在kafka上啟動消費者,然後執行客戶端java程式。
[hadoop1 kafka]$ bin/kafka-console-consumer.sh \ --zookeeper hadoop1:2181 --from-beginning --topic first 1501904047034,message0 1501904047225,message1 1501904047230,message2 1501904047234,message3 1501904047236,message4 1501904047240,message5 1501904047243,message6 1501904047246,message7 1501904047249,message8 1501904047252,message9
(2)觀察java平臺控制檯輸出資料如下:
Successful sent: 10
Failed sent: 0
做自己的太陽,成為別人的光!