1. 程式人生 > 其它 >Kafka 之producer攔截器(interceptor)

Kafka 之producer攔截器(interceptor)

Kafka 之producer攔截器(interceptor)

一、攔截器原理

  Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用於實現clients端的定製化控制邏輯。
  對於producer而言,interceptor使得使用者在訊息傳送前以及producer回撥邏輯前有機會對訊息做一些定製化需求,比如修改訊息等。同時,producer允許使用者指定多個interceptor按序作用於同一條訊息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現介面是org.apache.kafka.clients.producer.ProducerInterceptor

,其定義的方法包括:
  (1)configure(configs)
  獲取配置資訊和初始化資料時呼叫。
  (2)onSend(ProducerRecord):
  該方法封裝進KafkaProducer.send方法中,即它執行在使用者主執行緒中。Producer確保在訊息被序列化以及計算分割槽前呼叫該方法。使用者可以在該方法中對訊息做任何操作,但最好保證不要修改訊息所屬的topic和分割槽,否則會影響目標分割槽的計算
  (3)onAcknowledgement(RecordMetadata, Exception):
  該方法會在訊息被應答或訊息傳送失敗時呼叫,並且通常都是在producer回撥邏輯觸發之前。onAcknowledgement執行在producer的IO執行緒中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的訊息傳送效率
  (4)close:
  關閉interceptor,主要用於執行一些資源清理工作

  如前所述,interceptor可能被執行在多個執行緒中,因此在具體實現時使用者需要自行確保執行緒安全。另外倘若指定了多個interceptor,則producer將按照指定順序呼叫它們,並僅僅是捕獲每個interceptor可能丟擲的異常記錄到錯誤日誌中而非在向上傳遞。這在使用過程中要特別留意。

二、攔截器案例

1)需求:
  實現一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在訊息傳送前將時間戳資訊加到訊息value的最前部;第二個interceptor會在訊息傳送後更新成功傳送訊息數或失敗傳送訊息數。

2)案例實操
(1)增加時間戳攔截器

package
com.libt.kafka.interceptor; import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TimeInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 建立一個新的record,把時間戳寫入訊息體的最前部 return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } }

(2)統計傳送訊息成功和傳送失敗訊息數,並在producer關閉時列印這兩個計數器

package com.libt.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> configs) {
        
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
         return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 統計成功和失敗的次數
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // 儲存結果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}

(3)producer主程式

package com.libt.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

    public static void main(String[] args) throws Exception {
        // 1 設定配置資訊
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        // 2 構建攔截鏈
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");     interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor"); 
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
         
        String topic = "first";
        Producer<String, String> producer = new KafkaProducer<>(props);
        
        // 3 傳送訊息
        for (int i = 0; i < 10; i++) {
            
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
            producer.send(record);
        }
         
        // 4 一定要關閉producer,這樣才會呼叫interceptor的close方法
        producer.close();
    }
}

3)測試
(1)在kafka上啟動消費者,然後執行客戶端java程式。

[hadoop1 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop1:2181 --from-beginning --topic first

1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9

(2)觀察java平臺控制檯輸出資料如下:

Successful sent: 10
Failed sent: 0
做自己的太陽,成為別人的光!