1. 程式人生 > >kafka的攔截器API

kafka的攔截器API

攔截器
	public class TimeInterceptor implements ProducerInterceptor<String, String>{
		//設定資訊
		public void configure(Map<String, ?> configs) {

		}

		//業務邏輯
		public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record){
			return new ProdecerRecord<String, String>(
				record.topic(),
				record.partition(),
				record.key(),
				System.currentTimeMillis() + "-" +record.value());
		}

		//應答,傳送失敗呼叫
		public void onAcknowledgement(RecordMetadata metadata, Exception exception){

		}

		//釋放資源
		public void close(){

		}
	}
	{
		//配置生產者屬性(指定多個引數)
		Properties prop = new Properties();

		//省略不寫
		-----
		//訊息在傳送前必須序列化
		prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
		prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

		//攔截器
		ArrayList<String> inList = new ArrayList<String>();
		inList.add("com.terry.kafka.TimeInterceptor");
		prop.put("ProducerConfig.In=NTERCEPTOR_CLASS_CONFIG",inList);

		//2、例項化producer
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

		//3、傳送訊息
		for(int i = 0; i < 99; i++) {
			producer.send(new ProducerRecord<String, String>("test", i),new CallBack(){
				public void onCompletion(RecordMetadata metadata, Exception exception) {
					//如果metadata不為null,拿到當前的資料偏移量與分割槽
					if(metadata != null) {
						Sout(metadata.topic() + "------" + metadata.offset() + "------" + metadata.partition());
					}
				}
			});
		}

		//4、關閉資源
		producer.close();
	}