1. 程式人生 > 其它 >深入解析Kafka的offset管理

深入解析Kafka的offset管理

Kafka中的每個partition都由一系列有序的、不可變的訊息組成,這些訊息被連續的追加到partition中。partition中的每個訊息都有一個連續的序號,用於partition唯一標識一條訊息。

Offset記錄著下一條將要傳送給Consumer的訊息的序號。

Offset從語義上來看擁有兩種:Current Offset 和 Committed Offset。

Current Offset

Current Offset儲存在Consumer客戶端中,它表示Consumer希望收到的下一條訊息的序號。它僅僅在poll()方法中使用。例如,Consumer第一次呼叫poll()方法後收到了20條訊息,那麼Current Offset就被設定為20。這樣Consumer下一次呼叫poll()方法時,Kafka就知道應該從序號為21的訊息開始讀取。這樣就能夠保證每次Consumer poll訊息時,都能夠收到不重複的訊息。

Committed Offset

Committed Offset儲存在Broker上,它表示Consumer已經確認消費過的訊息的序號。主要通過commitSynccommitAsync
API來操作。舉個例子,Consumer通過poll() 方法收到20條訊息後,此時Current Offset就是20,經過一系列的邏輯處理後,並沒有呼叫consumer.commitAsync()consumer.commitSync()來提交Committed Offset,那麼此時Committed Offset依舊是0。

Committed Offset主要用於Consumer Rebalance。

大資料培訓在Consumer Rebalance的過程中,一個partition被分配給了一個Consumer,那麼這個Consumer該從什麼位置開始消費訊息呢?答案就是Committed Offset。另外,如果一個Consumer消費了5條訊息(poll並且成功commitSync)之後宕機了,重新啟動之後它仍然能夠從第6條訊息開始消費,因為Committed Offset已經被Kafka記錄為5。

總結一下,Current Offset是針對Consumer的poll過程的,它可以保證每次poll都返回不重複的訊息;而Committed Offset是用於Consumer Rebalance過程的,它能夠保證新的Consumer能夠從正確的位置開始消費一個partition,從而避免重複消費。

在Kafka 0.9前,Committed Offset資訊儲存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目錄中(zookeeper其實並不適合進行大批量的讀寫操作,尤其是寫操作)。而在0.9之後,所有的offset資訊都儲存在了Broker上的一個名為__consumer_offsets的topic中。

Kafka叢集中offset的管理都是由Group Coordinator中的Offset Manager完成的。

Group Coordinator

Group Coordinator是執行在Kafka叢集中每一個Broker內的一個程序。它主要負責Consumer Group的管理,Offset位移管理以及Consumer Rebalance。

對於每一個Consumer Group,Group Coordinator都會儲存以下資訊:

  • 訂閱的topics列表

  • Consumer Group配置資訊,包括session timeout等

  • 組中每個Consumer的元資料。包括主機名,consumer id

  • 每個Group正在消費的topic partition的當前offsets

  • Partition的ownership元資料,包括consumer消費的partitions對映關係

Consumer Group如何確定自己的coordinator是誰呢?簡單來說分為兩步:

1. 確定Consumer Group offset資訊將要寫入__consumers_offsets topic的哪個分割槽。具體計算公式:

__consumers_offsets partition#= Math.abs(groupId.hashCode()% offsets.topic.num.partitions)//offsets.topic.num.partitions預設值為50。

2. 該分割槽leader所在的broker就是被選定的coordinator

Offset儲存模型

由於一個partition只能固定的交給一個消費者組中的一個消費者消費,因此Kafka儲存offset時並不直接為每個消費者儲存,而是以groupid-topic-partition -> offset的方式儲存。如圖所示:

group-offset.png

Kafka在儲存Offset的時候,實際上是將Consumer Group和partition對應的offset以訊息的方式儲存在__consumers_offsets這個topic中

__consumers_offsets預設擁有50個partition,可以通過

Math.abs(groupId.hashCode() % offsets.topic.num.partitions)

的方式來查詢某個Consumer Group的offset資訊儲存在__consumers_offsets的哪個partition中。下圖展示了__consumers_offsets中儲存的offset訊息的格式:

__consumers_offsets.png

__consumers_offsets_data.png

如圖所示,一條offset訊息的格式為groupid-topic-partition -> offset。因此consumer poll訊息時,已知groupid和topic,北京大資料培訓又通過Coordinator分配partition的方式獲得了對應的partition,自然能夠通過Coordinator查詢__consumers_offsets的方式獲得最新的offset了。

Offset查詢

前面我們已經描述過offset的儲存模型,它是按照groupid-topic-partition -> offset的方式儲存的。然而Kafka只提供了根據offset讀取訊息的模型,並不支援根據key讀取訊息的方式。那麼Kafka是如何支援Offset的查詢呢?

答案就是Offsets Cache!!

Offsets Cache.JPG

如圖所示,Consumer提交offset時,Kafka Offset Manager會首先追加一條條新的commit訊息到__consumers_offsets topic中,然後更新對應的快取。讀取offset時從快取中讀取,而不是直接讀取__consumers_offsets這個topic。

Log Compaction

我們已經知道,Kafka使用groupid-topic-partition -> offset*的訊息格式,將Offset資訊儲存在__consumers_offsets topic中。請看下面一個例子:

__consumers_offsets.JPG

如圖,對於audit-consumer這個Consumer Group來說,上面的儲存了兩條具有相同key的記錄:PageViewEvent-0 -> 240PageViewEvent-0 -> 323。事實上,這就是一種無用的冗餘。因為對於一個partition來說,我們實際上只需要它當前最新的Offsets。因此這條舊的PageViewEvent-0 -> 240記錄事實上是無用的。

為了消除這樣的過期資料,Kafka為__consumers_offsets topic設定了Log Compaction功能。Log Compaction意味著對於有相同key的的不同value值,只保留最後一個版本。如果應用只關心key對應的最新value值,可以開啟Kafka的Log Compaction功能,Kafka會定期將相同key的訊息進行合併,只保留最新的value值。

這張圖片生動的闡述了Log Compaction的過程:

Log Compaction.JPG

下圖闡釋了__consumers_offsets topic中的資料在Log Compaction下的變化:

Log Compaction for __consumers_offsets.JPG

在新建topic時新增log.cleanup.policy=compact引數就可以為topic開啟Log Compaction功能。

auto.offset.reset引數

auto.offset.reset表示如果Kafka中沒有儲存對應的offset資訊的話(有可能offset資訊被刪除),消費者從何處開始消費訊息。它擁有三個可選值:

  • earliest:從最早的offset開始消費

  • latest:從最後的offset開始消費

  • none:直接丟擲exception給consumer

看一下下面兩個場景:

1、Consumer消費了5條訊息後宕機了,重啟之後它讀取到對應的partition的Committed Offset為5,因此會直接從第6條訊息開始讀取。此時完全依賴於Committed Offset機制,和auto.offset.reset配置完全無關。

2、新建了一個新的Group,並添加了一個Consumer,它訂閱了一個已經存在的Topic。此時Kafka中還沒有這個Consumer相應的Offset資訊,因此此時Kafka就會根據auto.offset.reset配置來決定這個Consumer從何處開始消費訊息。