Kafka生成訊息時的3種分割槽策略
摘要:KafkaProducer在傳送訊息的時候,需要指定傳送到哪個分割槽, 那麼這個分割槽策略都有哪些呢?
本文分享自華為雲社群《Kafka生產者3中分割槽分配策略》,作者:石臻臻的雜貨鋪。
KafkaProducer在傳送訊息的時候,需要指定傳送到哪個分割槽, 那麼這個分割槽策略都有哪些呢?我們今天來看一下
使用分割槽策略的配置:
1. DefaultPartitioner 預設分割槽策略
全路徑類名:org.apache.kafka.clients.producer.internals.DefaultPartitioner
- 如果訊息中指定了分割槽,則使用它
- 如果未指定分割槽但存在key,則根據序列化key使用murmur2雜湊演算法對分割槽數取模。
- 如果不存在分割槽或key,則會使用粘性分割槽策略,關於粘性分割槽請參閱 KIP-480。
粘性分割槽Sticky Partitioner
為什麼會有粘性分割槽的概念?
首先,我們指定,Producer在傳送訊息的時候,會將訊息放到一個ProducerBatch中, 這個Batch可能包含多條訊息,然後再將Batch打包傳送。關於這一塊可以看看我之前的文章 圖解Kafka Producer 訊息快取模型。
這樣做的好處就是能夠提高吞吐量,減少發起請求的次數。
但是有一個問題就是, 因為訊息的傳送它必須要你的一個Batch滿了或者linger.ms時間到了,才會傳送。如果生產的訊息比較少的話,遲遲難以讓Batch塞滿,那麼就意味著更高的延遲。
在之前的訊息傳送中,就將訊息輪詢到各個分割槽的, 本來訊息就少,你還給所有分割槽遍歷的分配,那麼每個ProducerBatch都很難滿足條件。
那麼假如我先讓一個ProducerBatch塞滿了之後,再給其他的分割槽分配是不是可以降低這個延遲呢?
詳細的可以看看下面這張圖
這張圖的前提是:
Topic1 有3分割槽, 此時給Topic1 發9條無key的訊息, 這9條訊息加起來都不超過batch.size .
那麼以前的分配方式和粘性分割槽的分配方式如下
可以看到,使用粘性分割槽之後,至少是先把一個Batch填滿了傳送然後再去填充另一個Batch。不至於向之前那樣,雖然平均分配了,但是導致一個Batch都沒有放滿,不能立即傳送。這不就增大了延遲了嗎(只能通過linger.ms時間到了才傳送)
劃重點:
- 當一個Batch傳送之後,需要選擇一個新的粘性分割槽的時候
①. 可用分割槽<1 ;那麼選擇分割槽的邏輯是在所有分割槽中隨機選擇。
②. 可用分割槽=1; 那麼直接選擇這個分割槽。
③. 可用分割槽>1 ; 那麼在所有可用分割槽中隨機選擇。 - 當選擇下一個粘性分割槽的時候,不是按照分割槽平均的原則來分配。而是隨機原則(當然不能跟上一次的分割槽相同)
例如剛剛傳送到的Batch是 1號分割槽,等Batch滿了,傳送之後,新的訊息可能會發到2或者3, 如果選擇的是2,等2的Batch滿了之後,下一次選擇的Batch仍舊可能是1,而不是說為了平均,選擇3分割槽。
2.UniformStickyPartitioner 純粹的粘性分割槽策略
全路徑類名:org.apache.kafka.clients.producer.internals.UniformStickyPartitioner
他跟DefaultPartitioner 分割槽策略的唯一區別就是。
DefaultPartitionerd 如果有key的話,那麼它是按照key來決定分割槽的,這個時候並不會使用粘性分割槽
UniformStickyPartitioner 是不管你有沒有key, 統一都用粘性分割槽來分配。
3. RoundRobinPartitioner 分割槽策略
全路徑類名:org.apache.kafka.clients.producer.internals.RoundRobinPartitioner
- 如果訊息中指定了分割槽,則使用它
- 將訊息平均的分配到每個分割槽中。
- 與key無關
@Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (!availablePartitions.isEmpty()) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } }
上面是具體程式碼。有個地方需要注意;
- 當可用分割槽是0的話,那麼就是遍歷的是所有分割槽中的。
- 當有可用分割槽的話,那麼遍歷的是所有可用分割槽的。