CMMI評審(開發人員問題彙總)
Kafka 是一個高度可擴充套件的分散式訊息系統,在實時事件流和流式處理為中心的架構越來越風靡的今天,它扮演了這個架構中核心儲存的角色。從某種角度說,Kafka 可以看成實時版的 Hadoop 系統。Hadoop 可以儲存和定期處理大量的資料檔案,而 Kafka 可以儲存和持續處理大型的資料流。
Hadoop 和檔案系統提供檔案流的讀取位點( offset ),並支援通過 seek 方法將檔案流移動到特定位置;Kafka 對應的提供了主題下每個分割槽的消費位點( offset ),並允許消費者設定分割槽的讀取位置。本文首先介紹 Kafka 消費者消費訊息的方式,隨後回答 Kafka 如何管理消費位點這一元資料的問題。後面一個主題包括 Kafka 如何提交以及設定消費位點的實現,這是 Kafka 為應用系統提供可靠性保障的重要組成部分
Kafka 消費者的消費模式
Kafka 的資料由主題和分區劃分。應用程式使用 KafkaConsumer 向 Kafka 訂閱主題,並從訂閱的主題上接受訊息,訂閱主題的模板程式碼如下所示。
consumer.subscribe(Collections.singletonList("customTopic"));
可以看到,我們為每個消費者指定了它所消費的主題。
在分散式系統的語境下,當生產者通過水平擴充套件提高了整體主題寫入訊息的速度時,單個消費者很快就跟不上訊息生產的速度。直觀地,我們想要通過同樣地水平擴充套件手段,使用多個消費者來分攤訊息消費的壓力。
Kafka 利用消費組的概念來支援消費者的水平擴充套件。消費者從屬於消費組,消費組的消費者訂閱同一個主題,每個消費者接受主題的一部分分割槽的訊息。消費者通過建立時的 group.id 指定它所從屬的消費組。
消費者加入消費組或離開消費組會引起消費組所消費的主題的分割槽在組內消費者之間的再均衡( rebalance )。訊息的再均衡在流式處理的範疇裡是一個複雜的話題,本文不討論其細節,假設每個消費者都穩定地消費主題的若干個分割槽。
在 Kafka 與某些系統的整合裡,消費者消費的分割槽是由外部系統所指定和協調的,Kafka 為了支援這樣的場景提供了主動為消費者分配分割槽的介面。
consumer.assign(Collections.singletonList(new TopicPartition("customTopic", 1)));
當消費者指定了自己所要消費的主題和分割槽後,應用程式通過訊息輪詢來與 Kafka 叢集互動並請求資料進行消費。Kafka 在輪詢中進行很多操作,包括消費組協調、分割槽再均衡和獲取資料。在這裡我們主要關心獲取資料進行消費的情況,模板程式碼如下所示。
try {
while (true) {
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMills(1000L))) {
// do something with record...
}
}
} finally {
consumer.close();
}
可以看到,應用程式通常在一個無限迴圈裡通過輪詢來消費 Kafka 裡的資料。當消費者緩衝區有資料或 poll 最長的阻塞時間到達時,將返回本次輪詢取得的訊息集合。
返回的訊息集合的元素包括訊息所屬主題的資訊、所在分割槽的資訊、所在分割槽的消費位點以及訊息資料即其鍵值對。通常,我們遍歷訊息集合來處理輪詢取得的訊息。
最後,在 finally 塊中我們呼叫了消費者的 close 方法,從而顯式地關閉消費者,並關閉網路連線。這個操作同時會觸發一次消費組的再均衡,從而避免必須等待消費組協調者在該消費者心跳超時後才發現其離開消費者並觸發再均衡。
Kafka 如何提交消費位點
消費者每次呼叫 poll 方法總是返回由生產者寫入 Kafka 但是還沒有被消費者消費的訊息,那麼 Kafka 是怎麼定位哪些訊息還沒被消費者消費的呢?
答案就是消費位點。
Kafka 通過消費位點來追蹤訊息在分割槽裡的消費進度,而不需要強制對每個訊息都進行確認。我們把更新分割槽消費位點的操作叫做提交( commit )。
Kafka 追蹤消費位點的方式充分利用了 Kafka 自身的能力,通過向 Kafka 內部名為 __consumer_offsets 的主題傳送包裝了消費位點資訊的訊息來儲存消費位點。消費者正常執行時,還會在記憶體中為每個分配的分割槽記錄一個獲取資料的資料位點。
因此,如果消費者一直正常執行,持久化在 __consumer_offsets 主題的消費位點元資料用處不大,因為消費者會自己追蹤消費位點。但是在有的消費者發生崩潰重啟或者主題分割槽發生再均衡時,重啟的分割槽需要恢復丟失的記憶體中的消費位點資訊,或者再均衡後的消費者接手新的分割槽的情形下,消費者就需要讀取分割槽最後一次提交的消費位點,以從該消費位點繼續往下消費資料。
在這種情形下,如果提交的消費位點小於應用程式消費者實際曾經處理過的最後一個訊息的消費位點,那麼這兩點之間的訊息就會被重複處理。反之,如果提交的消費位點大於應用程式消費者實際曾經處理過的最後一個訊息的消費位點,那麼這兩點之間的訊息就會被跳過,不被處理。
因此,為了提高應用程式處理訊息的可靠性,Kafka 提供了若干種提交消費位點的方式,以支援應用程式根據自身邏輯提交儘可能準確的消費位點。
自動提交
簡單的 Kafka 消費應用程式可以採用自動提交的手段讓消費者自動提交消費位點。只要在建立消費者的時候將 enable.auto.commit 配置設定為 true 值,那麼消費者就會在 poll 方法裡在拉取新的訊息之前自動提交當前的消費位點。決定自動提交週期的是 auto.commit.interval.ms 配置,預設是 5 秒,即每過 5 秒,在下一次 poll 時自動提交。
自動提交雖然方便,但是一切自動的行為,使用者都需要小心的確認其行為並瞭解它在極端情況下的表現。
一個典型的自動提交的邊界場景是分割槽再均衡場景。假設我們採用預設 5 秒的自動提交時間間隔,在本分割槽最近一次提交後 3 秒發生了故障,再均衡之後,新的消費者從本分割槽的消費位點開始讀取並處理訊息。由於消費位點是故障前 3 秒前自動提交的,在這 3 秒之間讀取的訊息及其影響的消費位點沒有被提交,因此這些資料將被重複處理。
可以通過縮短自動提交的間隔來減小重複資料的時間視窗,但是重複資料在理論上是不可避免的。此外,頻繁的提交將帶來額外的排程開銷和通訊開銷。
另一個值得注意的是,自動提交的配置下,每一次 poll 呼叫都會提交上一次 poll 移動到的消費位點,在呼叫消費者的 close 方法時也會觸發自動提交。通常來說,自動提交的消費位點總是不大於消費者實際處理的訊息。但是,如果在輪詢時拉取到一批訊息,並在處理完所有訊息之前丟擲異常,就有可能導致自動提交時按照這批訊息處理過的假設進行提交,從而導致部分訊息被跳過,不被處理的情形。
主動提交
對於定製 Kafka 消費邏輯的應用,或者說整合 Kafka 到更大的流式處理系統的場景,主動提交當前的消費位點是一個必須的功能。應用程式或者複合系統通過控制消費位點的提交時間來消除訊息丟失的可能性,並在發生再均衡時減少重複訊息的數量。
首先,這要求把前面提到的 enable.auto.commit 配置設定為 false 值。隨後,通過呼叫消費者的提交消費位點的介面來進行主動提交。Kafka 提供的介面包括以下幾種。
void commitSync();
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
可以看到,這些介面一方面分為同步和非同步兩類,另一方面分為是否帶有 offsets 引數兩類。
顯然,同步的提交會阻塞應用程式的執行,預設情況下這個阻塞的超時時間是由 default.api.timeout.ms 配置決定的,預設為 1 分鐘。在早期的版本中,這個時間是無限的,即應用程式會永遠阻塞並無限重試直到丟擲不可恢復的異常或成功提交。
非同步的提交則會將提交動作放在非同步執行緒完成,並支援傳入可選的 OffsetCommitCallback 來定製提交動作成功後的回撥邏輯。非同步的提交的非同步性在於消費者向 Kafka 叢集傳送提交消費位點的請求後,不阻塞等待叢集的返回,而是註冊一個叢集返回時的回撥來響應成功或者失敗的事件。
由於傳送請求是同步的,而且 Kafka 底層採用 TCP 進行通訊,因此我們可以認為非同步的消費位點提交也是順序發生在 Kafka 叢集的。然而,如果在提交失敗的情形下消費者通過 Callback 嘗試重新提交,就必須注意重試可能導致提交一個更早的消費位點從而在再平衡場景下導致不必要的重複消費的情況。
到現在我們所討論的介面都是不帶 offsets 引數的,在這種情況下,Kafka 會自動獲取當前消費者記憶體所儲存的消費位點來進行提交。對於某些定製化的消費位點管理邏輯,可以通過傳入 offsets 引數來自定義需要提交的消費位點的內容。offsets 引數是一個主題及分割槽鎖定具體分割槽的鍵和將要提交的該分割槽的消費位點及元資料的值組成的對映。
叢集互動
上面介紹的提交消費位點的技術都是從消費者角度看那些介面參與實現這個功能的。對於想深入理解這一過程的同學,這裡簡要介紹一下相關的邏輯在原始碼的位置及相應的邏輯線。
在消費者一側的底層邏輯,上面自動提交和主動提交的邏輯最終都由 ConsumerCoordicator 類來執行,對應的介面定義如下。
class ConsumerCoordinator {
void maybeAutoCommitOffsetsAsync(long now);
boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer);
void commitOffsetsAsync(Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback);
}
這些方法在底層都會首先查詢當前消費者對應的消費組協調者,即 Kafka 叢集中的某個伺服器,隨後向它發起 OffsetCommitRequest 請求,該請求包含了伺服器向 __consumer_offsets 主題寫入消費位點所需要的所有資訊。
消費組協調者通過 SocketServer 元件接受到請求後,反序列化請求並交給 KafkaApis 元件處理請求。KafkaApis 是 Kafka 伺服器所有業務邏輯的聚合類。它識別出 OffsetCommitRequest 後轉發到 handleOffsetCommitRequest 方法進行處理。在一系列的引數檢查之後,主流程的訊息處理最終將由 GroupCoordinator#handleCommitOffsets 處理。
Kafka 如何設定消費位點
現在,我們知道了 Kafka 提交消費位點的方式,並且知道了持久化到 Kafka 叢集的消費位點通常在消費者崩潰或者叢集發生再均衡的時候被讀取和使用。但是,同樣是在某些定製場景下,Kafka 消費者的消費位點是由外部系統維護的。
在這種情況下,Kafka 也支援從特定的消費位點開始處理訊息,對應的介面定義如下。
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
其中 seekToBeginning 方法將分割槽的消費位點回撥到分割槽的起始位置開始讀取訊息,而 seekToEnd 方法將分割槽的消費位點跳到分割槽的末尾開始讀取資訊。顯然,前者將會導致大量的重複訊息處理,而後者將帶來跳過某些訊息不做處理的風險。
在本文的開頭我們提到,Hadoop 和檔案系統提供檔案流的讀取位點,並支援通過 seek 方法將檔案流移動到特定位置。Kafka 同樣支援 seek 方法來設定訊息的消費位點,從新的消費位點開始消費資料。
從實現上說,設定消費位點是一個消費者的本地操作。它直接改動了訂閱狀態下主題分割槽狀態的消費位點訊息,從而在下一次 poll 方法呼叫時從新設定的消費位點開始向 Kafka 叢集拉取訊息進行消費。
這一功能的實際應用場景包括應用程式需要保證某種程度的資料可靠性的情形。
通常,資料的消費者在接收到 Kafka 訊息之後會進行相應的處理並生成新的資料,典型場景下這一資料將被持久化到資料庫中或者進入到下一階段的流式處理系統裡。如果資料儲存在資料庫裡或進入其他系統之中,而消費位點提交到 Kafka 上,這樣多個系統之間天然的非同步性將使原子提交的操作成為不可能。
但是,如果在同一個事務裡將資料和消費位點都寫到資料庫裡,或者進入到流式處理系統裡追蹤起來,我們就可以保證這兩者是原子地被提交或者失敗。此時,消費位點儲存在 Kafka 系統之外,因此需要上面的 seek 方法來主動設定消費位點以告訴消費者從什麼位置開始讀取並消費資料。
另外兩個方法跟 auto.offset.reset 配置的 ealiest 和 latest 選項相對應,常作為主題分割槽消費位點不存在時採用的兜底設定消費位點的方案。