Java kafka如何實現自定義分割槽類和攔截器
生產者傳送到對應的分割槽有以下幾種方式:
(1)指定了patition,則直接使用;(可以查閱對應的java api,有多種引數)
(2)未指定patition但指定key,通過對key的value進行hash出一個patition;
(3)patition和key都未指定,使用輪詢選出一個patition。
但是kafka提供了,自定義分割槽演算法的功能,由業務手動實現分佈:
1、實現一個自定義分割槽類,CustomPartitioner實現Partitioner
import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public class CustomPartitioner implements Partitioner { /** * * @param topic 當前的傳送的topic * @param key 當前的key值 * @param keyBytes 當前的key的位元組陣列 * @param value 當前的value值 * @param valueBytes 當前的value的位元組陣列 * @param cluster * @return */ @Override public int partition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster) { //這邊根據返回值就是分割槽號,這邊就是固定傳送到三號分割槽 return 3; } @Override public void close() { } @Override public void configure(Map<String,?> configs) { } }
2、producer配置檔案指定,具體的分割槽類
// 具體的分割槽類
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"kafka.CustomPartitioner");
技巧:可以使用ProducerConfig中提供的配置ProducerConfig
kafka producer攔截器
攔截器(interceptor)是在Kafka 0.10版本被引入的。
interceptor使得使用者在訊息傳送前以及producer回撥邏輯前有機會對訊息做一些定製化需求,比如修改訊息等。
許使用者指定多個interceptor按序作用於同一條訊息從而形成一個攔截鏈(interceptor chain)。
所使用的類為:
org.apache.kafka.clients.producer.ProducerInterceptor
我們可以編碼測試下:
1、定義訊息攔截器,實現訊息處理(可以是加時間戳等等,unid等等。)
import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; import java.util.UUID; public class MessageInterceptor implements ProducerInterceptor<String,String> { @Override public void configure(Map<String,?> configs) { System.out.println("這是MessageInterceptor的configure方法"); } /** * 這個是訊息傳送之前進行處理 * * @param record * @return */ @Override public ProducerRecord<String,String> onSend(ProducerRecord<String,String> record) { // 建立一個新的record,把uuid入訊息體的最前部 System.out.println("為訊息新增uuid"); return new ProducerRecord(record.topic(),record.partition(),record.timestamp(),record.key(),UUID.randomUUID().toString().replace("-","") + "," + record.value()); } /** * 這個是生產者回調函式呼叫之前處理 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata,Exception exception) { System.out.println("MessageInterceptor攔截器的onAcknowledgement方法"); } @Override public void close() { System.out.println("MessageInterceptor close 方法"); } }
2、定義計數攔截器
import java.util.Map; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class CounterInterceptor implements ProducerInterceptor<String,String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String,?> configs) { System.out.println("這是CounterInterceptor的configure方法"); } @Override public ProducerRecord<String,String> record) { System.out.println("CounterInterceptor計數過濾器不對訊息做任何操作"); return record; } @Override public void onAcknowledgement(RecordMetadata metadata,Exception exception) { // 統計成功和失敗的次數 System.out.println("CounterInterceptor過濾器執行統計失敗和成功數量"); if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 儲存結果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } }
3、producer客戶端:
import org.apache.kafka.clients.producer.*; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class Producer1 { public static void main(String[] args) throws Exception { Properties props = new Properties(); // Kafka服務端的主機名和埠號 props.put("bootstrap.servers","localhost:9092"); // 等待所有副本節點的應答 props.put("acks","all"); // 訊息傳送最大嘗試次數 props.put("retries",0); // 一批訊息處理大小 props.put("batch.size",16384); // 請求延時,可能生產資料太快了 props.put("linger.ms",1); // 傳送快取區記憶體大小,資料是先放到生產者的緩衝區 props.put("buffer.memory",33554432); // key序列化 props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); // value序列化 props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // 具體的分割槽類 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"kafka.CustomPartitioner"); //定義攔截器 List<String> interceptors = new ArrayList<>(); interceptors.add("kafka.MessageInterceptor"); interceptors.add("kafka.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors); Producer<String,String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1; i++) { producer.send(new ProducerRecord<String,String>("test_0515",i + "","xxx-" + i),new Callback() { public void onCompletion(RecordMetadata recordMetadata,Exception e) { System.out.println("這是producer回撥函式"); } }); } /*System.out.println("現在執行關閉producer"); producer.close();*/ producer.close(); } }
總結,我們可以知道攔截器鏈各個方法的執行順序,假如有A、B攔截器,在一個攔截器鏈中:
(1)執行A的configure方法,執行B的configure方法
(2)執行A的onSend方法,B的onSend方法
(3)生產者傳送完畢後,執行A的onAcknowledgement方法,B的onAcknowledgement方法。
(4)執行producer自身的callback回撥函式。
(5)執行A的close方法,B的close方法。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。