Kafka 的攔截器 API
阿新 • • 發佈:2018-12-23
攔截器 public class TimeInterceptor implements ProducerInterceptor<String, String>{ //設定資訊 public void configure(Map<String, ?> configs) { } //業務邏輯 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record){ return new ProdecerRecord<String, String>( record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "-" +record.value()); } //應答,傳送失敗呼叫 public void onAcknowledgement(RecordMetadata metadata, Exception exception){ } //釋放資源 public void close(){ } } { //配置生產者屬性(指定多個引數) Properties prop = new Properties(); //省略不寫 ----- //訊息在傳送前必須序列化 prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //攔截器 ArrayList<String> inList = new ArrayList<String>(); inList.add("com.terry.kafka.TimeInterceptor"); prop.put("ProducerConfig.In=NTERCEPTOR_CLASS_CONFIG",inList); //2、例項化producer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); //3、傳送訊息 for(int i = 0; i < 99; i++) { producer.send(new ProducerRecord<String, String>("test", i),new CallBack(){ public void onCompletion(RecordMetadata metadata, Exception exception) { //如果metadata不為null,拿到當前的資料偏移量與分割槽 if(metadata != null) { Sout(metadata.topic() + "------" + metadata.offset() + "------" + metadata.partition()); } } }); } //4、關閉資源 producer.close(); }