Kafka分割槽分配策略分析——重點:StickyAssignor
“ 為什麼Kafka在RangeAssigor、RoundRobinAssignor的基礎上,又新增了PartitionAssignor,它解決了什麼問題?”
背景
用過Kafka的同學應該都知道Kafka的分割槽和消費組的概念。在Kafka中,每個Topic會包含多個分割槽,預設情況下一個分割槽只能被一個消費組下面的一個消費者消費,這裡就產生了分割槽分配的問題。Kafka中提供了多重分割槽分配演算法(PartitionAssignor)的實現:RangeAssigor、RoundRobinAssignor、StickyAssignor。本文主要介紹StickyAssignor,順帶會介紹RangeAssigor、RoundRobinAssignor作為分割槽分配的背景知識。
RangeAssignor
PartitionAssignor介面用於使用者定義實現分割槽分配演算法,以實現Consumer之間的分割槽分配。消費組的成員訂閱它們感興趣的Topic並將這種訂閱關係傳遞給作為訂閱組協調者的Broker。協調者選擇其中的一個消費者來執行這個消費組的分割槽分配並將分配結果轉發給消費組內所有的消費者。Kafka預設採用RangeAssignor的分配演算法。
RangeAssignor對每個Topic進行獨立的分割槽分配。對於每一個Topic,首先對分割槽按照分割槽ID進行排序,然後訂閱這個Topic的消費組的消費者再進行排序,之後儘量均衡的將分割槽分配給消費者。這裡只能是儘量均衡,因為分割槽數可能無法被消費者數量整除,那麼有一些消費者就會多分配到一些分割槽。
分配示意圖如下:
大致演算法如下:
1 assign(topic, consumers) { 2 // 對分割槽和Consumer進行排序 3 List<Partition> partitions = topic.getPartitions(); 4 sort(partitions); 5 sort(consumers); 6 // 計算每個Consumer分配的分割槽數 7 int numPartitionsPerConsumer = partition.size() / consumers.size(); 8 // 額外有一些Consumer會多分配到分割槽 9 int consumersWithExtraPartition = partition.size() % consumers.size(); 10 // 計算分配結果 11 for (int i = 0, n = consumers.size(); i < n; i++) { 12 // 第i個Consumer分配到的分割槽的index 13 int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); 14 // 第i個Consumer分配到的分割槽數 15 int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); 16 // 分裝分配結果 17 assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); 18 } 19 }
RangeAssignor策略的原理是按照消費者總數和分割槽總數進行整除運算來獲得一個跨度,然後將分割槽按照跨度進行平均分配,以保證分割槽儘可能均勻地分配給所有的消費者。對於每一個Topic,RangeAssignor策略會將消費組內所有訂閱這個Topic的消費者按照名稱的字典序排序,然後為每個消費者劃分固定的分割槽範圍,如果不夠平均分配,那麼字典序靠前的消費者會被多分配一個分割槽。
這種分配方式明顯的一個問題是隨著消費者訂閱的Topic的數量的增加,不均衡的問題會越來越嚴重,比如上圖中4個分割槽3個消費者的場景,C0會多分配一個分割槽。如果此時再訂閱一個分割槽數為4的Topic,那麼C0又會比C1、C2多分配一個分割槽,這樣C0總共就比C1、C2多分配兩個分割槽了,而且隨著Topic的增加,這個情況會越來越嚴重。
分配結果:
訂閱2個Topic,每個Topic4個分割槽,共3個Consumer
C0:[T0P0,T0P1,T1P0,T1P1]
C1:[T0P2,T1P2]
C2:[T0P3,T1P3]
RoundRobinAssignor
RoundRobinAssignor的分配策略是將消費組內訂閱的所有Topic的分割槽及所有消費者進行排序後儘量均衡的分配(RangeAssignor是針對單個Topic的分割槽進行排序分配的)。如果消費組內,消費者訂閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那麼分配結果是儘量均衡的(消費者之間分配到的分割槽數的差值不會超過1)。如果訂閱的Topic列表是不同的,那麼分配結果是不保證“儘量均衡”的,因為某些消費者不參與一些Topic的分配。
分配示意圖如下:
相對於RangeAssignor,在訂閱多個Topic的情況下,RoundRobinAssignor的方式能消費者之間儘量均衡的分配到分割槽(分配到的分割槽數的差值不會超過1——RangeAssignor的分配策略可能隨著訂閱的Topic越來越多,差值越來越大)。
對於訂閱組內消費者訂閱Topic不一致的情況:假設有三個消費者分別為C0、C1、C2,有3個Topic T0、T1、T2,分別擁有1、2、3個分割槽,並且C0訂閱T0,C1訂閱T0和T1,C2訂閱T0、T1、T0,那麼RoundRobinAssignor的分配結果如下:
看上去分配已經儘量的保證均衡了,不過可以發現C2承擔了4個分割槽的消費而C1訂閱了T1,是不是把T1P1交給C1消費能更加的均衡呢?
StickyAssignor
動機
儘管RoundRobinAssignor已經在RangeAssignor上做了一些優化來更均衡的分配分割槽,但是在一些情況下依舊會產生嚴重的分配偏差,比如消費組中訂閱的Topic列表不相同的情況下(這個情況可能更多的發生在釋出階段,但是這真的是一個問題嗎?——可以參照Kafka官方的說明:KIP-49 Fair Partition Assignment Strategy)。更核心的問題是無論是RangeAssignor,還是RoundRobinAssignor,當前的分割槽分配演算法都沒有考慮上一次的分配結果。顯然,在執行一次新的分配之前,如果能考慮到上一次分配的結果,儘量少的調整分割槽分配的變動,顯然是能節省很多開銷的。
目標
從字面意義上看,Sticky是“粘性的”,可以理解為分配結果是帶“粘性的”——每一次分配變更相對上一次分配做最少的變動(上一次的結果是有粘性的),其目標有兩點:
1. 分割槽的分配儘量的均衡
2. 每一次重分配的結果儘量與上一次分配結果保持一致
當這兩個目標發生衝突時,優先保證第一個目標。第一個目標是每個分配演算法都儘量嘗試去完成的,而第二個目標才真正體現出StickyAssignor特性的。
我們先來看預期分配的結構,後續再具體分析StickyAssignor的演算法實現。
例如:
-
有3個Consumer:C0、C1、C2
-
有4個Topic:T0、T1、T2、T3,每個Topic有2個分割槽
-
所有Consumer都訂閱了這4個分割槽
StickyAssignor的分配結果如下圖所示(增加RoundRobinAssignor分配作為對比):
上面的例子中,Sticky模式原來分配給C0、C2的分割槽都沒有發生變動,且最終C0、C1達到的均衡的目的。
再舉一個例子:
-
有3個Consumer:C0、C1、C2
-
3個Topic:T0、T1、T2,它們分別有1、2、3個分割槽
-
C0訂閱T0;C1訂閱T0、T1;C2訂閱T0、T1、T2
分配結果如下圖所示:
從以上兩個例子的分配結果可以看出,StickyAssignor是比RangeAssignor和RoundRobinAssignor更好的分配方式,不過它的實現也更加的複雜。
實現
StickyAssignor的實現程式碼是RangeAssignor和RoundRobinAssignor的十倍,複雜度則遠遠在十倍以上。目前基本沒有看到對這塊原始碼實現的分析。
StickyAssignor分配演算法的核心邏輯如下:
-
先構建出當前的分配狀態:currentAssignment
-
如果currentAssignment為空,則是全新的分配
-
構建出partition2AllPotentialConsumers和consumer2AllPotentialPartitions兩個輔助後續分配的資料結構
-
partition2AllPotentialConsumers是一個Map<TopicPartition, List<String>>,記錄著每個Partition可以分配給哪些Consumer
-
consumer2AllPotentialPartitions是一個Map<String, List<TopicPartition>>,記錄著每個Consumer可以分配的Partition列表
-
補全currentAssignment,將不屬於currentAssignment的Consumer新增進去(如果新增了一個Consumer,這個Consumer上一次是沒參與分配的,新新增進去分配的Partition列表為空)
-
構建出currentPartitionConsumer來用於輔助的分配,currentPartitionConsumer記錄了當前每個Partition分配給了哪個Consumer——就是把currentAssignment從Consumer作為Key轉換到Partition作為Key用於輔助分配
-
對所有分割槽進行排序(排序結果為sortedPartitions),排序有兩種規則:
-
如果不是初次分配,並且每個Consumer訂閱是相同的:
-
對Consumer按照它所分配的Partition數進行排序
-
按照上一步的排序結果,將每個Consumer分配的分割槽插入到List中(List就是排序後的分割槽)
-
將不屬於任何Consumer的分割槽加入List中
-
否則:分割槽之間按照可以被分配的Consumer的數量進行排序
-
構造unassignedPartitions記錄所有要被分配的分割槽(初始為上一步排序過的所有分割槽,後續進行調整:將已分配的,不需要移除了Partition從unassignedPartitions中移除)
-
進行分割槽調整,來達到分割槽分配均衡的目的;分割槽的Rebalance包含多個步驟
-
將上一步未分配的分割槽(unassignedPartitions)分配出去。分配的策略是:按照當前的分配結果,每一次分配時將分割槽分配給訂閱了對應Topic的Consumer列表中擁有的分割槽最少的那一個Consumer
-
校驗每一個分割槽是否需要調整,如果分割槽不需要調整,則從sortedPartitions中移除。分割槽是否可以被調整的規則是:如果這個分割槽是否在partition2AllPotentialConsumers中屬於兩個或超過兩個Consumer。
-
校驗每個Consumer是否需要調整被分配的分割槽,如果不能調整,則將這個Consumer從sortedCurrentSubscriptions中移除,不參與後續的重分配。判斷是否調整的規則是:如果當前Consumer分配的分割槽數少於它可以被分配的最大分割槽數,或者它的分割槽滿足上一條規則。
-
將以上步驟中獲取的可以進行重分配的分割槽,進行重新的分配。每次分配時都進行校驗,如果當前已經達到了均衡的狀態,則終止調整。均衡狀態的判斷依據是Consumer之間分配的分割槽數量的差值不超過1;或者所有Consumer已經拿到了它可以被分配的分割槽之後仍無法達到均衡的上一個條件(比如c1訂閱t1,c2訂閱t2,t1 t2分割槽數相差超過1,此時沒法重新調整)。如果不滿足上面兩個條件,且一個Consumer所分配的分割槽數少於同一個Topic的其他訂閱者分配到的所有分割槽的情況,那麼還可以繼續調整,屬於不滿足均衡的情況——比如上文中RoundRobinAssignor的最後一個例子。
-
後續流程和普通分配一致,就不分析了(Sticky模式會儲存分配結果)
StickyAssignor的分配演算法確實非常負責,筆者也是一步步Debug程式碼來分析整個過程的,希望上述分析的步驟對讀者能有一些幫助(建議對照著上面的步驟去Debug Kafka原始碼中的單元測試來梳理這塊內容)。
總結
本文主要介紹了Kafka的幾種分割槽分配策略:RangeAssignor、RoundRobinAssignor、StickyAssignor。其中重點分析了StickyAssignor的實現,StickyAssignor的模式能比RangeAssignor和RoundRobinAssignor提供更加均衡的分配結果,在發生Consumer或者Partition變更的情況下,也能減少不必要的分割槽調整。總體而言StickyAssignor是一種更好的分配演算法,只是實現上稍微有一些複雜。
寫在最後
螞蟻金服中介軟體團隊是服務於整個螞蟻金服集團的核心技術團隊,打造了世界領先的金融級分散式架構的基礎中介軟體平臺。
如果你有興趣加入螞蟻中介軟體團隊,歡迎聯絡我。
詳情諮詢微信:l_happytime
&n