如何保證kafka消費的順序性
在Kafka中Partition(分割槽)是真正儲存訊息的地方,傳送的訊息都存放在這裡。Partition(分割槽)又存在於Topic(主題)中,並且一個Topic(主題)可以指定多個Partition(分割槽)。
在Kafka中,只保證Partition(分割槽)內有序,不保證Topic所有分割槽都是有序的。
所以 Kafka 要保證訊息的消費順序,可以有2種方法:
一、1個Topic(主題)只建立1個Partition(分割槽),這樣生產者的所有資料都發送到了一個Partition(分割槽),保證了訊息的消費順序。
二、生產者在傳送訊息的時候指定要傳送到哪個Partition(分割槽)。
怎麼指定呢?我們需要將 producer 傳送的資料封裝成一個 ProducerRecord 物件。(1)指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
(2)沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition數進行取餘得到 partition 值;
在Producer往Kafka插入資料時,控制同一Key分發到同一Partition,並且設定引數max.in.flight.requests.per.connection=1,也即同一個連結只能傳送一條訊息,如此便可嚴格保證Kafka訊息的順序
(3)既沒有 partition 值又沒有 key 值的情況下,第一次呼叫時隨機生成一個整數(後
面每次呼叫在這個整數上自增),將這個值與 topic 可用的 partition 總數取餘得到 partition
值,也就是常說的 round-robin 演算法。
那麼問題來了:在1個topic中,有3個partition,那麼如何保證資料的消費?
1、如順序消費中的 "第①點" 和 "Kafka 要保證訊息的消費順序第二個方法" 說明,生產者在寫的時候,可以指定一個 key,比如說我們指定了某個訂單 id 作為 key,那麼這個訂單相關的資料,一定會被分發到同一個 partition 中去,而且這個 partition 中的資料一定是有順序的。
2、消費者從 partition 中取出來資料的時候,也一定是有順序的。到這裡,順序還是 ok 的,沒有錯亂。
3、但是消費者裡可能會有多個執行緒來併發來處理訊息。因為如果消費者是單執行緒消費資料,那麼這個吞吐量太低了。而多個執行緒併發的話,順序可能就亂掉了。
解決方案:
寫N個queue,將具有相同key的資料都儲存在同一個queue,然後對於N個執行緒,每個執行緒分別消費一個queue即可。
注:在單執行緒中,一個 topic,一個 partition,一個 consumer,內部單執行緒消費,大資料培訓這樣的狀態資料消費是有序的。但由於單執行緒吞吐量太低,在資料龐大的實際場景很少採用。
但是以上消費執行緒模型,存在一個問題:
在消費過程中,如果 Kafka 消費組發生重平衡,此時的分割槽被分配給其它消費組了,如果拉取回來的訊息沒有被消費,雖然 Kakfa 可以實現 ConsumerRebalanceListener 介面,在新一輪重平衡前主動提交消費偏移量,但這貌似解決不了未消費的訊息被打亂順序的可能性?
因此在消費前,還需要主動進行判斷此分割槽是否被分配給其它消費者處理,並且還需要鎖定該分割槽在消費當中不能被分配到其它消費者中(但 kafka 目前做不到這一點)。
參考 RocketMQ 的做法:
在消費前主動呼叫 ProcessQueue#isDropped 方法判斷佇列是否已過期,並且對該佇列進行加鎖處理(向 broker 端請求該佇列加鎖)。
RocketMQ
RocketMQ 不像 Kafka 那麼“原生”,RocketMQ 早已為你準備好了你的需求,它本身的消費模型就是單 consumer 例項 + 多 worker 執行緒模型,有興趣的小夥伴可以從以下方法觀摩 RocketMQ 的消費邏輯:
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
RocketMQ 會為每個佇列分配一個 PullRequest,並將其放入 pullRequestQueue,PullMessageService 執行緒會不斷輪詢從 pullRequestQueue 中取出 PullRequest 去拉取訊息,接著將拉取到的訊息給到 ConsumeMessageService 處理,ConsumeMessageService 有兩個子介面:
// 併發訊息消費邏輯實現類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 順序訊息消費邏輯實現類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
其中,ConsumeMessageConcurrentlyService 內部有一個執行緒池,用於併發消費,同樣地,如果需要順序消費,那麼 RocketMQ 提供了 ConsumeMessageOrderlyService 類進行順序訊息消費處理。
經過對 Kafka 消費執行緒模型的思考之後,從 ConsumeMessageOrderlyService 原始碼中能夠看出 RocketMQ 能夠實現區域性消費順序,我認為主要有以下兩點:
1)RocketMQ 會為每個訊息佇列建一個物件鎖,這樣只要執行緒池中有該訊息佇列在處理,則需等待處理完才能進行下一次消費,保證在當前 Consumer 內,同一佇列的訊息進行序列消費。
2)向 Broker 端請求鎖定當前順序消費的佇列,防止在消費過程中被分配給其它消費者處理從而打亂消費順序
總結
1)多分割槽的情況下:
如果想要保證 Kafka 在消費時要保證消費的順序性,可以使用每個執行緒維護一個 KafkaConsumer 例項的消費執行緒模型,並且是一條一條地去拉取訊息並進行消費(防止重平衡時有可能打亂消費順序)。(備註:每個 KafkaConsumer 會負責固定的分割槽,因此無法提升單個分割槽的消費能力,如果一個主題分割槽數量很多,只能通過增加 KafkaConsumer 例項提高消費能力,這樣一來執行緒數量過多,導致專案 Socket 連線開銷巨大,專案中一般不用該執行緒模型去消費。)
對於能容忍訊息短暫亂序的業務(話說回來, Kafka 叢集也不能保證嚴格的訊息順序),可以使用單 KafkaConsumer 例項 + 多 worker 執行緒 + 一條執行緒對應一個阻塞佇列消費執行緒模型(以上兩圖就是對此消費執行緒模型的解釋)。
1)單分割槽的情況下:
由於單分割槽不存在重平衡問題,以上所提到的執行緒模型都可以保證消費的順序性。
另外如果是 RocketMQ,使用 MessageListenerOrderly 監聽消費可保證訊息消費順序。
很多人也有這個疑問:既然 Kafka 和 RocketMQ 都不能保證嚴格的順序訊息,那麼順序消費還有意義嗎?
一般來說普通的的順序訊息能夠滿足大部分業務場景,如果業務能夠容忍叢集異常狀態下訊息短暫不一致的情況,則不需要嚴格的順序訊息。