Illustrated Kafka: Partitioner
阿新 • Published: 2020-08-25
Based on Kafka version 2.4
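```java
package org.apache.kafka.clients.producer;

import java.io.Closeable;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;

// The Partitioner interface
public interface Partitioner extends Configurable, Closeable {

    // Compute the partition for the given record
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    // Called when the partitioner is closed
    public void close();

    // Notifies the partitioner that a new batch is about to be created; an implementation
    // may use this to change the partition it picks. The default implementation does nothing.
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}
```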
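Because onNewBatch is a default method, a custom partitioner only has to implement partition and close, and partitioners written before 2.4 keep compiling. The sketch below shows that contract without a Kafka dependency. MiniPartitioner and FirstBytePartitioner are hypothetical names, Cluster is replaced by a plain partition count, and Configurable.configure is omitted, so this mirrors the shape of the interface rather than Kafka's actual API.

```java
// Hypothetical mirror of Kafka's Partitioner contract (not the real API):
// Cluster is replaced by a partition count so the sketch compiles without Kafka.
interface MiniPartitioner {
    // Counterpart of partition(...): choose a partition for one record.
    int partition(String topic, byte[] keyBytes, int numPartitions);

    // Counterpart of close(): release any resources held by the partitioner.
    void close();

    // Counterpart of onNewBatch(...): a default no-op, so implementations
    // that do not care about batches never have to override it.
    default void onNewBatch(String topic, int numPartitions, int prevPartition) {
    }
}

// Hypothetical custom partitioner: routes on the first key byte and
// deliberately does not override onNewBatch.
class FirstBytePartitioner implements MiniPartitioner {
    @Override
    public int partition(String topic, byte[] keyBytes, int numPartitions) {
        if (keyBytes == null || keyBytes.length == 0) {
            return 0; // hypothetical policy: records without a key go to partition 0
        }
        return (keyBytes[0] & 0xff) % numPartitions;
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }

    public static void main(String[] args) {
        MiniPartitioner p = new FirstBytePartitioner();
        p.onNewBatch("orders", 3, 0); // inherited no-op
        System.out.println(p.partition("orders", new byte[] {7}, 3)); // prints 1
        p.close();
    }
}
```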
The default implementation (used when partitioner.class is not configured) is DefaultPartitioner:
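```java
package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/**
 * The default partitioning strategy:
 * 1. If the record specifies a partition, use it (this check happens in
 *    KafkaProducer#partition, before the partitioner is called).
 * 2. If the key is non-null, use Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions.
 * 3. If the key is null, use the topic's sticky partition. When a new sticky partition
 *    is needed, pick one at random from the available partitions, or from all
 *    partitions if none is available.
 */
public class DefaultPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() {}

    /**
     * If a batch completed for the current sticky partition, change the sticky partition.
     * Alternately, if no sticky partition has been determined, set one.
     */
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}
```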
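To explore the key branch without pulling in the Kafka jar, the sketch below re-implements the hash, modeled on Kafka's Utils.murmur2 (32-bit MurmurHash2 with seed 0x9747b28c) and Utils.toPositive. KeyHashSketch and partitionForKey are hypothetical names, and the code is a reconstruction rather than a copy of the library, so compare its output with the real Utils.murmur2 before relying on identical partition numbers.

```java
import java.nio.charset.StandardCharsets;

// Reconstruction modeled on Kafka's Utils.murmur2 / Utils.toPositive (not Kafka code).
final class KeyHashSketch {
    private KeyHashSketch() {}

    // 32-bit MurmurHash2 with the seed used by Kafka's producer.
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        // mixing constants from MurmurHash2
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        int length4 = length / 4;

        // process the input four bytes at a time (little-endian)
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // mix in the remaining 1-3 bytes (the cases fall through on purpose)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        // final avalanche
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit, so the result is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Same expression DefaultPartitioner uses for a non-null key.
    static int partitionForKey(byte[] keyBytes, int numPartitions) {
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        // For a fixed partition count, the same key always maps to the same partition.
        System.out.println(partitionForKey(key, 6));
    }
}
```

toPositive is used instead of Math.abs because Math.abs(Integer.MIN_VALUE) is still negative, which would make the modulo return a negative partition; masking the sign bit maps every int to a non-negative value. Since the result also depends on numPartitions, adding partitions to a topic changes where existing keys land.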
org.apache.kafka.clients.producer.KafkaProducer#partition
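This method is the entry point that assigns a partition at send time: a partition set explicitly on the ProducerRecord is used as-is; otherwise the configured partitioner computes one.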
```java
/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
```
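Putting the pieces together, the sketch below models the whole 2.4 decision: an explicit partition wins, then the key hash, then the sticky partition for null keys, including the onNewBatch rule that the sticky partition only switches when the completed batch belonged to it. PartitionChooserSketch is a hypothetical class modeled on DefaultPartitioner and StickyPartitionCache; topic metadata is reduced to a partition count plus a list of available partition ids, and Arrays.hashCode stands in for murmur2, so its partition numbers will not match a real producer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical model of the 2.4 producer's partition choice, not Kafka code.
class PartitionChooserSketch {
    // topic -> current sticky partition (the role StickyPartitionCache plays)
    private final ConcurrentHashMap<String, Integer> sticky = new ConcurrentHashMap<>();

    // recordPartition: partition set on the record, or null
    // available: ids of partitions that currently have a leader
    int partition(String topic, Integer recordPartition, byte[] keyBytes,
                  int numPartitions, List<Integer> available) {
        if (recordPartition != null) {
            return recordPartition;                    // explicit partition wins
        }
        if (keyBytes == null) {
            Integer current = sticky.get(topic);       // null key: reuse the sticky partition
            return current != null ? current : nextPartition(topic, numPartitions, available, -1);
        }
        // Stand-in hash: Kafka uses Utils.murmur2, so real partition numbers differ.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Modeled on onNewBatch -> StickyPartitionCache#nextPartition.
    int nextPartition(String topic, int numPartitions, List<Integer> available, int prevPartition) {
        Integer oldPart = sticky.get(topic);
        if (oldPart != null && oldPart != prevPartition) {
            return oldPart;                            // batch was not for the sticky partition: keep it
        }
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        int newPart;
        if (available.isEmpty()) {
            newPart = rnd.nextInt(numPartitions);      // nothing available: pick from all partitions
        } else if (available.size() == 1) {
            newPart = available.get(0);
        } else {
            do {                                       // pick a different available partition
                newPart = available.get(rnd.nextInt(available.size()));
            } while (oldPart != null && newPart == oldPart);
        }
        if (oldPart == null) {
            sticky.putIfAbsent(topic, newPart);
        } else {
            sticky.replace(topic, prevPartition, newPart); // swap only if still the old value
        }
        return sticky.get(topic);
    }

    public static void main(String[] args) {
        PartitionChooserSketch chooser = new PartitionChooserSketch();
        List<Integer> available = Arrays.asList(0, 1, 2);
        int first = chooser.partition("orders", null, null, 3, available);
        int again = chooser.partition("orders", null, null, 3, available);
        System.out.println(first == again);            // prints true: sticky until a batch completes
        chooser.nextPartition("orders", 3, available, first);
        System.out.println(chooser.partition("orders", null, null, 3, available) != first); // prints true
    }
}
```

Keeping one partition per topic until a batch is completed lets null-key records accumulate into fuller batches, instead of spreading them round-robin across partitions as the pre-2.4 DefaultPartitioner did.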