1. 程式人生 > >Kafka Producer 攔截器

Kafka Producer 攔截器

config 回調 exceptio key ots 分配 scrip 參數 pos

Kafka中的攔截器(Interceptor)是0.10.x.x版本引入的一個功能,一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。本篇主要講述的是Kafka Producer端的攔截器,它主要用來對消息進行攔截或者修改,也可以用於Producer的Callback回調之前進行相應的預處理。

使用Kafka Producer端的攔截器非常簡單,主要是實現ProducerInterceptor接口,此接口包含4個方法:

    1. ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在將消息序列化和分配分區之前會調用攔截器的這個方法來對消息進行相應的操作。一般來說最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需確保對其有準確的判斷,否則會與預想的效果出現偏差。比如修改key不僅會影響分區的計算,同樣也會影響Broker端日誌壓縮(Log Compaction)的功能。
    1. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被應答(Acknowledgement)之前或者消息發送失敗時調用,優先於用戶設定的Callback之前執行。這個方法運行在Producer的IO線程中,所以這個方法裏實現的代碼邏輯越簡單越好,否則會影響消息的發送速率。
    1. void close():關閉當前的攔截器,此方法主要用於執行一些資源的清理工作。
    1. configure(Map<String, ?> configs):用來初始化此類的方法,這個是ProducerInterceptor接口的父接口Configurable中的方法。

一般情況下只需要關註並實現onSend或者onAcknowledgement方法即可。下面我們來舉個案例,通過onSend方法來過濾消息體為空的消息以及通過onAcknowledgement方法來計算發送消息的成功率。

public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        if(record.value().length()<=0)
            return null;
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure ++;
        }
    }

    @Override
    public void close() {
        double succe***atio = (double)sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 發送成功率="+String.format("%f", succe***atio * 100)+"%");
    }

    @Override
    public void configure(Map<String, ?> configs) {}
}

自定義的ProducerInterceptorDemo類實現之後就可以在Kafka Producer的主程序中指定,示例代碼如下:

public class ProducerMain {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        for(int i=0;i<100;i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);
            producer.send(producerRecord).get();
        }
        producer.close();
    }
}

Kafka Producer不僅可以指定一個攔截器,還可以指定多個攔截器以形成攔截鏈,這個攔截鏈會按照其中的攔截器的加入順序一一執行。比如上面的程序多添加一個攔截器,示例如下:

properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");1

這樣Kafka Producer會先執行攔截器ProducerInterceptorDemo,之後再執行ProducerInterceptorDemoPlus。

有關interceptor.classes參數,在kafka 1.0.0版本中的定義如下:

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
interceptor.calssses A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. list null low

Kafka Producer 攔截器