[kafka掃盲]---(6)kafka原始碼閱讀之分割槽器
阿新 • • 發佈:2019-02-05
Author:趙志乾
Date:2018-10-21
Declaration:All Right Reserved!!!
DefaultPartitioner.java
該類實現了Partitioner介面,核心方法為partition():用於給未指定分割槽號的訊息記錄生成分割槽號,其生成策略也比較簡單。其依據是否指定key值採用兩種不同的策略:如果指定key值,則按照key的hash來生成分割槽號,如果未指定key值,則按照輪詢策略來生成分割槽號。
為了實現輪詢策略,該類內部維護了一個例項欄位:topicCounterMap,其型別為ConcurrentMap。該欄位用於維護topic名稱到topic在當前客戶端中所持有的計數器。
如果要實現自定義的分割槽器,可以通過實現Partitioner介面來完成。
package org.apache.kafka.clients.producer.internals; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; /*該類的例項是由kafka客戶端預設的分割槽器,提供預設的分割槽策略:如果生產者要釋出的訊息記錄指定了分割槽 號,則直接使用該分割槽號進行二級分屬劃分;如果沒有指定分割槽號,而是指定了key值,則使用該key的hash值 來生成分割槽號,進行訊息記錄的二級分屬劃分;如果分割槽號和key都沒有指定,則通過輪詢的方式生成一個分割槽 號,進行二級分屬劃分*/ public class DefaultPartitioner implements Partitioner { //主題計數器,用於輪詢策略使用 private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); public void configure(Map<String, ?> configs) {} /*為指定訊息記錄計算分割槽號,所需引數包括:主題、key、key序列化後的位元組陣列、value、value序 列化後的位元組陣列、kafka叢集元資料*/ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //獲取訊息記錄所屬主題在叢集中的當前分割槽資訊 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //當前主題擁有的分割槽數 int numPartitions = partitions.size(); //如果沒有指定key值,便採用輪詢的方式生成分割槽號 if (keyBytes == null) { //獲取主題下一個計數值 int nextValue = nextValue(topic); //獲取叢集上指定主題下的可用分割槽資訊 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic (topic); //如果存在可用分割槽 if (availablePartitions.size() > 0) { //按輪詢策略得到分割槽號 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { //沒有可用分割槽時,按主題下所有分割槽參與輪詢計算,返回一個不可用分割槽號 return Utils.toPositive(nextValue) % numPartitions; } } else { 如果存在key值,則通過對key值hash的方式返回一個分割槽號 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } //獲取主題的下一個計數值,返回值用於後續的輪詢策略 private int nextValue(String topic) { //獲取指定主題的計數器 AtomicInteger counter = topicCounterMap.get(topic); //如果計數器不存在,代表首次向該主題釋出訊息 if (null == counter) { //為主題生成一個計數器例項 counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } //返回主題下一個計數值 return counter.getAndIncrement(); } public void close() {} }