kafka 自定義分割槽
阿新 • • 發佈:2020-11-28
1:POM檔案
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
2:自定義分割槽
package com.kpwong.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Demo partitioner that routes every record to one fixed partition,
 * regardless of topic, key or value.
 */
public class Mypartitioner implements Partitioner {

    /**
     * Partition index returned for every record.
     * NOTE(review): presumably the target topic must have at least two
     * partitions for index 1 to exist - confirm against the topic setup.
     */
    private static final int TARGET_PARTITION = 1;

    /**
     * Chooses the partition for a record.
     *
     * @param topic      topic the record is being sent to (unused)
     * @param key        record key (unused)
     * @param keyBytes   serialized record key (unused)
     * @param value      record value (unused)
     * @param valueBytes serialized record value (unused)
     * @param cluster    cluster metadata (unused)
     * @return always {@link #TARGET_PARTITION}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // A dynamic strategy could consult
        // cluster.availablePartitionsForTopic(topic).size() here instead of
        // returning a constant.
        return TARGET_PARTITION;
    }

    /** No resources are held, so there is nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Receives the producer configuration; this partitioner reads none of it.
     *
     * @param map producer configuration entries (ignored)
     */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
3:生產者使用自定義分割槽
// Point the producer at the custom partitioner via its fully-qualified class name.
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kpwong.partitioner.Mypartitioner");
package com.kpwong.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
/**
 * Demo producer that sends ten string records to topic "two" through the
 * custom partitioner and prints the partition and offset reported for each.
 */
public class PartitionProducer {

    /**
     * Configures a producer, sends records "kpwong--11" through "kpwong--20",
     * and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Producer configuration.
        Properties properties = new Properties();
        // Kafka cluster broker list.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop202:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries.
        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size in bytes.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Linger time in milliseconds.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size: 32 MB.
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Key and value serializer classes.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // User-defined partitioner.
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                "com.kpwong.partitioner.Mypartitioner");

        // try-with-resources closes the producer even when send() throws,
        // so its resources are not leaked on the failure path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // Send the records.
            for (int i = 11; i <= 20; i++) {
                producer.send(new ProducerRecord<String, String>("two", "kpwong--" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null) {
                            // Success: report where the record landed.
                            System.out.println(recordMetadata.partition() + "-----" + recordMetadata.offset());
                        } else {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }
}
執行看結果: