1. 程式人生 > 其它 >Kafka生成訊息時的3種分割槽策略

Kafka生成訊息時的3種分割槽策略

摘要:KafkaProducer在傳送訊息的時候,需要指定傳送到哪個分割槽, 那麼這個分割槽策略都有哪些呢?

本文分享自華為雲社群《Kafka生產者3中分割槽分配策略》,作者:石臻臻的雜貨鋪。

KafkaProducer在傳送訊息的時候,需要指定傳送到哪個分割槽, 那麼這個分割槽策略都有哪些呢?我們今天來看一下

使用分割槽策略的配置:

1. DefaultPartitioner 預設分割槽策略

全路徑類名:org.apache.kafka.clients.producer.internals.DefaultPartitioner

  • 如果訊息中指定了分割槽,則使用它
  • 如果未指定分割槽但存在key,則根據序列化key使用murmur2雜湊演算法對分割槽數取模。
  • 如果不存在分割槽或key,則會使用粘性分割槽策略,關於粘性分割槽請參閱 KIP-480。

粘性分割槽Sticky Partitioner

為什麼會有粘性分割槽的概念?

首先,我們指定,Producer在傳送訊息的時候,會將訊息放到一個ProducerBatch中, 這個Batch可能包含多條訊息,然後再將Batch打包傳送。關於這一塊可以看看我之前的文章 圖解Kafka Producer 訊息快取模型。

這樣做的好處就是能夠提高吞吐量,減少發起請求的次數。

但是有一個問題就是, 因為訊息的傳送它必須要你的一個Batch滿了或者linger.ms時間到了,才會傳送。如果生產的訊息比較少的話,遲遲難以讓Batch塞滿,那麼就意味著更高的延遲。

在之前的訊息傳送中,就將訊息輪詢到各個分割槽的, 本來訊息就少,你還給所有分割槽遍歷的分配,那麼每個ProducerBatch都很難滿足條件。

那麼假如我先讓一個ProducerBatch塞滿了之後,再給其他的分割槽分配是不是可以降低這個延遲呢?

詳細的可以看看下面這張圖

這張圖的前提是:

Topic1 有3分割槽, 此時給Topic1 發9條無key的訊息, 這9條訊息加起來都不超過batch.size .

那麼以前的分配方式和粘性分割槽的分配方式如下

可以看到,使用粘性分割槽之後,至少是先把一個Batch填滿了傳送然後再去填充另一個Batch。不至於向之前那樣,雖然平均分配了,但是導致一個Batch都沒有放滿,不能立即傳送。這不就增大了延遲了嗎(只能通過linger.ms時間到了才傳送)

劃重點:

  1. 當一個Batch傳送之後,需要選擇一個新的粘性分割槽的時候
    ①. 可用分割槽<1 ;那麼選擇分割槽的邏輯是在所有分割槽中隨機選擇。
    ②. 可用分割槽=1; 那麼直接選擇這個分割槽。
    ③. 可用分割槽>1 ; 那麼在所有可用分割槽中隨機選擇。
  2. 當選擇下一個粘性分割槽的時候,不是按照分割槽平均的原則來分配。而是隨機原則(當然不能跟上一次的分割槽相同)

例如剛剛傳送到的Batch是 1號分割槽,等Batch滿了,傳送之後,新的訊息可能會發到2或者3, 如果選擇的是2,等2的Batch滿了之後,下一次選擇的Batch仍舊可能是1,而不是說為了平均,選擇3分割槽。

2.UniformStickyPartitioner 純粹的粘性分割槽策略

全路徑類名:org.apache.kafka.clients.producer.internals.UniformStickyPartitioner

他跟DefaultPartitioner 分割槽策略的唯一區別就是。

DefaultPartitionerd 如果有key的話,那麼它是按照key來決定分割槽的,這個時候並不會使用粘性分割槽
UniformStickyPartitioner 是不管你有沒有key, 統一都用粘性分割槽來分配。

3. RoundRobinPartitioner 分割槽策略

全路徑類名:org.apache.kafka.clients.producer.internals.RoundRobinPartitioner

  • 如果訊息中指定了分割槽,則使用它
  • 將訊息平均的分配到每個分割槽中。
  • 與key無關
@Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

上面是具體程式碼。有個地方需要注意;

  1. 當可用分割槽是0的話,那麼就是遍歷的是所有分割槽中的。
  2. 當有可用分割槽的話,那麼遍歷的是所有可用分割槽的。

 

點選關注,第一時間瞭解華為雲新鮮技術~