Kafka中Producer攔截器
1.攔截器原理(interceptor)
Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用於實現clients端的定製化控制邏輯
對於producer而言,interceptor使得使用者在訊息傳送前以及producer回撥邏輯前有機會對訊息做一些定製化需求,比如修改訊息
等,同時producer允許使用者指定多個interceptor按序作用於同一條訊息從而形成一個攔截鏈(interceptor chain),Intercetpor的實現介面是org.apache.kafka.clients.producer.ProducerInterceptor
1)configure(configs):
獲取配置資訊和初始化資料時呼叫
2)onSend(ProducerRecord):
Producer確保在訊息被序列化以及計算分割槽前呼叫該方法。使用者可以在該方法中對訊息做任何操作,但最好保證不要修改訊息所屬的topic和分割槽,否則會影響目標分割槽的計算
3)onAcknowledgement(RecordMetadata, Exception):
該方法會在訊息被應答或訊息傳送失敗時呼叫
4)close:
關閉interceptor,主要用於執行一些資源清理工作
2.攔截器實現
1)需求:
實現一個簡單的雙interceptor組成的攔截鏈,第一個interceptor會在訊息傳送前將時間戳資訊加到訊息value的最前部;第二個interceptor會在訊息傳送後更新成功傳送訊息數或失敗傳送訊息數
2)程式碼實現
第一個攔截器實現:
Interceptor1.java
package cn.ysjh; import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class Interceptor1 implements ProducerInterceptor<String, String>{ @Override public void configure(Map<String, ?> arg0) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } @Override public void onAcknowledgement(RecordMetadata arg0, Exception arg1) { // TODO Auto-generated method stub } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString()); } }
第二個攔截器實現:
Interceptor2.java
package cn.ysjh;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Interceptor2 implements ProducerInterceptor<String, String>{
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> arg0) {
// TODO Auto-generated method stub
}
@Override
public void close() {
// 儲存結果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
@Override
public void onAcknowledgement(RecordMetadata arg0, Exception arg1) {
// 統計成功和失敗的次數
if (arg1 == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// TODO Auto-generated method stub
return record;
}
}
在Producer中進行註冊: Interceptor會按照註冊的順序進行依次攔截
package cn.ysjh;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerTest {
public static void main(String args[]) {
//1.配置生產者屬性
Properties props = new Properties();
// Kafka服務端的主機名和埠號,可以是多個
props.put("bootstrap.servers", "172.17.0.3:9092");
//配置傳送的訊息是否等待應答
props.put("acks", "all");
//配置訊息傳送失敗的重試
props.put("retries", 0);
// 批量處理資料的大小:16kb
props.put("batch.size", 16384);
// 設定批量處理資料的延遲,單位:ms
props.put("linger.ms", 1);
// 設定記憶體緩衝區的大小
props.put("buffer.memory", 33554432);
//資料在傳送之前一定要序列化
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 構建攔截鏈
List<String> interceptors = new ArrayList<>();
interceptors.add("cn.ysjh.Interceptor1"); interceptors.add("cn.ysjh.Interceptor2");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
//2.例項化KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
//3.呼叫Producer的send方法,進行訊息的傳送,每條待發送的訊息,都必須封裝為一個Record物件
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello world"+i));
}
//4.close釋放資源
producer.close();
}
}
最後在Kafka叢集中開啟一個消費者,執行程式即可,效果如圖: