Kafka自定義分割槽器
阿新 • • 發佈:2022-05-15
1.定義類實現 Partitioner 介面,重寫 partition()方法
package com.kafka.producer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * 1. 實現介面 Partitioner * 2. 實現 3 個方法:partition,close,configure * 3. 編寫 partition 方法,返回分割槽號 */ public class MyPartitioner implementsPartitioner { /*** 返回資訊對應的分割槽 * @param topic 主題 * @param key 訊息的 key * @param keyBytes 訊息的 key 序列化後的位元組陣列 * @param value 訊息的 value * @param valueBytes 訊息的 value 序列化後的位元組陣列 * @param cluster 叢集元資料可以檢視分割槽資訊 * @return */
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取訊息
String msgValue = value.toString();
// 建立 partition
int partition;
// 判斷訊息是否包含 Kafka
if (msgValue.contains("Kafka")){
partition = 0;
}else {
partition = 1;
}
// 返回分割槽號
return partition;
}
// 關閉資源
@Override
public void close() {
}
// 配置方法
@Override
public void configure(Map<String, ?> configs) {
}
}
2.使用分割槽器的方法,在生產者的配置中新增分割槽器引數。
複製之前寫的CustomProducerCallBack類,改成CustomProducerCallBackPartition
紅色程式碼為新增部分
package com.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class CustomProducerCallBackPartition {public static void main(String[] args) { //配置 Properties properties = new Properties(); //連線叢集 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); //指定對應的key和value序列化型別 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafka.producer.MyPartitioner"); //建立Kafka生產者物件 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //傳送資料 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("first", "Kafka" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println("主題:" + metadata.topic() + " 分割槽:" + metadata.partition()); } } }); } //關閉資源 kafkaProducer.close(); } }
3.在 hadoop102 上開啟 Kafka 消費者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
4.在IDEA執行程式碼
檢視hadoop102資訊
同時IDEA列印資訊,指定分割槽為0,因為訊息包含Kafka
修改傳送的資訊為hello
執行程式碼
檢視hadoop102資訊
同時IDEA列印資訊,指定分割槽為1,因為訊息不包含Kafka