1. 程式人生 > 其它 >Kafka Rebalance機制分析

Kafka Rebalance機制分析

什麼是 Rebalance

Rebalance 本質上是一種協議,規定了一個 Consumer Group 下的所有 consumer 如何達成一致,來分配訂閱 Topic 的每個分割槽。

例如:某 Group 下有 20 個 consumer 例項,它訂閱了一個具有 100 個 partition 的 Topic 。正常情況下,kafka 會為每個 Consumer 平均的分配 5 個分割槽。這個分配的過程就是 Rebalance。

觸發 Rebalance 的時機

Rebalance 的觸發條件有3個。

  • 組成員個數發生變化。例如有新的 consumer 例項加入該消費組或者離開組。
  • 訂閱的 Topic 個數發生變化。
  • 訂閱 Topic 的分割槽數發生變化。

Rebalance 發生時,Group 下所有 consumer 例項都會協調在一起共同參與,kafka 能夠保證儘量達到最公平的分配。但是 Rebalance 過程對 consumer group 會造成比較嚴重的影響。在 Rebalance 的過程中 consumer group 下的所有消費者例項都會停止工作,等待 Rebalance 過程完成。

Rebalance 過程分析

Rebalance 過程分為兩步:Join 和 Sync。

  1. Join 顧名思義就是加入組。這一步中,所有成員都向coordinator傳送JoinGroup請求,請求加入消費組。一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員資訊以及訂閱資訊發給leader——注意leader和coordinator不是一個概念。leader負責消費分配方案的制定。
  1. Sync,這一步leader開始分配消費方案,即哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空。coordinator接收到分配方案之後會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的所有成員就都知道自己應該消費哪些分割槽了。

Rebalance 場景分析

新成員加入組

組成員“崩潰”

組成員崩潰和組成員主動離開是兩個不同的場景。因為在崩潰時成員並不會主動地告知coordinator此事,coordinator有可能需要一個完整的session.timeout週期(心跳週期)才能檢測到這種崩潰,這必然會造成consumer的滯後。可以說離開組是主動地發起rebalance;而崩潰則是被動地發起rebalance。

組成員主動離開組

提交位移

如何避免不必要的rebalance

要避免 Rebalance,還是要從 Rebalance 發生的時機入手。我們在前面說過,Rebalance 發生的時機有三個:

  • 組成員數量發生變化
  • 訂閱主題數量發生變化
  • 訂閱主題的分割槽數發生變化

後兩個我們大可以人為的避免,發生rebalance最常見的原因是消費組成員的變化。

消費者成員正常的新增和停掉導致rebalance,這種情況無法避免,但是時在某些情況下,Consumer 例項會被 Coordinator 錯誤地認為 “已停止” 從而被“踢出”Group。從而導致rebalance。

當 Consumer Group 完成 Rebalance 之後,每個 Consumer 例項都會定期地向 Coordinator 傳送心跳請求,表明它還存活著。如果某個 Consumer 例項不能及時地傳送這些心跳請求,Coordinator 就會認為該 Consumer 已經 “死” 了,從而將其從 Group 中移除,然後開啟新一輪 Rebalance。這個時間可以通過Consumer 端的引數 session.timeout.ms進行配置。預設值是 10 秒。

除了這個引數,Consumer 還提供了一個控制傳送心跳請求頻率的引數,就是 heartbeat.interval.ms。這個值設定得越小,Consumer 例項傳送心跳請求的頻率就越高。頻繁地傳送心跳請求會額外消耗頻寬資源,但好處是能夠更加快速地知曉當前是否開啟 Rebalance,因為,目前 Coordinator 通知各個 Consumer 例項開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標誌封裝進心跳請求的響應體中。

除了以上兩個引數,Consumer 端還有一個引數,用於控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms 引數。它限定了 Consumer 端應用程式兩次呼叫 poll 方法的最大時間間隔。它的預設值是 5 分鐘,表示你的 Consumer 程式如果在 5 分鐘之內無法消費完 poll 方法返回的訊息,那麼 Consumer 會主動發起 “離開組” 的請求,Coordinator 也會開啟新一輪 Rebalance。

通過上面的分析,我們可以看一下那些rebalance是可以避免的:

第一類非必要 Rebalance 是因為未能及時傳送心跳,導致 Consumer 被 “踢出”Group 而引發的。這種情況下我們可以設定 session.timeout.ms 和 heartbeat.interval.ms 的值,來儘量避免rebalance的出現。(以下的配置是在網上找到的最佳實踐,暫時還沒測試過

  • 設定 session.timeout.ms = 6s。
  • 設定 heartbeat.interval.ms = 2s。
  • 要保證 Consumer 例項在被判定為 “dead” 之前,能夠傳送至少 3 輪的心跳請求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

將 session.timeout.ms 設定成 6s 主要是為了讓 Coordinator 能夠更快地定位已經掛掉的 Consumer,早日把它們踢出 Group。

第二類非必要 Rebalance 是 Consumer 消費時間過長導致的。此時,max.poll.interval.ms 引數值的設定顯得尤為關鍵。如果要避免非預期的 Rebalance,你最好將該引數值設定得大一點,比你的下游最大處理時間稍長一點。

總之,要為業務處理邏輯留下充足的時間。這樣,Consumer 就不會因為處理這些訊息的時間太長而引發 Rebalance 。

相關概念

coordinator

Group Coordinator是一個服務,每個Broker在啟動的時候都會啟動一個該服務。Group Coordinator的作用是用來儲存Group的相關Meta資訊,並將對應Partition的Offset資訊記錄到Kafka內建Topic(__consumer_offsets)中。Kafka在0.9之前是基於Zookeeper來儲存Partition的Offset資訊(consumers/{group}/offsets/{topic}/{partition}),因為ZK並不適用於頻繁的寫操作,所以在0.9之後通過內建Topic的方式來記錄對應Partition的Offset。

每個Group都會選擇一個Coordinator來完成自己組內各Partition的Offset資訊,選擇的規則如下:

  • 1,計算Group對應在__consumer_offsets上的Partition
  • 2,根據對應的Partition尋找該Partition的leader所對應的Broker,該Broker上的Group Coordinator即就是該Group的Coordinator

Partition計算規則:

複製程式碼
  • 1
language-sql
partition-Id(__consumer_offsets) = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)

其中groupMetadataTopicPartitionCount對應offsets.topic.num.partitions引數值,預設值是50個分割槽

一次Rebalance所耗時間

測試環境

1個Topic,10個partition,3個consumer

在本地環境進行測試

測試結果

經過幾輪測試發現每次rebalance所消耗的時間大概在 80ms~100ms平均耗時在87ms左右。