1. 程式人生 > >kafka消費者連線topic分割槽失敗造成訊息大量堆積

kafka消費者連線topic分割槽失敗造成訊息大量堆積

晚上7點收到topic堆積告警,經檢查,發現消費者到topic分割槽斷連,分割槽覆蓋率下降為0,由於業務TPS高,所以幾分鐘內即形成上千萬條訊息堆積,業務成功率下降明顯,第一時間懷疑晚上高峰期業務量大,頻寬消耗大,網路不穩定造成的,所以第一時間增加消費方的超時時間(socket.timeout.ms)並重啟,消費者隨即連線成功,重新開始消費,堆積逐漸減小,業務逐漸恢復。

但一週不到再次出現同樣的問題,分割槽覆蓋降到0,可是上次增加消費超時時間之後並未還原,所以再次出現之後覺得很奇怪。

檢視消費方日誌,發現幾分鐘內都沒有列印新日誌,於是,將日誌級別調整為debug,發現是獲取之前的消費位置offset失敗:Could not fetch offsets (because offset cache is being loaded).結合consumer啟動流程來看:consumer初始化->連線負載均衡,給連線分配要繫結的分割槽->連線訪問kafka叢集的Controller獲取上次消費的位置(當前就是這裡有問題,一直獲取不到)->從上次消費的位置開始向後消費

報錯內容:because offset cache is being loaded,是說kafka正在載入offset資訊,kafka維護offset資料,是自己建立了一個內部的_consumer_offset的Topic,該topic儲存消費者的消費進度,當前業務消費者的offset是優先提交到kafka的,同時也提交到zookeeper,所以會優先從kafka讀取,只有當kafka沒有的時候才會嘗試去zookeeper讀取,但是當前kafka並非沒有,只是正在載入,可見_consumer_offset佇列長期處於載入中,正是問題所在。

問題分析到這裡,暫且不展開分析_consumer_offset被重新載入的原因,現網問題的處理原則在任何時候都是恢復業務放在首位,所以我們採取修改消費者配置,提交方式修改為優先提交到zookeeper,同時提交到kafka,這樣消費者也會優先從zookeeper讀取offset,修改後重啟,業務逐漸恢復,至此消費者的啟動流程已經修改為:consumer初始化->連線負載均衡,給連線分配要繫結的分割槽->連線直接訪問zookeeper叢集獲取上次消費的位置->從上次消費的位置開始向後消費

。(修改消費提交方式為在消費者的配置檔案中新增offsets.storage=zookeeper)

業務恢復後,再來看_consumer_offset為何被重新載入,進一步排查消費端和kafka端的日誌檔案,發現連續出現2次zookeeper session超時(目前kafka以zookeeper來做叢集管理與節點發現),預設超時時間為zookeeper.session.timout.ms=6000ms,錯誤日誌如下:

kafka的controller.log:

ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)

Closing socket connection to XXXX(業務主機)

re-registering broker info in ZK for broker 1

Registered broker 1 at path /brokers/ids/1 with address XXXX:9092(kafka主機)

done re-registering broker

Subscribing to /brokers/topics path to watch for new topics

.

.

New leader is 2

斷連兩次,kafka重新加入叢集,可以認為kafka重啟了兩次,每次都需要重新載入topic _consuer_offset:

第一次斷連,offset資料最長載入時間為200000+ms:

FInishing loading offsets from [_consuer_offsets,14] in 200000 milliseconds;

第二次斷連,offset資料最長載入時間達到了6000000+ms:

FInishing loading offsets from [_consuer_offsets,14] in 6000000 milliseconds;

分析佇列載入耗時原因:_consuer_offsets分割槽資料巨大,84億條,且分佈不均,分割槽49有34億條資料,這與topicID和groupid命名有關,不展開討論,造成topic分割槽資料巨大的原因是為儘可能保證訊息不丟失,該topic的清除策略配置為壓縮不刪除cleanup.policy=compact.

綜上,改進建議如下:

1、修改__consumer_offset的cleanup.policy=delete,保留時間為15天,減少topic儲存的資料量,減少kafka載入壓力;

2、增加zookeeper.session.timout.ms超時時間,配置為30s,減少由於網路抖動或者磁碟cpu、網絡卡IO影響導致kafka與zookeeper斷連

在問題已修復,已定位之後,進一步回頭來看最根本原因,kafka與zk之間為何斷連,檢視主機zabbbix歷史資料來看,在斷連發生時,cpu的iowait是異常高的,由此可以看出來是由於kafka主機瞬時磁碟IO導致的kafka與zk的心跳阻塞,所以除了以上兩點改進之外,還需要進一步分析kafka主機的效能瓶頸,進行效能優化或者叢集橫向擴容。