1. 程式人生 > >Kafka中Producer攔截器

Kafka中Producer攔截器

1.攔截器原理(interceptor)

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用於實現clients端的定製化控制邏輯

對於producer而言,interceptor使得使用者在訊息傳送前以及producer回撥邏輯前有機會對訊息做一些定製化需求,比如修改訊息

等,同時producer允許使用者指定多個interceptor按序作用於同一條訊息從而形成一個攔截鏈(interceptor chain),Intercetpor的實現介面是org.apache.kafka.clients.producer.ProducerInterceptor

,需要實現的方法包括:

    1)configure(configs):

獲取配置資訊和初始化資料時呼叫

    2)onSend(ProducerRecord):

Producer確保在訊息被序列化以及計算分割槽前呼叫該方法。使用者可以在該方法中對訊息做任何操作,但最好保證不要修改訊息所屬的topic和分割槽,否則會影響目標分割槽的計算

    3)onAcknowledgement(RecordMetadata, Exception):

該方法會在訊息被應答或訊息傳送失敗時呼叫

    4)close:

關閉interceptor,主要用於執行一些資源清理工作

2.攔截器實現

    1)需求:

實現一個簡單的雙interceptor組成的攔截鏈,第一個interceptor會在訊息傳送前將時間戳資訊加到訊息value的最前部;第二個interceptor會在訊息傳送後更新成功傳送訊息數或失敗傳送訊息數

    2)程式碼實現

第一個攔截器實現:

Interceptor1.java

package cn.ysjh;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Interceptor1 implements ProducerInterceptor<String, String>{

	@Override
	public void configure(Map<String, ?> arg0) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void close() {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void onAcknowledgement(RecordMetadata arg0, Exception arg1) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		
		return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
				System.currentTimeMillis() + "," + record.value().toString());
	}

}

第二個攔截器實現:

Interceptor2.java

package cn.ysjh;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Interceptor2 implements ProducerInterceptor<String, String>{

	 private int errorCounter = 0;
	    private int successCounter = 0;
	    
	@Override
	public void configure(Map<String, ?> arg0) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void close() {
		 // 儲存結果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
	}

	@Override
	public void onAcknowledgement(RecordMetadata arg0, Exception arg1) {
		// 統計成功和失敗的次數
        if (arg1 == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
		
	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		// TODO Auto-generated method stub
		return record;
	}

}

在Producer中進行註冊:     Interceptor會按照註冊的順序進行依次攔截

package cn.ysjh;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerTest {

	
	public static  void main(String args[]) {
		//1.配置生產者屬性
		Properties props = new Properties();
		// Kafka服務端的主機名和埠號,可以是多個
		props.put("bootstrap.servers", "172.17.0.3:9092");
		//配置傳送的訊息是否等待應答
		props.put("acks", "all");
		//配置訊息傳送失敗的重試
		props.put("retries", 0);
		// 批量處理資料的大小:16kb
		props.put("batch.size", 16384);
		// 設定批量處理資料的延遲,單位:ms
		props.put("linger.ms", 1);
		// 設定記憶體緩衝區的大小
		props.put("buffer.memory", 33554432);
		//資料在傳送之前一定要序列化
		// key序列化
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// value序列化
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		// 2 構建攔截鏈
		List<String> interceptors = new ArrayList<>();
		interceptors.add("cn.ysjh.Interceptor1"); 	interceptors.add("cn.ysjh.Interceptor2"); 
		props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
		
		
        //2.例項化KafkaProducer
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		for (int i = 0; i < 50; i++) {
	    //3.呼叫Producer的send方法,進行訊息的傳送,每條待發送的訊息,都必須封裝為一個Record物件
			producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello world"+i));
		}
        //4.close釋放資源
		producer.close();
	}
}

最後在Kafka叢集中開啟一個消費者,執行程式即可,效果如圖: