1. 程式人生 > 實用技巧 > Kafka 自定義分割槽

kafka 自定義分割槽

1:POM檔案

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2:自定義分割槽

package com.kpwong.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map; public class Mypartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // cluster.availablePartitionsForTopic(topic).size(); return 1; } @Override
public void close() { } @Override public void configure(Map<String, ?> map) { } }

3:生產者使用自定義分割槽

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kpwong.partitioner.Mypartitioner");
package com.kpwong.producer;

import org.apache.kafka.clients.producer.*;

import
java.util.Properties; public class PartitionProducer { public static void main(String[] args) { //Create kafka 生產者配置資訊 Properties properties = new Properties(); //kafka 叢集, broker list properties.put("bootstrap.servers", "hadoop202:9092"); properties.put("acks", "all"); //重試次數 properties.put("retries", 1); //批次大小 properties.put("batch.size", 16384); //等待時間 properties.put("linger.ms", 1); //RecordAccumulator 緩衝區大小 32M properties.put("buffer.memory", 33554432); // key value 的序列化類 properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //使用者自定義分割槽 // ProducerConfig.PARTITIONER_CLASS_CONFIG properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kpwong.partitioner.Mypartitioner"); //建立生產者物件 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); //傳送資料 for(int i = 11 ;i <= 20;i++) { producer.send(new ProducerRecord<String, String>("two", "kpwong--" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if( e == null) { System.out.println(recordMetadata.partition() + "-----"+ recordMetadata.offset()); } else { e.printStackTrace(); } } }); } //關閉連線 producer.close(); } }

執行看結果: