1. 程式人生 > >kafka中的消費組

kafka中的消費組

ssi 都是 我只 mage scala語言 讀取 如何 n) 檢查

一直以來都想寫一點關於kafka consumer的東西,特別是關於新版consumer的中文資料很少。最近Kafka社區郵件組已經在討論是否應該正式使用新版本consumer替換老版本,筆者也覺得時機成熟了,於是寫下這篇文章討論並總結一下新版本consumer的些許設計理念,希望能把consumer這點事說清楚,從而對廣大使用者有所幫助。

在開始之前,我想花一點時間先來明確一些概念和術語,這會極大地方便我們下面的討論。另外請原諒這文章有點長,畢竟要討論的東西很多,雖然已然刪除了很多太過細節的東西。

一、 誤區澄清與概念明確

1 Kafka的版本

很多人在Kafka中國社區(替群主做個宣傳,QQ號:162272557)提問時的開頭經常是這樣的:“我使用的kafka版本是2.10/2.11, 現在碰到一個奇怪的問題。。。。” 無意冒犯,但這裏的2.10/2.11不是kafka的版本,而是編譯kafka的Scala版本。Kafka的server端代碼是由Scala語言編寫的,目前Scala主流的3個版本分別是2.10、2.11和2.12。實際上Kafka現在每個PULL request都已經自動增加了這三個版本的檢查。下圖是我的一個PULL request,可以看到這個fix會同時使用3個scala版本做編譯檢查:

技術分享圖片

目前廣泛使用kafka的版本應該是這三個大版本:0.8.x, 0.9.x和0.10.* 。 這三個版本對於consumer和consumer group來說都有很大的變化,我們後面會詳談。

2 新版本 VS 老版本

“我的kafkaoffsetmonitor為什麽無法監控到offset了?”——這是我在Kafka中國社區見到最多的問題,沒有之一!實際上,Kafka 0.9開始提供了新版本的consumer及consumer group,位移的管理與保存機制發生了很大的變化——新版本consumer默認將不再保存位移到zookeeper中,而目前kafkaoffsetmonitor還沒有應對這種變化(雖然已經有很多人在要求他們改了,詳見https://github.com/quantifind/KafkaOffsetMonitor/issues/79),所以很有可能是因為你使用了新版本的consumer才無法看到的。關於新舊版本,這裏統一說明一下:kafka0.9以前的consumer是使用Scala編寫的,包名結構是kafka.consumer.*,分為high-level consumer和low-level consumer兩種。我們熟知的ConsumerConnector、ZookeeperConsumerConnector以及SimpleConsumer就是這個版本提供的;自0.9版本開始,Kafka提供了java版本的consumer,包名結構是o.a.k.clients.consumer.*,熟知的類包括KafkaConsumer和ConsumerRecord等。新版本的consumer可以單獨部署,不再需要依賴server端的代碼。

二、消費者組 (Consumer Group)

1 什麽是消費者組

其實對於這些基本概念的普及,網上資料實在太多了。我本不應該再畫蛇添足了,但為了本文的完整性,我還是要花一些篇幅來重談consumer group,至少可以說說我的理解。值得一提的是,由於我們今天基本上只探討consumer group,對於單獨的消費者不做過多討論。

什麽是consumer group? 一言以蔽之,consumer group是kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那麽組內必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內的所有消費者協調在一起來消費訂閱主題(subscribed topics)的所有分區(partition)。當然,每個分區只能由同一個消費組內的一個consumer來消費。(網上文章中說到此處各種炫目多彩的圖就會緊跟著拋出來,我這裏就不畫了,請原諒)。個人認為,理解consumer group記住下面這三個特性就好了:

  • consumer group下可以有一個或多個consumer instance,consumer instance可以是一個進程,也可以是一個線程
  • group.id是一個字符串,唯一標識一個consumer group
  • consumer group下訂閱的topic下的每個分區只能分配給某個group下的一個consumer(當然該分區還可以被分配給其他group)

2 消費者位置(consumer position)

消費者在消費的過程中需要記錄自己消費了多少數據,即消費位置信息。在Kafka中這個位置信息有個專門的術語:位移(offset)。很多消息引擎都把這部分信息保存在服務器端(broker端)。這樣做的好處當然是實現簡單,但會有三個主要的問題:1. broker從此變成有狀態的,會影響伸縮性;2. 需要引入應答機制(acknowledgement)來確認消費成功。3. 由於要保存很多consumer的offset信息,必然引入復雜的數據結構,造成資源浪費。而Kafka選擇了不同的方式:每個consumer group保存自己的位移信息,那麽只需要簡單的一個整數表示位置就夠了;同時可以引入checkpoint機制定期持久化,簡化了應答機制的實現。

3 位移管理(offset management)

3.1 自動VS手動

Kafka默認是定期幫你自動提交位移的(enable.auto.commit = true),你當然可以選擇手動提交位移實現自己控制。另外kafka會定期把group消費情況保存起來,做成一個offset map,如下圖所示:

技術分享圖片

上圖中表明了test-group這個組當前的消費情況。

3.2 位移提交

老版本的位移是提交到zookeeper中的,圖就不畫了,總之目錄結構是:/consumers/<group.id>/offsets/<topic>/<partitionId>,但是zookeeper其實並不適合進行大批量的讀寫操作,尤其是寫操作。因此kafka提供了另一種解決方案:增加__consumeroffsets topic,將offset信息寫入這個topic,擺脫對zookeeper的依賴(指保存offset這件事情)。__consumer_offsets中的消息保存了每個consumer group某一時刻提交的offset信息。依然以上圖中的consumer group為例,格式大概如下:

技術分享圖片

__consumers_offsets topic配置了compact策略,使得它總是能夠保存最新的位移信息,既控制了該topic總體的日誌容量,也能實現保存最新offset的目的。compact的具體原理請參見:Log Compaction

至於每個group保存到__consumers_offsets的哪個分區,如何查看的問題請參見這篇文章:Kafka 如何讀取offset topic內容 (__consumer_offsets)

4 Rebalance

4.1 什麽是rebalance?

rebalance本質上是一種協議,規定了一個consumer group下的所有consumer如何達成一致來分配訂閱topic的每個分區。比如某個group下有20個consumer,它訂閱了一個具有100個分區的topic。正常情況下,Kafka平均會為每個consumer分配5個分區。這個分配的過程就叫rebalance。

4.2 什麽時候rebalance?

這也是經常被提及的一個問題。rebalance的觸發條件有三種:

  • 組成員發生變更(新consumer加入組、已有consumer主動離開組或已有consumer崩潰了——這兩者的區別後面會談到)
  • 訂閱主題數發生變更——這當然是可能的,如果你使用了正則表達式的方式進行訂閱,那麽新建匹配正則表達式的topic就會觸發rebalance
  • 訂閱主題的分區數發生變更

4.3 如何進行組內分區分配?

之前提到了group下的所有consumer都會協調在一起共同參與分配,這是如何完成的?Kafka新版本consumer默認提供了兩種分配策略:range和round-robin。當然Kafka采用了可插拔式的分配策略,你可以創建自己的分配器以實現不同的分配策略。實際上,由於目前range和round-robin兩種分配器都有一些弊端,Kafka社區已經提出第三種分配器來實現更加公平的分配策略,只是目前還在開發中。我們這裏只需要知道consumer group默認已經幫我們把訂閱topic的分區分配工作做好了就行了。

簡單舉個例子,假設目前某個consumer group下有兩個consumer: A和B,當第三個成員加入時,kafka會觸發rebalance並根據默認的分配策略重新為A、B和C分配分區,如下圖所示:

技術分享圖片

4.4 誰來執行rebalance和consumer group管理?

Kafka提供了一個角色:coordinator來執行對於consumer group的管理。坦率說kafka對於coordinator的設計與修改是一個很長的故事。最新版本的coordinator也與最初的設計有了很大的不同。這裏我只想提及兩次比較大的改變。

首先是0.8版本的coordinator,那時候的coordinator是依賴zookeeper來實現對於consumer group的管理的。Coordinator監聽zookeeper的/consumers/<group>/ids的子節點變化以及/brokers/topics/<topic>數據變化來判斷是否需要進行rebalance。group下的每個consumer都自己決定要消費哪些分區,並把自己的決定搶先在zookeeper中的/consumers/<group>/owners/<topic>/<partition>下註冊。很明顯,這種方案要依賴於zookeeper的幫助,而且每個consumer是單獨做決定的,沒有那種“大家屬於一個組,要協商做事情”的精神。

基於這些潛在的弊端,0.9版本的kafka改進了coordinator的設計,提出了group coordinator——每個consumer group都會被分配一個這樣的coordinator用於組管理和位移管理。這個group coordinator比原來承擔了更多的責任,比如組成員管理、位移提交保護機制等。當新版本consumer group的第一個consumer啟動的時候,它會去和kafka server確定誰是它們組的coordinator。之後該group內的所有成員都會和該coordinator進行協調通信。顯而易見,這種coordinator設計不再需要zookeeper了,性能上可以得到很大的提升。後面的所有部分我們都將討論最新版本的coordinator設計。

4.5 如何確定coordinator?

上面簡單討論了新版coordinator的設計,那麽consumer group如何確定自己的coordinator是誰呢? 簡單來說分為兩步:

  • 確定consumer group位移信息寫入__consumers_offsets的哪個分區。具體計算公式:
    •   __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 註意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。
  • 該分區leader所在的broker就是被選定的coordinator

4.6 Rebalance Generation

JVM GC的分代收集就是這個詞(嚴格來說是generational),我這裏把它翻譯成“屆”好了,它表示了rebalance之後的一屆成員,主要是用於保護consumer group,隔離無效offset提交的。比如上一屆的consumer成員是無法提交位移到新一屆的consumer group中。我們有時候可以看到ILLEGAL_GENERATION的錯誤,就是kafka在抱怨這件事情。每次group進行rebalance之後,generation號都會加1,表示group進入到了一個新的版本,如下圖所示: Generation 1時group有3個成員,隨後成員2退出組,coordinator觸發rebalance,consumer group進入Generation 2,之後成員4加入,再次觸發rebalance,group進入Generation 3.

技術分享圖片

4.7 協議(protocol)

前面說過了, rebalance本質上是一組協議。group與coordinator共同使用它來完成group的rebalance。目前kafka提供了5個協議來處理與consumer group coordination相關的問題:

  • Heartbeat請求:consumer需要定期給coordinator發送心跳來表明自己還活著
  • LeaveGroup請求:主動告訴coordinator我要離開consumer group
  • SyncGroup請求:group leader把分配方案告訴組內所有成員
  • JoinGroup請求:成員請求加入組
  • DescribeGroup請求:顯示組的所有信息,包括成員信息,協議名稱,分配方案,訂閱信息等。通常該請求是給管理員使用

Coordinator在rebalance的時候主要用到了前面4種請求。
4.8 liveness

consumer如何向coordinator證明自己還活著? 通過定時向coordinator發送Heartbeat請求。如果超過了設定的超時時間,那麽coordinator就認為這個consumer已經掛了。一旦coordinator認為某個consumer掛了,那麽它就會開啟新一輪rebalance,並且在當前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告訴其他consumer:不好意思各位,你們重新申請加入組吧!

4.9 Rebalance過程

終於說到consumer group執行rebalance的具體流程了。很多用戶估計對consumer內部的工作機制也很感興趣。下面就跟大家一起討論一下。當然我必須要明確表示,rebalance的前提是coordinator已經確定了。

總體而言,rebalance分為2步:Join和Sync

1 Join, 顧名思義就是加入組。這一步中,所有成員都向coordinator發送JoinGroup請求,請求入組。一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員信息以及訂閱信息發給leader——註意leader和coordinator不是一個概念。leader負責消費分配方案的制定。

2 Sync,這一步leader開始分配消費方案,即哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空。coordinator接收到分配方案之後會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的所有成員就都知道自己應該消費哪些分區了。

還是拿幾張圖來說明吧,首先是加入組的過程:

技術分享圖片

值得註意的是, 在coordinator收集到所有成員請求前,它會把已收到請求放入一個叫purgatory(煉獄)的地方。記得國內有篇文章以此來證明kafka開發人員都是很有文藝範的,寫得也是比較有趣,有興趣可以去搜搜。
然後是分發分配方案的過程,即SyncGroup請求:

技術分享圖片

註意!! consumer group的分區分配方案是在客戶端執行的!Kafka將這個權利下放給客戶端主要是因為這樣做可以有更好的靈活性。比如這種機制下我可以實現類似於Hadoop那樣的機架感知(rack-aware)分配方案,即為consumer挑選同一個機架下的分區數據,減少網絡傳輸的開銷。Kafka默認為你提供了兩種分配策略:range和round-robin。由於這不是本文的重點,這裏就不再詳細展開了,你只需要記住你可以覆蓋consumer的參數:partition.assignment.strategy來實現自己分配策略就好了。

4.10 consumer group狀態機

和很多kafka組件一樣,group也做了個狀態機來表明組狀態的流轉。coordinator根據這個狀態機會對consumer group做不同的處理,如下圖所示(完全是根據代碼註釋手動畫的,多見諒吧)

技術分享圖片

簡單說明下圖中的各個狀態:

  • Dead:組內已經沒有任何成員的最終狀態,組的元數據也已經被coordinator移除了。這種狀態響應各種請求都是一個response: UNKNOWN_MEMBER_ID
  • Empty:組內無成員,但是位移信息還沒有過期。這種狀態只能響應JoinGroup請求
  • PreparingRebalance:組準備開啟新的rebalance,等待成員加入
  • AwaitingSync:正在等待leader consumer將分配方案傳給各個成員
  • Stable:rebalance完成!可以開始消費了~

至於各個狀態之間的流程條件以及action,這裏就不具體展開了。

三、rebalance場景剖析

上面詳細闡述了consumer group是如何執行rebalance的,可能依然有些雲裏霧裏。這部分對其中的三個重要的場景做詳盡的時序展開,進一步加深對於consumer group內部原理的理解。由於圖比較直觀,所有的描述都將以圖的方式給出,不做過多的文字化描述了。

1 新成員加入組(member join)

技術分享圖片

2 組成員崩潰(member failure)

前面說過了,組成員崩潰和組成員主動離開是兩個不同的場景。因為在崩潰時成員並不會主動地告知coordinator此事,coordinator有可能需要一個完整的session.timeout周期才能檢測到這種崩潰,這必然會造成consumer的滯後。可以說離開組是主動地發起rebalance;而崩潰則是被動地發起rebalance。okay,直接上圖:

技術分享圖片

3 組成員主動離組(member leave group)

技術分享圖片

4 提交位移(member commit offset)

技術分享圖片

轉載自:http://www.cnblogs.com/huxi2b/p/6223228.html

kafka中的消費組