Kafka Illustrated: Partitioner

Based on Kafka version 2.4

// The Partitioner interface
public interface Partitioner extends Configurable, Closeable {
    // Compute the partition for the given record
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    // Called when the partitioner is closed
    public void close();
    // Called when a new batch is about to be created, so the partitioner can change
    // its partition choice; the default implementation does nothing
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}
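
To show where this interface plugs in: the producer picks its partitioner from the partitioner.class setting. The snippet below is only a sketch of that wiring using plain java.util.Properties. The broker address and the class name com.example.MyPartitioner are placeholders assumed for illustration, not something from the Kafka source.

```java
import java.util.Properties;

class PartitionerConfigSketch {
    // Builds producer properties that select a custom Partitioner implementation.
    static Properties producerProps(String partitionerClassName) {
        Properties props = new Properties();
        // Assumed broker address, for illustration only.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // "partitioner.class" names the Partitioner implementation the producer loads.
        props.setProperty("partitioner.class", partitionerClassName);
        return props;
    }

    public static void main(String[] args) {
        // com.example.MyPartitioner is a hypothetical class name.
        Properties props = producerProps("com.example.MyPartitioner");
        System.out.println(props.getProperty("partitioner.class"));
    }
}
```

If the setting is left out, the producer falls back to DefaultPartitioner.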

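DefaultPartitioner is the default implementation (Kafka 2.4 also ships RoundRobinPartitioner and UniformStickyPartitioner). Its source is as follows: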

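/**
The default partitioning strategy:
1. If the record specifies a partition, use it
2. If the key is non-null, partition = hash(key) % numPartitions
3. Otherwise use the topic's sticky partition; when a new one has to be chosen and no partition is available, pick one at random from all partitions
4. When partitions are available, pick the new sticky partition at random from the available ones
 */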
public class DefaultPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() {}
    
    /**
     * If a batch completed for the current sticky partition, change the sticky partition. 
     * Alternately, if no sticky partition has been determined, set one.
     */
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}
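
The two branches of partition() can be modeled without a Kafka dependency. What follows is a simplified sketch, not Kafka's code: Arrays.hashCode stands in for Utils.murmur2, the class and method names are hypothetical, only a single topic is modeled, and partition availability is ignored. The mask in toPositive mirrors what Utils.toPositive does, which is safer than Math.abs because Math.abs(Integer.MIN_VALUE) is still negative.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

class PartitionChoiceSketch {
    // Sticky partition for the single modeled topic; -1 means none chosen yet.
    private int sticky = -1;

    // Clears the sign bit so the result is never negative.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Keyed records: hash the key bytes, then take the remainder.
    static int keyedPartition(byte[] keyBytes, int numPartitions) {
        int hash = Arrays.hashCode(keyBytes); // stand-in for murmur2 (assumption)
        return toPositive(hash) % numPartitions;
    }

    // Keyless records stay on the sticky partition; keyed records are hashed.
    int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            if (sticky < 0) {
                sticky = ThreadLocalRandom.current().nextInt(numPartitions);
            }
            return sticky;
        }
        return keyedPartition(keyBytes, numPartitions);
    }

    // Called when a batch completed for prevPartition: move to a different partition.
    void onNewBatch(int prevPartition, int numPartitions) {
        if (sticky >= 0 && sticky != prevPartition) {
            return; // the sticky partition was already switched
        }
        if (numPartitions == 1) {
            sticky = 0;
            return;
        }
        int next;
        do {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (next == prevPartition);
        sticky = next;
    }
}
```

The point of the sticky branch is that keyless records pile into one batch per partition instead of being spread thinly over every partition, while keyed records always land on the same partition as long as the partition count does not change.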

org.apache.kafka.clients.producer.KafkaProducer#partition is the entry point that assigns a partition to each record at runtime:

   /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
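
The rule in this method, an explicit partition wins and otherwise the partitioner is consulted, can be tried out in isolation. The sketch below assumes a hypothetical Record class as a stand-in for ProducerRecord and uses a ToIntFunction in place of the configured Partitioner; neither is Kafka's API.

```java
import java.util.function.ToIntFunction;

class PartitionDispatchSketch {
    // Hypothetical stand-in for ProducerRecord; partition may be null.
    static final class Record {
        final String key;
        final Integer partition;

        Record(String key, Integer partition) {
            this.key = key;
            this.partition = partition;
        }
    }

    // Returns the record's explicit partition if set, otherwise asks the partitioner.
    static int choose(Record record, ToIntFunction<Record> partitioner) {
        Integer explicit = record.partition;
        return explicit != null ? explicit : partitioner.applyAsInt(record);
    }

    public static void main(String[] args) {
        ToIntFunction<Record> alwaysSeven = r -> 7; // hypothetical partitioner
        System.out.println(choose(new Record("k", 2), alwaysSeven));    // explicit partition is used
        System.out.println(choose(new Record("k", null), alwaysSeven)); // partitioner is consulted
    }
}
```

So a record built with an explicit partition never reaches the Partitioner at all, whichever implementation is configured.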
