1. 程式人生 > >線上Kafka突發rebalance異常,如何快速解決?

線上Kafka突發rebalance異常,如何快速解決?

# 文章首發於[【陳樹義的部落格】](https://www.cnblogs.com/chanshuyi),點選跳轉到原文[《線上Kafka突發rebalance異常,如何快速解決?》](https://www.cnblogs.com/chanshuyi/p/kafka_rebalance_quick_guide.html) Kafka 是我們最常用的訊息佇列,它那幾萬、甚至幾十萬的處理速度讓我們為之欣喜若狂。但是隨著使用場景的增加,我們遇到的問題也越來越多,其中一個經常遇到的問題就是:rebalance(重平衡)問題。 ## 什麼是消費組 要想了解 rebalance,那就得先了解消費組(consumer group)。 消費組指的是多個消費者(consumer)組成起來的一個組,它們共同消費 topic 的所有訊息,並且一個 topic 的一個 partition 只能被一個 consumer 消費。 Kafka 為消費者組定義了 5 種狀態,它們分別是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。 ![](https://www-shuyi-me.oss-cn-shenzhen.aliyuncs.com/15892444057803.jpg) 瞭解了這些狀態的含義之後,我們來看一張圖片,它展示了狀態機的各個狀態流轉。 ![](https://www-shuyi-me.oss-cn-shenzhen.aliyuncs.com/15892444207503.jpg) 一個消費者組最開始是 Empty 狀態,當重平衡過程開啟後,它會被置於 PreparingRebalance 狀態等待成員加入,之後變更到 CompletingRebalance 狀態等待分配方案,最後流轉到 Stable 狀態完成重平衡。 當有新成員加入或已有成員退出時,消費者組的狀態從 Stable 直接跳到 PreparingRebalance 狀態,此時,所有現存成員就必須重新申請加入組。當所有成員都退出組後,消費者組狀態變更為 Empty。Kafka 定期自動刪除過期位移的條件就是,組要處於 Empty 狀態。因此,如果你的消費者組停掉了很長時間(超過 7 天),那麼 Kafka 很可能就把該組的位移資料刪除了。我相信,你在 Kafka 的日誌中一定經常看到下面這個輸出: ``` Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds. ``` 這就是 Kafka 在嘗試定期刪除過期位移。現在你知道了,只有 Empty 狀態下的組,才會執行過期位移刪除的操作。 ## 什麼是rebalance? 我們都知道 kafka 主要可以分為三大塊:生產者、kafka broker、消費者。 ![](https://www-shuyi-me.oss-cn-shenzhen.aliyuncs.com/15891592181787.jpg) 而 kafka 怎麼均勻地分配某個 topic 下的所有 partition 到各個消費者,從而使得訊息的消費速度達到最快,這就是平衡(balance)。**而 rebalance(重平衡)其實就是重新進行 partition 的分配,從而使得 partition 的分配重新達到平衡狀態。** # 文章首發於[【陳樹義的部落格】](https://www.cnblogs.com/chanshuyi),點選跳轉到原文[《線上Kafka突發rebalance異常,如何快速解決?》](https://www.cnblogs.com/chanshuyi/p/kafka_rebalance_quick_guide.html) ## rebalance的流程 重平衡的完整流程需要消費者端和協調者元件共同參與才能完成。我們先從消費者的視角來審視一下重平衡的流程。 在消費者端,重平衡分為兩個步驟:分別是加入組和等待領導消費者(Leader Consumer)分配方案。這兩個步驟分別對應兩類特定的請求:**JoinGroup 請求和 SyncGroup 請求。** ### JoinGroup請求 當組內成員加入組時,它會向協調者傳送 JoinGroup 請求。在該請求中,每個成員都要將自己訂閱的主題上報,這樣協調者就能收集到所有成員的訂閱資訊。一旦收集了全部成員的 JoinGroup 請求後,協調者會從這些成員中選擇一個擔任這個消費者組的領導者。 通常情況下,第一個傳送 JoinGroup 請求的成員自動成為領導者。你一定要注意區分這裡的領導者和之前我們介紹的領導者副本,它們不是一個概念。這裡的領導者是具體的消費者例項,它既不是副本,也不是協調者。這裡的領導者指的是消費組(consumer group)的領導者,**消費組領導者的任務是收集所有成員的訂閱資訊,然後根據這些資訊,制定具體的分割槽消費分配方案。** 選出領導者之後,協調者會把消費者組訂閱資訊封裝進 JoinGroup 請求的響應體中,然後發給領導者,由領導者統一做出分配方案後,進入到下一步:傳送 SyncGroup 請求。 ### SyncGroup請求 在這一步中,領導者向協調者傳送 SyncGroup 請求,將剛剛做出的分配方案發給協調者。值得注意的是,其他成員也會向協調者傳送 SyncGroup 請求,只不過請求體中並沒有實際的內容。這一步的主要目的是讓協調者接收分配方案,然後統一以 SyncGroup 響應的方式分發給所有成員,這樣組內所有成員就都知道自己該消費哪些分割槽了。 接下來,我用一張圖來形象地說明一下 JoinGroup 請求的處理過程。 ![](https://www-shuyi-me.oss-cn-shenzhen.aliyuncs.com/15892437016869.jpg) 就像前面說的,JoinGroup 請求的主要作用是將組成員訂閱資訊傳送給領導者消費者,待領導者制定好分配方案後,重平衡流程進入到 SyncGroup 請求階段。 下面這張圖描述的是 SyncGroup 請求的處理流程。 ![](https://www-shuyi-me.oss-cn-shenzhen.aliyuncs.com/15892437578771.jpg) SyncGroup 請求的主要目的,就是讓協調者把領導者制定的分配方案下發給各個組內成員。當所有成員都成功接收到分配方案後,消費者組進入到 Stable 狀態,即開始正常的消費工作。 ## 什麼時候會發生rebalance? 前面我們已經說到,rebalance 其實就是對 partition 進行重新分配。那麼什麼時候會發生 rebalance 呢?其實在以下三種情況下,會觸發 rebalance: * 訂閱 Topic 的分割槽數發生變化。 * 訂閱的 Topic 個數發生變化。 * 消費組內成員個數發生變化。例如有新的 consumer 例項加入該消費組或者離開組。 ### 訂閱Topic的分割槽數發生變化 簡單地說,就是之前 topic 有 10 個分割槽,現在變成了 20 個,那麼多出來的 10 個分割槽的資料就沒人消費了。那麼此時就需要進行重平衡,將新增的 10 個分割槽分給消費組內的消費者進行消費。所以在這個情況下,會發生重平衡。 ### 訂閱的Topic個數發生變化 簡單地說,一個 consumer group 如果之前只訂閱了 A topic,那麼其組內的 consumer 知會消費 A topic 的訊息。而如果現在新增訂閱了 B topic,那麼 kafka 就需要把 B topic 的 partition 分配給組內的 consumer 進行消費。這個分配的過程,其實也是一個 rebalance 的過程。 ### 消費組內成員個數發生變化 我們都知道 kafka 中是以消費組(consumer group)的方式進行消費的,消費組內的消費者共同消費一個 topic 下的訊息。而當消費組內成員個數發生變化,例如某個 consumer 離開,或者新 consumer 加入,都會導致消費組內成員個數發生變化,從而導致重平衡。 **相比起之前的兩個情況,這種情況在實際情況中更加常見。因為訂閱分割槽數、以及訂閱 topic 數都是我們主動改變才會發生,而組內消費組成員個數發生變化,則是更加隨機的。** # 文章首發於[【陳樹義的部落格】](https://www.cnblogs.com/chanshuyi),點選跳轉到原文[《線上Kafka突發rebalance異常,如何快速解決?》](https://www.cnblogs.com/chanshuyi/p/kafka_rebalance_quick_guide.html) 下面我們一起分析一下「消費組內成員個數發生變化」的幾種情況: * 新成員加入 * 組成員主動離開 * 組成員崩潰 #### 新成員加入 新成員入組是指組處於 Stable 狀態後,有新成員加入。如果是全新啟動一個消費者組,Kafka 是有一些自己的小優化的,流程上會有些許的不同。我們這裡討論的是,組穩定了之後有新成員加入的情形。 當協調者收到新的 JoinGroup 請求後,它會通過心跳請求響應的方式通知組內現有的所有成員,強制它們開啟新一輪的重平衡。具體的過程和之前的客戶端重平衡流程是一樣的。現在,我用一張時序圖來說明協調者一端是如何處理新成員入組的。 ![](https://www-shuyi-me.oss-cn-shenzhen.aliyuncs.com/15892432564564.jpg) #### 組成員主動離開 何謂主動離組?就是指消費者例項所線上程或程序呼叫 close() 方法主動通知協調者它要退出。這個場景就涉及到了第三類請求:LeaveGroup 請求。協調者收到 LeaveGroup 請求後,依然會以心跳響應的方式通知其他成員,因此我就不再贅述了,還是直接用一張圖來說明。 ![](https://www-shuyi-me.oss-cn-shenzhen.aliyuncs.com/15892438535570.jpg) #### 組成員崩潰 崩潰離組是指消費者例項出現嚴重故障,突然宕機導致的離組。它和主動離組是有區別的,因為後者是主動發起的離組,協調者能馬上感知並處理。但崩潰離組是被動的,協調者通常需要等待一段時間才能感知到,這段時間一般是由消費者端引數 session.timeout.ms 控制的。 也就是說,Kafka 一般不會超過 session.timeout.ms 就能感知到這個崩潰。當然,後面處理崩潰離組的流程與之前是一樣的,我們來看看下面這張圖。 ![](https://www-shuyi-me.oss-cn-shenzhen.aliyuncs.com/15892439595340.jpg) #### 疑惑 在許多文章中,它們會加多了一個 rebalance 場景,即:「重平衡時協調者對組內成員提交位移的處理」。其實這個要說是 rebalance 場景,有點牽強。我們先來了解下這個場景究竟是什麼情況。 正常情況下,每個組內成員都會定期彙報位移給協調者。當重平衡開啟時,協調者會給予成員一段緩衝時間,要求每個成員必須在這段時間內快速地上報自己的位移資訊,然後再開啟正常的 JoinGroup/SyncGroup 請求傳送。還是老辦法,我們使用一張圖來說明。 ![](https://www-shuyi-me.oss-cn-shenzhen.aliyuncs.com/15892440922259.jpg) 所以這種場景是指 rebalance 發生之時,留有時間給消費者提交 offset,並不是引起 rebalance 的觸發原因(並不是因為提交 offset 引發 rebalance)。因此在我這篇文章裡,我並沒有將其作為 rebalance 的一種場景。 ## rebalance問題處理思路 前面我們講過 rebalance 一般會有 3 種情況,分別是: * 新成員加入 * 組成員主動離開 * 組成員崩潰 對於「新成員加入」、「組成員主動離開」都是我們主動觸發的,能比較好地控制。但是「組成員崩潰」則是我們預料不到的,遇到問題的時候也比較不好排查。但對於「組成員崩潰」也是有一些通用的排查思路的,下面我們就來聊聊「rebalance問題的處理思路」。 要學會處理 rebalance 問題,我們需要先搞清楚 kafaka 消費者配置的四個引數: * session.timeout.ms 設定了超時時間 * heartbeat.interval.ms 心跳時間間隔 * max.poll.interval.ms 每次消費的處理時間 * max.poll.records 每次消費的訊息數 **session.timeout.ms** 表示 consumer 向 broker 傳送心跳的超時時間。例如 session.timeout.ms = 180000 表示在最長 180 秒內 broker 沒收到 consumer 的心跳,那麼 broker 就認為該 consumer 死亡了,會啟動 rebalance。 **heartbeat.interval.ms** 表示 consumer 每次向 broker 傳送心跳的時間間隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 傳送一次心跳。一般來說,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上。 **max.poll.interval.ms** 表示 consumer 每兩次 poll 訊息的時間間隔。簡單地說,其實就是 consumer 每次消費訊息的時長。如果訊息處理的邏輯很重,那麼市場就要相應延長。否則如果時間到了 consumer 還麼消費完,broker 會預設認為 consumer 死了,發起 rebalance。 **max.poll.records** 表示每次消費的時候,獲取多少條訊息。獲取的訊息條數越多,需要處理的時間越長。所以每次拉取的訊息數不能太多,需要保證在 max.poll.interval.ms 設定的時間內能消費完,否則會發生 rebalance。 簡單來說,會導致崩潰的幾個點是: * 消費者心跳超時,導致 rebalance。 * 消費者處理時間過長,導致 rebalance。 ### 消費者心跳超時 我們知道消費者是通過心跳和協調者保持通訊的,如果協調者收不到心跳,那麼協調者會認為這個消費者死亡了,從而發起 rebalance。 而 kafka 的消費者引數設定中,跟心跳相關的兩個引數為: * session.timeout.ms 設定了超時時間 * heartbeat.interval.ms 心跳時間間隔 這時候需要調整 session.timeout.ms 和 heartbeat.interval.ms 引數,使得消費者與協調者能保持心跳。一般來說,超時時間應該是心跳間隔的 3 倍時間。即 session.timeout.ms 如果設定為 180 秒,那麼 heartbeat.interval.ms 最多設定為 60 秒。 為什麼要這麼設定超時時間應該是心跳間隔的 3 倍時間?因為這樣的話,在一個超時週期內就可以有多次心跳,避免網路問題導致偶發失敗。 ### 消費者處理時間過長 如果消費者處理時間過長,那麼同樣會導致協調者認為該 consumer 死亡了,從而發起重平衡。 而 kafka 的消費者引數設定中,跟消費處理的兩個引數為: * max.poll.interval.ms 每次消費的處理時間 * max.poll.records 每次消費的訊息數 對於這種情況,**一般來說就是增加消費者處理的時間(即提高 max.poll.interval.ms 的值),減少每次處理的訊息數(即減少 max.poll.records 的值)。** 除此之外,超時時間引數(session.timeout.ms)與 消費者每次處理的時間(max.poll.interval.ms)也是有關聯的。**max.poll.interval.ms 時間不能超過 session.timeout.ms 時間。** 因為在 kafka 消費者的實現中,其是單執行緒去消費訊息和執行心跳的,如果執行緒卡在處理訊息,那麼這時候即使到時間要心跳了,還是沒有執行緒可以去執行心跳操作。很多同學在處理問題的時候,明明設定了很長的 session.timeout.ms 時間,但最終還是心跳超時了,就是因為沒有處理好這兩個引數的關聯。 對於 rebalance 類問題,簡單總結就是:**處理好心跳超時問題和消費處理超時問題**。 * 對於心跳超時問題。一般是調高心跳超時時間(session.timeout.ms),調整超時時間(session.timeout.ms)和心跳間隔時間(heartbeat.interval.ms)的比例。阿里雲官方文件建議超時時間(session.timeout.ms)設定成 25s,最長不超過 30s。那麼心跳間隔時間(heartbeat.interval.ms)就不超過 10s。 * 對於消費處理超時問題。一般是增加消費者處理的時間(max.poll.interval.ms),減少每次處理的訊息數(max.poll.records)。阿里雲官方文件建議 max.poll.records 引數要遠小於當前消費組的消費能力(records < 單個執行緒每秒消費的條數 x 消費執行緒的個數 x session.timeout的秒數)。 # 文章首發於[【陳樹義的部落格】](https://www.cnblogs.com/chanshuyi),點選跳轉到原文[《線上Kafka突發rebalance異常,如何快速解決?》](https://www.cnblogs.com/chanshuyi/p/kafka_rebalance_quick_guide.html) ## 參考資料 * [重平衡場景,寫得更好,更詳細!推薦!!Kafka | 消費者組重平衡全流程解析_大資料_sinat_27143551的部落格-CSDN部落格](https://blog.csdn.net/sinat_27143551/article/details/103033628) * [Kafka 重平衡機制 - 後端進階 - SegmentFault 思否](https://segmentfault.com/a/1190000020873848) * [為什麼消費客戶端頻繁出現Rebalance?_客戶端消費問題_常見問題_訊息佇列Kafka版-阿里雲](https://help.aliyun.com/knowledge_detail/1544