初識kafka 之 自定義分割槽器
阿新 • • 發佈:2022-04-14
需求
通過一個分割槽器實現,傳送過來的資料中如果包含kafka,就發往0號分割槽,不包含kafka,就發往1號分割槽。
程式碼實現
分割槽器
package com.lzh.kafka; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; // 自定義分割槽器 /* 需求 通過一個分割槽器實現,傳送過來的資料中如果包含kafka,就發往0號分割槽,不包含kafka,就發往1號分割槽。 程式碼寫好之後,複製類全面: com.lzh.kafka.MyPartitioner 在序列化時,指定分割槽器。*/ public class MyPartitioner implements Partitioner { public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { // 獲取資料 // 把獲取到的資料轉為String型別 String msgValues = o1.toString(); // 建立partition int partition; if (msgValues.contains("kafka")) { partition=0; }else { partition=1; } return partition; } public void close() { } public void configure(Map<String, ?> map) { } }
主體使用
package com.lzh.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties; import java.util.concurrent.ExecutionException; // 自定義分割槽器 /* 需求 通過一個分割槽器實現,傳送過來的資料中如果包含kafka,就發往0號分割槽,不包含kafka,就發往1號分割槽。 程式碼寫好之後,複製類全面: com.lzh.kafka.MyPartitioner 在序列化時,指定分割槽器。 */ public class CustomProducer自定義分割槽器 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.配置 Properties properties = new Properties(); // 連線叢集 // 給kafka物件新增配置資訊 bootstrap.servers // 生產者連線叢集所需的 broker 地址清單 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092"); // 指定傳送訊息的key和value的序列化型別。一定要寫全類名。 // key,value序列化 key.serializer,value.serializer // key序列化 // 全類名與下等價: properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 指定自定義分割槽器 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.lzh.kafka.MyPartitioner"); // 2.建立 kafka 生產者物件 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 3.傳送資料 // 呼叫 send 方法,傳送訊息 for (int i = 1; i < 5; i++) { final String msg; if (i % 2 == 0) { msg="hello kafka"; }else { msg="hello java"; } kafkaProducer.send(new ProducerRecord<String, String>("Mytopic", msg), new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println(msg + "已傳送到主題"+recordMetadata.topic() +"的"+ recordMetadata.partition() +"分割槽"); } } }); } // 4.關閉資源 kafkaProducer.close(); } }
附,類全名複製方法。
結果