1. 程式人生 > >[kafka掃盲]---(6)kafka原始碼閱讀之分割槽器

[kafka掃盲]---(6)kafka原始碼閱讀之分割槽器

Author:趙志乾
Date:2018-10-21
Declaration:All Right Reserved!!!

DefaultPartitioner.java

該類實現了Partitioner介面,核心方法為partition():用於給未指定分割槽號的訊息記錄生成分割槽號,其生成策略也比較簡單。其依據是否指定key值採用兩種不同的策略:如果指定key值,則按照key的hash來生成分割槽號,如果未指定key值,則按照輪詢策略來生成分割槽號。

為了實現輪詢策略,該類內部維護了一個例項欄位:topicCounterMap,其型別為ConcurrentMap。該欄位用於維護topic名稱到topic在當前客戶端中所持有的計數器。

如果要實現自定義的分割槽器,可以通過實現Partitioner介面來完成。

package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/*該類的例項是由kafka客戶端預設的分割槽器,提供預設的分割槽策略:如果生產者要釋出的訊息記錄指定了分割槽
號,則直接使用該分割槽號進行二級分屬劃分;如果沒有指定分割槽號,而是指定了key值,則使用該key的hash值
來生成分割槽號,進行訊息記錄的二級分屬劃分;如果分割槽號和key都沒有指定,則通過輪詢的方式生成一個分割槽
號,進行二級分屬劃分*/
public class DefaultPartitioner implements Partitioner {
    //主題計數器,用於輪詢策略使用
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /*為指定訊息記錄計算分割槽號,所需引數包括:主題、key、key序列化後的位元組陣列、value、value序
列化後的位元組陣列、kafka叢集元資料*/
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[]
 valueBytes, Cluster cluster) {
        //獲取訊息記錄所屬主題在叢集中的當前分割槽資訊
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //當前主題擁有的分割槽數
        int numPartitions = partitions.size();
        //如果沒有指定key值,便採用輪詢的方式生成分割槽號
        if (keyBytes == null) {
            //獲取主題下一個計數值
            int nextValue = nextValue(topic);
            //獲取叢集上指定主題下的可用分割槽資訊
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic
(topic);    
            //如果存在可用分割槽
            if (availablePartitions.size() > 0) {
                //按輪詢策略得到分割槽號
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                //沒有可用分割槽時,按主題下所有分割槽參與輪詢計算,返回一個不可用分割槽號            
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            如果存在key值,則通過對key值hash的方式返回一個分割槽號            
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    //獲取主題的下一個計數值,返回值用於後續的輪詢策略
    private int nextValue(String topic) {
        //獲取指定主題的計數器
        AtomicInteger counter = topicCounterMap.get(topic);
        //如果計數器不存在,代表首次向該主題釋出訊息
        if (null == counter) {
            //為主題生成一個計數器例項
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        //返回主題下一個計數值
        return counter.getAndIncrement();
    }

    public void close() {}
}