1. 程式人生 > 實用技巧 >kafka學習筆記(三)kafka的使用技巧

kafka學習筆記(三)kafka的使用技巧

概述

上一篇隨筆主要介紹了kafka的基本使用包括叢集引數,生產者基本使用,consumer基本使用,現在來介紹一下kafka的使用技巧。

分割槽機制

我們在使用 Apache Kafka 生產和消費訊息的時候,肯定是希望能夠將資料均勻地分配到所有伺服器上。比如很多公司使用 Kafka 收集應用伺服器的日誌資料,這種資料都是很多的,特別是對於那種大批量機器組成的叢集環境,每分鐘產生的日誌量都能以 GB 數,因此如何將這麼大的資料量均勻地分配到 Kafka 的各個 Broker 上,就成為一個非常重要的問題。Kafka 有主題(Topic)的概念,它是承載真實資料的邏輯容器,而在主題之下還分為若干個分割槽,也就是說 Kafka 的訊息組織方式實際上是三級結構:主題 - 分割槽 - 訊息。主題下的每條訊息只會儲存在某一個分割槽中,而不會在多個分割槽中被儲存多份。官網上的這張圖非常清晰地展示了 Kafka 的三級結構,如下所示:

現在我丟擲一個問題你可以先思考一下:你覺得為什麼 Kafka 要做這樣的設計?為什麼使用分割槽的概念而不是直接使用多個主題呢?其實分割槽的作用就是提供負載均衡的能力,或者說對資料進行分割槽的主要原因,就是為了實現系統的高伸縮性(Scalability)。不同的分割槽能夠被放置到不同節點的機器上,而資料的讀寫操作也都是針對分割槽這個粒度而進行的,這樣每個節點的機器都能獨立地執行各自分割槽的讀寫請求處理。並且,我們還可以通過新增新的節點機器來增加整體系統的吞吐量。

分割槽策略

下面我們說說 Kafka 生產者的分割槽策略。所謂分割槽策略是決定生產者將訊息傳送到哪個分割槽的演算法。Kafka 為我們提供了預設的分割槽策略,同時它也支援你自定義分割槽策略。

如果要自定義分割槽策略,你需要顯式地配置生產者端的引數partitioner.class。這個引數該怎麼設定呢?方法很簡單,在編寫生產者程式時,你可以編寫一個具體的類實現org.apache.kafka.clients.producer.Partitioner介面。這個介面也很簡單,只定義了兩個方法:partition()和close(),通常你只需要實現最重要的 partition 方法。我們來看看這個方法的方法簽名:

1 int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

這裡的topic、key、keyBytes、value和valueBytes都屬於訊息資料,cluster則是叢集資訊(比如當前 Kafka 叢集共有多少主題、多少 Broker 等)。Kafka 給你這麼多資訊,就是希望讓你能夠充分地利用這些資訊對訊息進行分割槽,計算出它要被髮送到哪個分割槽中。只要你自己的實現類定義好了 partition 方法,同時設定partitioner.class引數為你自己實現類的 Full Qualified Name,那麼生產者程式就會按照你的程式碼邏輯對訊息進行分割槽。雖說可以有無數種分割槽的可能,但比較常見的分割槽策略也就那麼幾種,下面我來詳細介紹一下。

輪訓策略:

也稱 Round-robin 策略,即順序分配。比如一個主題下有 3 個分割槽,那麼第一條訊息被髮送到分割槽 0,第二條被髮送到分割槽 1,第三條被髮送到分割槽 2,以此類推。當生產第 4 條訊息時又會重新開始,即將其分配到分割槽 0,輪詢策略有非常優秀的負載均衡表現,它總是能保證訊息最大限度地被平均分配到所有分割槽上,故預設情況下它是最合理的分割槽策略,也是我們最常用的分割槽策略之一。

隨機策略:

也稱 Randomness 策略。所謂隨機就是我們隨意地將訊息放置到任意一個分割槽上。本質上看隨機策略也是力求將資料均勻地打散到各個分割槽,但從實際表現來看,它要遜於輪詢策略,所以如果追求資料的均勻分佈,還是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分割槽策略,在新版本中已經改為輪詢了。

按訊息建儲存策略:

Kafka 允許為每條訊息定義訊息鍵,簡稱為 Key。這個 Key 的作用非常大,它可以是一個有著明確業務含義的字串,比如客戶程式碼、部門編號或是業務 ID 等;也可以用來表徵訊息元資料。特別是在 Kafka 不支援時間戳的年代,在一些場景中,工程師們都是直接將訊息建立時間封裝進 Key 裡面的。一旦訊息被定義了 Key,那麼你就可以保證同一個 Key 的所有訊息都進入到相同的分割槽裡面,由於每個分割槽下的訊息處理都是有順序的,故這個策略被稱為按訊息鍵保序策略。

其他分割槽策略:

上面這幾種分割槽策略都是比較基礎的策略,除此之外你還能想到哪些有實際用途的分割槽策略?其實還有一種比較常見的,即所謂的基於地理位置的分割槽策略。當然這種策略一般只針對那些大規模的 Kafka 叢集,特別是跨城市、跨國家甚至是跨大洲的叢集。

壓縮演算法

說起壓縮(compression),我相信你一定不會感到陌生。它秉承了用時間去換空間的經典 trade-off 思想,具體來說就是用 CPU 時間去換磁碟空間或網路 I/O 傳輸量,希望以較小的 CPU 開銷帶來更少的磁碟佔用或更少的網路 I/O 傳輸。在 Kafka 中,壓縮也是用來做這件事的。今天我就來跟你分享一下 Kafka 中壓縮的那些事兒。

怎麼壓縮

Kafka 是如何壓縮訊息的呢?要弄清楚這個問題,就要從 Kafka 的訊息格式說起了。目前 Kafka 共有兩大類訊息格式,社群分別稱之為 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。不論是哪個版本,Kafka 的訊息層次都分為兩層:訊息集合(message set)以及訊息(message)。一個訊息集合中包含若干條日誌項(record item),而日誌項才是真正封裝訊息的地方。Kafka 底層的訊息日誌由一系列訊息集合日誌項組成。Kafka 通常不會直接操作具體的一條條訊息,它總是在訊息集合這個層面上進行寫入操作。

那麼社群引入 V2 版本的目的是什麼呢?V2 版本主要是針對 V1 版本的一些弊端做了修正,和我們今天討論的主題相關的修正有哪些呢?先介紹一個,就是把訊息的公共部分抽取出來放到外層訊息集合裡面,這樣就不用每條訊息都儲存這些資訊了。

我來舉個例子。原來在 V1 版本中,每條訊息都需要執行 CRC 校驗,但有些情況下訊息的 CRC 值是會發生變化的。比如在 Broker 端可能會對訊息時間戳欄位進行更新,那麼重新計算之後的 CRC 值也會相應更新;再比如 Broker 端在執行訊息格式轉換時(主要是為了相容老版本客戶端程式),也會帶來 CRC 值的變化。鑑於這些情況,再對每條訊息都執行 CRC 校驗就有點沒必要了,不僅浪費空間還耽誤 CPU 時間,因此在 V2 版本中,訊息的 CRC 校驗工作就被移到了訊息集合這一層。

V2 版本還有一個和壓縮息息相關的改進,就是儲存壓縮訊息的方法發生了變化。之前 V1 版本中儲存壓縮訊息的方法是把多條訊息進行壓縮然後儲存到外層訊息的訊息體欄位中;而 V2 版本的做法是對整個訊息集合進行壓縮。顯然後者應該比前者有更好的壓縮效果。

何時壓縮

在 Kafka 中,壓縮可能發生在兩個地方:生產者端和 Broker 端。

生產者程式中配置 compression.type 引數即表示啟用指定型別的壓縮演算法。比如下面這段程式程式碼展示瞭如何構建一個開啟 GZIP 的 Producer 物件:

1  Properties props = new Properties();
2  props.put("bootstrap.servers", "localhost:9092");
3  props.put("acks", "all");
4  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
5  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
6  // 開啟GZIP壓縮
7  props.put("compression.type", "gzip");
8  
9  Producer<String, String> producer = new KafkaProducer<>(props);

在生產者端啟用壓縮是很自然的想法,那為什麼我說在 Broker 端也可能進行壓縮呢?其實大部分情況下 Broker 從 Producer 端接收到訊息後僅僅是原封不動地儲存而不會對其進行任何修改,但這裡的“大部分情況”也是要滿足一定條件的。有兩種例外情況就可能讓 Broker 重新壓縮訊息。

情況一:Broker 端指定了和 Producer 端不同的壓縮演算法。

你看,這種情況下 Broker 接收到 GZIP 壓縮訊息後,只能解壓縮然後使用 Snappy 重新壓縮一遍。如果你翻開 Kafka 官網,你會發現 Broker 端也有一個引數叫 compression.type,和上面那個例子中的同名。但是這個引數的預設值是 producer,這表示 Broker 端會“尊重”Producer 端使用的壓縮演算法。可一旦你在 Broker 端設定了不同的 compression.type 值,就一定要小心了,因為可能會發生預料之外的壓縮 / 解壓縮操作,通常表現為 Broker 端 CPU 使用率飆升。

情況二:Broker 端發生了訊息格式轉換。

所謂的訊息格式轉換主要是為了相容老版本的消費者程式。還記得之前說過的 V1、V2 版本吧?在一個生產環境中,Kafka 叢集中同時儲存多種版本的訊息格式非常常見。為了相容老版本的格式,Broker 端會對新版本訊息執行向老版本格式的轉換。這個過程中會涉及訊息的解壓縮和重新壓縮。一般情況下這種訊息格式轉換對效能是有很大影響的,除了這裡的壓縮之外,它還讓 Kafka 喪失了引以為豪的 Zero Copy 特性。

何時解壓縮

有壓縮必有解壓縮!通常來說解壓縮發生在消費者程式中,也就是說 Producer 傳送壓縮訊息到 Broker 後,Broker 照單全收並原樣儲存起來。當 Consumer 程式請求這部分訊息時,Broker 依然原樣傳送出去,當訊息到達 Consumer 端後,由 Consumer 自行解壓縮還原成之前的訊息。

那麼現在問題來了,Consumer 怎麼知道這些訊息是用何種壓縮演算法壓縮的呢?其實答案就在訊息中。Kafka 會將啟用了哪種壓縮演算法封裝進訊息集合中,這樣當 Consumer 讀取到訊息集合時,它自然就知道了這些訊息使用的是哪種壓縮演算法。如果用一句話總結一下壓縮和解壓縮,那麼我希望你記住這句話:Producer 端壓縮、Broker 端保持、Consumer 端解壓縮。除了在 Consumer 端解壓縮,Broker 端也會進行解壓縮。注意了,這和前面提到訊息格式轉換時發生的解壓縮是不同的場景。每個壓縮過的訊息集合在 Broker 端寫入時都要發生解壓縮操作,目的就是為了對訊息執行各種驗證。我們必須承認這種解壓縮對 Broker 端效能是有一定影響的,特別是對 CPU 的使用率而言。

無訊息丟失配置

一句話概括,Kafka 只對“已提交”的訊息(committed message)做有限度的持久化保證。

第一個核心要素是“已提交的訊息”。什麼是已提交的訊息?當 Kafka 的若干個 Broker 成功地接收到一條訊息並寫入到日誌檔案後,它們會告訴生產者程式這條訊息已成功提交。此時,這條訊息在 Kafka 看來就正式變為“已提交”訊息了。那為什麼是若干個 Broker 呢?這取決於你對“已提交”的定義。你可以選擇只要有一個 Broker 成功儲存該訊息就算是已提交,也可以是令所有 Broker 都成功儲存該訊息才算是已提交。不論哪種情況,Kafka 只對已提交的訊息做持久化保證這件事情是不變的。

第二個核心要素就是“有限度的持久化保證”,也就是說 Kafka 不可能保證在任何情況下都做到不丟失訊息。舉個極端點的例子,如果地球都不存在了,Kafka 還能儲存任何訊息嗎?顯然不能!倘若這種情況下你依然還想要 Kafka 不丟訊息,那麼只能在別的星球部署 Kafka Broker 伺服器了。現在你應該能夠稍微體會出這裡的“有限度”的含義了吧,其實就是說 Kafka 不丟訊息是有前提條件的。假如你的訊息儲存在 N 個 Kafka Broker 上,那麼這個前提條件就是這 N 個 Broker 中至少有 1 個存活。只要這個條件成立,Kafka 就能保證你的這條訊息永遠不會丟失。總結一下,Kafka 是能做到不丟失訊息的,只不過這些訊息必須是已提交的訊息,而且還要滿足一定的條件。當然,說明這件事並不是要為 Kafka 推卸責任,而是為了在出現該類問題時我們能夠明確責任邊界。

生產端丟失訊息

目前 Kafka Producer 是非同步傳送訊息的,也就是說如果你呼叫的是 producer.send(msg) 這個 API,那麼它通常會立即返回,但此時你不能認為訊息傳送已成功完成。這種傳送方式有個有趣的名字,叫“fire and forget”,翻譯一下就是“發射後不管”。這個術語原本屬於導彈制導領域,後來被借鑑到計算機領域中,它的意思是,執行完一個操作後不去管它的結果是否成功。呼叫 producer.send(msg) 就屬於典型的“fire and forget”,因此如果出現訊息丟失,我們是無法知曉的。這個傳送方式挺不靠譜吧,不過有些公司真的就是在使用這個 API 傳送訊息。

實際上,解決此問題的方法非常簡單:Producer 永遠要使用帶有回撥通知的傳送 API,也就是說不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧這裡的 callback(回撥),它能準確地告訴你訊息是否真的提交成功了。一旦出現訊息提交失敗的情況,你就可以有針對性地進行處理。

消費者丟失訊息

Consumer 端丟失資料主要體現在 Consumer 端要消費的訊息不見了。Consumer 程式有個“位移”的概念,表示的是這個 Consumer 當前消費到的 Topic 分割槽的位置。下面這張圖來自於官網,它清晰地展示了 Consumer 端的位移資料。

比如對於 Consumer A 而言,它當前的位移值就是 9;Consumer B 的位移值是 11。Kafka 中 Consumer 端的訊息丟失就是這麼一回事。要對抗這種訊息丟失,辦法很簡單:維持先消費訊息(閱讀),再更新位移(書籤)的順序即可。這樣就能最大限度地保證訊息不丟失。

如果Consumer 程式自動地向前更新位移。假如其中某個執行緒執行失敗了,它負責的訊息沒有被成功處理,但位移已經被更新了,因此這條訊息對於 Consumer 而言實際上是丟失了。這裡的關鍵在於 Consumer 自動提交位移,與你沒有確認書籍內容被全部讀完就將書歸還類似,你沒有真正地確認訊息是否真的被消費就“盲目”地更新了位移。這個問題的解決方案也很簡單:如果是多執行緒非同步處理消費訊息,Consumer 程式不要開啟自動提交位移,而是要應用程式手動提交位移。在這裡我要提醒你一下,單個 Consumer 程式使用多執行緒來消費訊息說起來容易,寫成程式碼卻異常困難,因為你很難正確地處理位移的更新,也就是說避免無消費訊息丟失很簡單,但極易出現訊息被消費了多次的情況。

最佳實踐

看完這兩個案例之後,我來分享一下 Kafka 無訊息丟失的配置,每一個其實都能對應上面提到的問題。

不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。記住,一定要使用帶有回撥通知的 send 方法。

設定 acks = all。acks 是 Producer 的一個引數,代表了你對“已提交”訊息的定義。如果設定成 all,則表明所有副本 Broker 都要接收到訊息,該訊息才算是“已提交”。這是最高等級的“已提交”定義。

設定 retries 為一個較大的值。這裡的 retries 同樣是 Producer 的引數,對應前面提到的 Producer 自動重試。當出現網路的瞬時抖動時,訊息傳送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試訊息傳送,避免訊息丟失。

設定 unclean.leader.election.enable = false。這是 Broker 端的引數,它控制的是哪些 Broker 有資格競選分割槽的 Leader。如果一個 Broker 落後原先的 Leader 太多,那麼它一旦成為新的 Leader,必然會造成訊息的丟失。故一般都要將該引數設定成 false,即不允許這種情況的發生。

設定 replication.factor >= 3。這也是 Broker 端的引數。其實這裡想表述的是,最好將訊息多儲存幾份,畢竟目前防止訊息丟失的主要機制就是冗餘。

設定 min.insync.replicas > 1。這依然是 Broker 端引數,控制的是訊息至少要被寫入到多少個副本才算是“已提交”。設定成大於 1 可以提升訊息永續性。在實際環境中千萬不要使用預設值 1。

確保 replication.factor > min.insync.replicas。如果兩者相等,那麼只要有一個副本掛機,整個分割槽就無法正常工作了。我們不僅要改善訊息的永續性,防止資料丟失,還要在不降低可用性的基礎上完成。推薦設定成 replication.factor = min.insync.replicas + 1。

確保訊息消費完成再提交。Consumer 端有個引數 enable.auto.commit,最好把它設定成 false,並採用手動提交位移的方式。就像前面說的,這對於單 Consumer 多執行緒處理的場景而言是至關重要的。

攔截器

Kafka 攔截器分為生產者攔截器和消費者攔截器。生產者攔截器允許你在傳送訊息前以及訊息提交成功後植入你的攔截器邏輯;而消費者攔截器支援在消費訊息前以及提交位移後編寫特定邏輯。值得一提的是,這兩種攔截器都支援鏈的方式,即你可以將一組攔截器串連成一個大的攔截器,Kafka 會按照新增順序依次執行攔截器邏輯。舉個例子,假設你想在生產訊息前執行兩個“前置動作”:第一個是為訊息增加一個頭資訊,封裝傳送該訊息的時間,第二個是更新發送訊息數字段,那麼當你將這兩個攔截器串聯在一起統一指定給 Producer 後,Producer 會按順序執行上面的動作,然後再發送訊息。

當前 Kafka 攔截器的設定方法是通過引數配置完成的。生產者和消費者兩端有一個相同的引數,名字叫 interceptor.classes,它指定的是一組類的列表,每個類就是特定邏輯的攔截器實現類。拿上面的例子來說,假設第一個攔截器的完整類路徑是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二個類是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那麼你需要按照以下方法在 Producer 端指定攔截器:

1 Properties props = new Properties();
2 List<String> interceptors = new ArrayList<>();
3 interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1
4 interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2
5 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
6 ……

現在問題來了,我們應該怎麼編寫 AddTimeStampInterceptor 和 UpdateCounterInterceptor 類呢?其實很簡單,這兩個類以及你自己編寫的所有 Producer 端攔截器實現類都要繼承 org.apache.kafka.clients.producer.ProducerInterceptor 介面。該介面是 Kafka 提供的,裡面有兩個核心的方法。

onSend:該方法會在訊息傳送之前被呼叫。如果你想在傳送之前對訊息“美美容”,這個方法是你唯一的機會。

onAcknowledgement:該方法會在訊息成功提交或傳送失敗之後被呼叫。還記得我在上一期中提到的傳送回撥通知 callback 嗎?onAcknowledgement 的呼叫要早於 callback 的呼叫。值得注意的是,這個方法和 onSend 不是在同一個執行緒中被呼叫的,因此如果你在這兩個方法中呼叫了某個共享可變物件,一定要保證執行緒安全哦。還有一點很重要,這個方法處在 Producer 傳送的主路徑中,所以最好別放一些太重的邏輯進去,否則你會發現你的 Producer TPS 直線下降。

同理,指定消費者攔截器也是同樣的方法,只是具體的實現類要實現 org.apache.kafka.clients.consumer.ConsumerInterceptor 介面,這裡面也有兩個核心方法。

onConsume:該方法在訊息返回給 Consumer 程式之前呼叫。也就是說在開始正式處理訊息之前,攔截器會先攔一道,搞一些事情,之後再返回給你。

onCommit:Consumer 在提交位移之後呼叫該方法。通常你可以在該方法中做一些記賬類的動作,比如打日誌等。

一定要注意的是,指定攔截器類時要指定它們的全限定名,即 full qualified name。通俗點說就是要把完整包名也加上,不要只有一個類名在那裡,並且還要保證你的 Producer 程式能夠正確載入你的攔截器類。

典型使用場景

Kafka 攔截器都能用在哪些地方呢?其實,跟很多攔截器的用法相同,Kafka 攔截器可以應用於包括客戶端監控、端到端系統效能檢測、訊息審計等多種功能在內的場景。

今天 Kafka 預設提供的監控指標都是針對單個客戶端或 Broker 的,你很難從具體的訊息維度去追蹤叢集間訊息的流轉路徑。同時,如何監控一條訊息從生產到最後消費的端到端延時也是很多 Kafka 使用者迫切需要解決的問題。從技術上來說,我們可以在客戶端程式中增加這樣的統計邏輯,但是對於那些將 Kafka 作為企業級基礎架構的公司來說,在應用程式碼中編寫統一的監控邏輯其實是很難的,畢竟這東西非常靈活,不太可能提前確定好所有的計算邏輯。另外,將監控邏輯與主業務邏輯耦合也是軟體工程中不提倡的做法。現在,通過實現攔截器的邏輯以及可插拔的機制,我們能夠快速地觀測、驗證以及監控叢集間的客戶端效能指標,特別是能夠從具體的訊息層面上去收集這些資料。這就是 Kafka 攔截器的一個非常典型的使用場景。我們再來看看訊息審計(message audit)的場景。設想你的公司把 Kafka 作為一個私有云訊息引擎平臺向全公司提供服務,這必然要涉及多租戶以及訊息審計的功能。作為私有云的 PaaS 提供方,你肯定要能夠隨時檢視每條訊息是哪個業務方在什麼時間釋出的,之後又被哪些業務方在什麼時刻消費。一個可行的做法就是你編寫一個攔截器類,實現相應的訊息審計邏輯,然後強行規定所有接入你的 Kafka 服務的客戶端程式必須設定該攔截器。

kafka如何管理TCP連線

Apache Kafka 的所有通訊都是基於 TCP 的,而不是基於 HTTP 或其他協議。無論是生產者、消費者,還是 Broker 之間的通訊都是如此。你可能會問,為什麼 Kafka 不使用 HTTP 作為底層的通訊協議呢?其實這裡面的原因有很多,但最主要的原因在於 TCP 和 HTTP 之間的區別。從社群的角度來看,在開發客戶端時,人們能夠利用 TCP 本身提供的一些高階功能,比如多路複用請求以及同時輪詢多個連線的能力。

所謂的多路複用請求,即 multiplexing request,是指將兩個或多個數據流合併到底層單一物理連線中的過程。TCP 的多路複用請求會在一條物理連線上建立若干個虛擬連線,每個虛擬連線負責流轉各自對應的資料流。其實嚴格來說,TCP 並不能多路複用,它只是提供可靠的訊息交付語義保證,比如自動重傳丟失的報文。

更嚴謹地說,作為一個基於報文的協議,TCP 能夠被用於多路複用連線場景的前提是,上層的應用協議(比如 HTTP)允許傳送多條訊息。不過,我們今天並不是要詳細討論 TCP 原理,因此你只需要知道這是社群採用 TCP 的理由之一就行了。除了 TCP 提供的這些高階功能有可能被 Kafka 客戶端的開發人員使用之外,社群還發現,目前已知的 HTTP 庫在很多程式語言中都略顯簡陋。基於這兩個原因,Kafka 社群決定採用 TCP 協議作為所有請求通訊的底層協議。

Java生產者程式管理tcp

Kafka 的 Java 生產者 API 主要的物件就是 KafkaProducer。通常我們開發一個生產者的步驟有 4 步。

第 1 步:構造生產者物件所需的引數物件。

第 2 步:利用第 1 步的引數物件,建立 KafkaProducer 物件例項。

第 3 步:使用 KafkaProducer 的 send 方法傳送訊息。

第 4 步:呼叫 KafkaProducer 的 close 方法關閉生產者並釋放各種系統資源。

上面這 4 步寫成 Java 程式碼的話大概是這個樣子:

1 Properties props = new Properties ();
2 props.put(“引數1”, “引數1的值”);
3 props.put(“引數2”, “引數2的值”);
4 ……
5 try (Producer<String, String> producer = new KafkaProducer<>(props)) {
6             producer.send(new ProducerRecord<String, String>(……), callback);
7   ……
8 }

這段程式碼使用了 Java 7 提供的 try-with-resource 特性,所以並沒有顯式呼叫 producer.close() 方法。無論是否顯式呼叫 close 方法,所有生產者程式大致都是這個路數。現在問題來了,當我們開發一個 Producer 應用時,生產者會向 Kafka 叢集中指定的主題(Topic)傳送訊息,這必然涉及與 Kafka Broker 建立 TCP 連線。那麼,Kafka 的 Producer 客戶端是如何管理這些 TCP 連線的呢?

要回答上面這個問題,我們首先要弄明白生產者程式碼是什麼時候建立 TCP 連線的。

首先,生產者應用在建立 KafkaProducer 例項時是會建立與 Broker 的 TCP 連線的。其實這種表述也不是很準確,應該這樣說:在建立 KafkaProducer 例項時,生產者應用會在後臺建立並啟動一個名為 Sender 的執行緒,該 Sender 執行緒開始執行時首先會建立與 Broker 的連線。我截取了一段測試環境中的日誌來說明這一點:

你也許會問:怎麼可能是這樣?如果不呼叫 send 方法,這個 Producer 都不知道給哪個主題發訊息,它又怎麼能知道連線哪個 Broker 呢?難不成它會連線 bootstrap.servers 引數指定的所有 Broker 嗎?嗯,是的,Java Producer 目前還真是這樣設計的。我在這裡稍微解釋一下 bootstrap.servers 引數。它是 Producer 的核心引數之一,指定了這個 Producer 啟動時要連線的 Broker 地址。請注意,這裡的“啟動時”,代表的是 Producer 啟動時會發起與這些 Broker 的連線。因此,如果你為這個引數指定了 1000 個 Broker 連線資訊,那麼很遺憾,你的 Producer 啟動時會首先建立與這 1000 個 Broker 的 TCP 連線。在實際使用過程中,我並不建議把叢集中所有的 Broker 資訊都配置到 bootstrap.servers 中,通常你指定 3~4 臺就足以了。因為 Producer 一旦連線到叢集中的任一臺 Broker,就能拿到整個叢集的 Broker 資訊,故沒必要為 bootstrap.servers 指定所有的 Broker。

針對 TCP 連線何時建立的問題,目前我們的結論是這樣的:TCP 連線是在建立 KafkaProducer 例項時建立的。那麼,我們想問的是,它只會在這個時候被建立嗎?當然不是!TCP 連線還可能在兩個地方被建立:一個是在更新元資料後,另一個是在訊息傳送時。為什麼說是可能?因為這兩個地方並非總是建立 TCP 連線。當 Producer 更新了叢集的元資料資訊之後,如果發現與某些 Broker 當前沒有連線,那麼它就會建立一個 TCP 連線。同樣地,當要傳送訊息時,Producer 發現尚不存在與目標 Broker 的連線,也會建立一個。

接下來,我們來看看 Producer 更新叢集元資料資訊的兩個場景。

場景一:當 Producer 嘗試給一個不存在的主題傳送訊息時,Broker 會告訴 Producer 說這個主題不存在。此時 Producer 會發送 METADATA 請求給 Kafka 叢集,去嘗試獲取最新的元資料資訊。

場景二:Producer 通過 metadata.max.age.ms 引數定期地去更新元資料資訊。該引數的預設值是 300000,即 5 分鐘,也就是說不管叢集那邊是否有變化,Producer 每 5 分鐘都會強制重新整理一次元資料以保證它是最及時的資料。

說完了 TCP 連線的建立,我們來說說它們何時被關閉。Producer 端關閉 TCP 連線的方式有兩種:一種是使用者主動關閉;一種是 Kafka 自動關閉。

我們先說第一種。這裡的主動關閉實際上是廣義的主動關閉,甚至包括使用者呼叫 kill -9 主動“殺掉”Producer 應用。當然最推薦的方式還是呼叫 producer.close() 方法來關閉。第二種是 Kafka 幫你關閉,這與 Producer 端引數 connections.max.idle.ms 的值有關。預設情況下該引數值是 9 分鐘,即如果在 9 分鐘內沒有任何請求“流過”某個 TCP 連線,那麼 Kafka 會主動幫你把該 TCP 連線關閉。使用者可以在 Producer 端設定 connections.max.idle.ms=-1 禁掉這種機制。一旦被設定成 -1,TCP 連線將成為永久長連線。當然這只是軟體層面的“長連線”機制,由於 Kafka 建立的這些 Socket 連線都開啟了 keepalive,因此 keepalive 探活機制還是會遵守的。值得注意的是,在第二種方式中,TCP 連線是在 Broker 端被關閉的,但其實這個 TCP 連線的發起方是客戶端,因此在 TCP 看來,這屬於被動關閉的場景,即 passive close。被動關閉的後果就是會產生大量的 CLOSE_WAIT 連線,因此 Producer 端或 Client 端沒有機會顯式地觀測到此連線已被中斷。

Java消費者程式管理tcp

我們先從消費者建立 TCP 連線開始討論。消費者端主要的程式入口是 KafkaConsumer 類。和生產者不同的是,構建 KafkaConsumer 例項時是不會建立任何 TCP 連線的,也就是說,當你執行完 new KafkaConsumer(properties) 語句後,你會發現,沒有 Socket 連線被創建出來。這一點和 Java 生產者是有區別的,主要原因就是生產者入口類 KafkaProducer 在構建例項的時候,會在後臺默默地啟動一個 Sender 執行緒,這個 Sender 執行緒負責 Socket 連線的建立。從這一點上來看,我個人認為 KafkaConsumer 的設計比 KafkaProducer 要好。就像我在第 13 講中所說的,在 Java 建構函式中啟動執行緒,會造成 this 指標的逃逸,這始終是一個隱患。如果 Socket 不是在建構函式中建立的,那麼是在 KafkaConsumer.subscribe 或 KafkaConsumer.assign 方法中建立的嗎?嚴格來說也不是。我還是直接給出答案吧:TCP 連線是在呼叫 KafkaConsumer.poll 方法時被建立的。再細粒度地說,在 poll 方法內部有 3 個時機可以建立 TCP 連線。

1.發起 FindCoordinator 請求時。

還記得消費者端有個元件叫協調者(Coordinator)嗎?它駐留在 Broker 端的記憶體中,負責消費者組的組成員管理和各個消費者的位移提交管理。當消費者程式首次啟動呼叫 poll 方法時,它需要向 Kafka 叢集傳送一個名為 FindCoordinator 的請求,希望 Kafka 叢集告訴它哪個 Broker 是管理它的協調者。不過,消費者應該向哪個 Broker 傳送這類請求呢?理論上任何一個 Broker 都能回答這個問題,也就是說消費者可以傳送 FindCoordinator 請求給叢集中的任意伺服器。在這個問題上,社群做了一點點優化:消費者程式會向叢集中當前負載最小的那臺 Broker 傳送請求。負載是如何評估的呢?其實很簡單,就是看消費者連線的所有 Broker 中,誰的待發送請求最少。當然了,這種評估顯然是消費者端的單向評估,並非是站在全域性角度,因此有的時候也不一定是最優解。不過這不併影響我們的討論。總之,在這一步,消費者會建立一個 Socket 連線。

2.連線協調者時。

Broker 處理完上一步傳送的 FindCoordinator 請求之後,會返還對應的響應結果(Response),顯式地告訴消費者哪個 Broker 是真正的協調者,因此在這一步,消費者知曉了真正的協調者後,會建立連向該 Broker 的 Socket 連線。只有成功連入協調者,協調者才能開啟正常的組協調操作,比如加入組、等待組分配方案、心跳請求處理、位移獲取、位移提交等。

3.消費資料時。

消費者會為每個要消費的分割槽建立與該分割槽領導者副本所在 Broker 連線的 TCP。舉個例子,假設消費者要消費 5 個分割槽的資料,這 5 個分割槽各自的領導者副本分佈在 4 臺 Broker 上,那麼該消費者在消費時會建立與這 4 臺 Broker 的 Socket 連線。

通常來說,消費者程式會建立 3 類 TCP 連線:確定協調者和獲取叢集元資料。連線協調者,令其執行組成員管理操作。執行實際的訊息獲取。

和生產者類似,消費者關閉 Socket 也分為主動關閉和 Kafka 自動關閉。主動關閉是指你顯式地呼叫消費者 API 的方法去關閉消費者,具體方式就是手動呼叫 KafkaConsumer.close() 方法,或者是執行 Kill 命令,不論是 Kill -2 還是 Kill -9;而 Kafka 自動關閉是由消費者端引數 connection.max.idle.ms 控制的,該引數現在的預設值是 9 分鐘,即如果某個 Socket 連線上連續 9 分鐘都沒有任何請求“過境”的話,那麼消費者會強行“殺掉”這個 Socket 連線。

不過,和生產者有些不同的是,如果在編寫消費者程式時,你使用了迴圈的方式來呼叫 poll 方法消費訊息,那麼上面提到的所有請求都會被定期傳送到 Broker,因此這些 Socket 連線上總是能保證有請求在傳送,從而也就實現了“長連線”的效果。針對上面提到的三類 TCP 連線,你需要注意的是,當第三類 TCP 連線成功建立後,消費者程式就會廢棄第一類 TCP 連線,之後在定期請求元資料時,它會改為使用第三類 TCP 連線。也就是說,最終你會發現,第一類 TCP 連線會在後臺被默默地關閉掉。對一個運行了一段時間的消費者程式來說,只會有後面兩類 TCP 連線存在。

從理論上說,Kafka Java 消費者管理 TCP 資源的機制我已經說清楚了,但如果仔細推敲這裡面的設計原理,還是會發現一些問題。我們剛剛講過,第一類 TCP 連線僅僅是為了首次獲取元資料而建立的,後面就會被廢棄掉。最根本的原因是,消費者在啟動時還不知道 Kafka 叢集的資訊,只能使用一個“假”的 ID 去註冊,即使消費者獲取了真實的 Broker ID,它依舊無法區分這個“假”ID 對應的是哪臺 Broker,因此也就無法重用這個 Socket 連線,只能再重新建立一個新的連線。為什麼會出現這種情況呢?主要是因為目前 Kafka 僅僅使用 ID 這一個維度的資料來表徵 Socket 連線資訊。這點資訊明顯不足以確定連線的是哪臺 Broker,也許在未來,社群應該考慮使用 < 主機名、埠、ID> 三元組的方式來定位 Socket 資源,這樣或許能夠讓消費者程式少建立一些 TCP 連線。

冪等生產者和事務生產者

所謂的訊息交付可靠性保障,是指 Kafka 對 Producer 和 Consumer 要處理的訊息提供什麼樣的承諾。常見的承諾有以下三種:

最多一次(at most once):訊息可能會丟失,但絕不會被重複傳送。

至少一次(at least once):訊息不會丟失,但有可能被重複傳送。

精確一次(exactly once):訊息不會丟失,也不會被重複傳送。

目前,Kafka 預設提供的交付可靠性保障是第二種,即至少一次。我們說過訊息“已提交”的含義,即只有 Broker 成功“提交”訊息且 Producer 接到 Broker 的應答才會認為該訊息成功傳送。不過倘若訊息成功“提交”,但 Broker 的應答沒有成功傳送回 Producer 端(比如網路出現瞬時抖動),那麼 Producer 就無法確定訊息是否真的提交成功了。因此,它只能選擇重試,也就是再次傳送相同的訊息。這就是 Kafka 預設提供至少一次可靠性保障的原因,不過這會導致訊息重複傳送。Kafka 也可以提供最多一次交付保障,只需要讓 Producer 禁止重試即可。這樣一來,訊息要麼寫入成功,要麼寫入失敗,但絕不會重複傳送。我們通常不會希望出現訊息丟失的情況,但一些場景裡偶發的訊息丟失其實是被允許的,相反,訊息重複是絕對要避免的。此時,使用最多一次交付保障就是最恰當的。無論是至少一次還是最多一次,都不如精確一次來得有吸引力。大部分使用者還是希望訊息只會被交付一次,這樣的話,訊息既不會丟失,也不會被重複處理。或者說,即使 Producer 端重複傳送了相同的訊息,Broker 端也能做到自動去重。在下游 Consumer 看來,訊息依然只有一條。那麼問題來了,Kafka 是怎麼做到精確一次的呢?簡單來說,這是通過兩種機制:冪等性(Idempotence)和事務(Transaction)。它們分別是什麼機制?兩者是一回事嗎?要回答這些問題,我們首先來說說什麼是冪等性。

冪等性

“冪等”這個詞原是數學領域中的概念,指的是某些操作或函式能夠被執行多次,但每次得到的結果都是不變的。我來舉幾個簡單的例子說明一下。比如在乘法運算中,讓數字乘以 1 就是一個冪等操作,因為不管你執行多少次這樣的運算,結果都是相同的。再比如,取整函式(floor 和 ceiling)是冪等函式,那麼執行 1 次 floor(3.4) 和 100 次 floor(3.4),結果是一樣的,都是 3。相反地,讓一個數加 1 這個操作就不是冪等的,因為執行一次和執行多次的結果必然不同。

在計算機領域中,冪等性的含義稍微有一些不同:在指令式程式設計語言(比如 C)中,若一個子程式是冪等的,那它必然不能修改系統狀態。這樣不管執行這個子程式多少次,與該子程式關聯的那部分系統狀態保持不變。在函數語言程式設計語言(比如 Scala 或 Haskell)中,很多純函式(pure function)天然就是冪等的,它們不執行任何的 side effect。

冪等性有很多好處,其最大的優勢在於我們可以安全地重試任何冪等性操作,反正它們也不會破壞我們的系統狀態。如果是非冪等性操作,我們還需要擔心某些操作執行多次對狀態的影響,但對於冪等性操作而言,我們根本無需擔心此事。

在 Kafka 中,Producer 預設不是冪等性的,但我們可以建立冪等性 Producer。它其實是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分割槽傳送資料時,可能會出現同一條訊息被髮送了多次,導致訊息重複的情況。在 0.11 之後,指定 Producer 冪等性的方法很簡單,僅需要設定一個引數即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

enable.idempotence 被設定成 true 後,Producer 自動升級成冪等性 Producer,其他所有的程式碼邏輯都不需要改變。Kafka 自動幫你做訊息的重複去重。底層具體的原理很簡單,就是經典的用空間去換時間的優化思路,即在 Broker 端多儲存一些欄位。當 Producer 傳送了具有相同欄位值的訊息後,Broker 能夠自動知曉這些訊息已經重複了,於是可以在後臺默默地把它們“丟棄”掉。當然,實際的實現原理並沒有這麼簡單,但你大致可以這麼理解。

看上去,冪等性 Producer 的功能很酷,使用起來也很簡單,僅僅設定一個引數就能保證訊息不重複了,但實際上,我們必須要了解冪等性 Producer 的作用範圍。首先,它只能保證單分割槽上的冪等性,即一個冪等性 Producer 能夠保證某個主題的一個分割槽上不出現重複訊息,它無法實現多個分割槽的冪等性。其次,它只能實現單會話上的冪等性,不能實現跨會話的冪等性。這裡的會話,你可以理解為 Producer 程序的一次執行。當你重啟了 Producer 程序之後,這種冪等性保證就喪失了。那麼你可能會問,如果我想實現多分割槽以及多會話上的訊息無重複,應該怎麼做呢?答案就是事務(transaction)或者依賴事務型 Producer。這也是冪等性 Producer 和事務型 Producer 的最大區別!

事務

Kafka 的事務概念類似於我們熟知的資料庫提供的事務。在資料庫領域,事務提供的安全性保障是經典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔離性 (Isolation) 和永續性 (Durability)。

當然,在實際場景中各家資料庫對 ACID 的實現各不相同。特別是 ACID 本身就是一個有歧義的概念,比如對隔離性的理解。大體來看,隔離性非常自然和必要,但是具體到實現細節就顯得不那麼精確了。通常來說,隔離性表明併發執行的事務彼此相互隔離,互不影響。經典的資料庫教科書把隔離性稱為可序列化 (serializability),即每個事務都假裝它是整個資料庫中唯一的事務。

提到隔離級別,這種歧義或混亂就更加明顯了。很多資料庫廠商對於隔離級別的實現都有自己不同的理解,比如有的資料庫提供 Snapshot 隔離級別,而在另外一些資料庫中,它們被稱為可重複讀(repeatable read)。好在對於已提交讀(read committed)隔離級別的提法,各大主流資料庫廠商都比較統一。所謂的 read committed,指的是當讀取資料庫時,你只能看到已提交的資料,即無髒讀。同時,當寫入資料庫時,你也只能覆蓋掉已提交的資料,即無髒寫。Kafka 自 0.11 版本開始也提供了對事務的支援,目前主要是在 read committed 隔離級別上做事情。它能保證多條訊息原子性地寫入到目標分割槽,同時也能保證 Consumer 只能看到事務成功提交的訊息。下面我們就來看看 Kafka 中的事務型 Producer。

事務型Producer

事務型 Producer 能夠保證將訊息原子性地寫入到多個分割槽中。這批訊息要麼全部寫入成功,要麼全部失敗。另外,事務型 Producer 也不懼程序的重啟。Producer 重啟回來後,Kafka 依然保證它們傳送訊息的精確一次處理。設定事務型 Producer 的方法也很簡單,滿足兩個要求即可:和冪等性 Producer 一樣,開啟 enable.idempotence = true。設定 Producer 端引數 transactional. id。最好為其設定一個有意義的名字。此外,你還需要在 Producer 程式碼中做一些調整,如這段程式碼所示:

1 producer.initTransactions();
2 try {
3             producer.beginTransaction();
4             producer.send(record1);
5             producer.send(record2);
6             producer.commitTransaction();
7 } catch (KafkaException e) {
8             producer.abortTransaction();
9 }

這段程式碼能夠保證 Record1 和 Record2 被當作一個事務統一提交到 Kafka,要麼它們全部提交成功,要麼全部寫入失敗。實際上即使寫入失敗,Kafka 也會把它們寫入到底層的日誌中,也就是說 Consumer 還是會看到這些訊息。因此在 Consumer 端,讀取事務型 Producer 傳送的訊息也是需要一些變更的。修改起來也很簡單,設定 isolation.level 引數的值即可。當前這個引數有兩個取值:read_uncommitted:這是預設值,表明 Consumer 能夠讀取到 Kafka 寫入的任何訊息,不論事務型 Producer 提交事務還是終止事務,其寫入的訊息都可以讀取。很顯然,如果你用了事務型 Producer,那麼對應的 Consumer 就不要使用這個值。read_committed:表明 Consumer 只會讀取事務型 Producer 成功提交事務寫入的訊息。當然了,它也能看到非事務型 Producer 寫入的所有訊息。

消費者組

消費者組,即 Consumer Group,應該算是 Kafka 比較有亮點的設計了。那麼何謂 Consumer Group 呢?用一句話概括就是:Consumer Group 是 Kafka 提供的可擴充套件且具有容錯性的消費者機制。既然是一個組,那麼組內必然可以有多個消費者或消費者例項(Consumer Instance),它們共享一個公共的 ID,這個 ID 被稱為 Group ID。組內的所有消費者協調在一起來消費訂閱主題(Subscribed Topics)的所有分割槽(Partition)。當然,每個分割槽只能由同一個消費者組內的一個 Consumer 例項來消費。個人認為,理解 Consumer Group 記住下面這三個特性就好了。

Consumer Group 下可以有一個或多個 Consumer 例項。這裡的例項可以是一個單獨的程序,也可以是同一程序下的執行緒。在實際場景中,使用程序更為常見一些。

Group ID 是一個字串,在一個 Kafka 叢集中,它標識唯一的一個 Consumer Group。

Consumer Group 下所有例項訂閱的主題的單個分割槽,只能分配給組內的某個 Consumer 例項消費。這個分割槽當然也可以被其他的 Group 消費。

Consumer Group 之間彼此獨立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。再加上 Broker 端的訊息留存機制,Kafka 的 Consumer Group 完美地規避了上面提到的伸縮性差的問題。可以這麼說,Kafka 僅僅使用 Consumer Group 這一種機制,卻同時實現了傳統訊息引擎系統的兩大模型:如果所有例項都屬於同一個 Group,那麼它實現的就是訊息佇列模型;如果所有例項分別屬於不同的 Group,那麼它實現的就是釋出 / 訂閱模型。

在瞭解了 Consumer Group 以及它的設計亮點之後,你可能會有這樣的疑問:在實際使用場景中,我怎麼知道一個 Group 下該有多少個 Consumer 例項呢?理想情況下,Consumer 例項的數量應該等於該 Group 訂閱主題的分割槽總數。舉個簡單的例子,假設一個 Consumer Group 訂閱了 3 個主題,分別是 A、B、C,它們的分割槽數依次是 1、2、3,那麼通常情況下,為該 Group 設定 6 個 Consumer 例項是比較理想的情形,因為它能最大限度地實現高伸縮性。你可能會問,我能設定小於或大於 6 的例項嗎?當然可以!如果你有 3 個例項,那麼平均下來每個例項大約消費 2 個分割槽(6 / 3 = 2);如果你設定了 8 個例項,那麼很遺憾,有 2 個例項(8 – 6 = 2)將不會被分配任何分割槽,它們永遠處於空閒狀態。因此,在實際使用過程中一般不推薦設定大於總分割槽數的 Consumer 例項。設定多餘的例項只會浪費資源,而沒有任何好處。

Kafka 有新舊客戶端 API 之分,那自然也就有新舊 Consumer 之分。老版本的 Consumer 也有消費者組的概念,它和我們目前討論的 Consumer Group 在使用感上並沒有太多的不同,只是它管理位移的方式和新版本是不一樣的。老版本的 Consumer Group 把位移儲存在 ZooKeeper 中。Apache ZooKeeper 是一個分散式的協調服務框架,Kafka 重度依賴它實現各種各樣的協調管理。將位移儲存在 ZooKeeper 外部系統的做法,最顯而易見的好處就是減少了 Kafka Broker 端的狀態儲存開銷。現在比較流行的提法是將伺服器節點做成無狀態的,這樣可以自由地擴縮容,實現超強的伸縮性。Kafka 最開始也是基於這樣的考慮,才將 Consumer Group 位移儲存在獨立於 Kafka 叢集之外的框架中。

不過,慢慢地人們發現了一個問題,即 ZooKeeper 這類元框架其實並不適合進行頻繁的寫更新,而 Consumer Group 的位移更新卻是一個非常頻繁的操作。這種大吞吐量的寫操作會極大地拖慢 ZooKeeper 叢集的效能,因此 Kafka 社群漸漸有了這樣的共識:將 Consumer 位移儲存在 ZooKeeper 中是不合適的做法。於是,在新版本的 Consumer Group 中,Kafka 社群重新設計了 Consumer Group 的位移管理方式,採用了將位移儲存在 Kafka 內部主題的方法。這個內部主題就是讓人既愛又恨的 __consumer_offsets。我會在專欄後面的內容中專門介紹這個神祕的主題。不過,現在你需要記住新版本的 Consumer Group 將位移儲存在 Broker 端的內部主題中。最後,我們來說說 Consumer Group 端大名鼎鼎的重平衡,也就是所謂的 Rebalance 過程。我形容其為“大名鼎鼎”,從某種程度上來說其實也是“臭名昭著”,因為有關它的 bug 真可謂是此起彼伏,從未間斷。這裡我先賣個關子,後面我會解釋它“遭人恨”的地方。我們先來了解一下什麼是 Rebalance。

Rebalance 本質上是一種協議,規定了一個 Consumer Group 下的所有 Consumer 如何達成一致,來分配訂閱 Topic 的每個分割槽。比如某個 Group 下有 20 個 Consumer 例項,它訂閱了一個具有 100 個分割槽的 Topic。正常情況下,Kafka 平均會為每個 Consumer 分配 5 個分割槽。這個分配的過程就叫 Rebalance。那麼 Consumer Group 何時進行 Rebalance 呢?Rebalance 的觸發條件有 3 個。

組成員數發生變更。比如有新的 Consumer 例項加入組或者離開組,抑或是有 Consumer 例項崩潰被“踢出”組。

訂閱主題數發生變更。Consumer Group 可以使用正則表示式的方式訂閱主題,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結尾的主題。在 Consumer Group 的執行過程中,你新建立了一個滿足這樣條件的主題,那麼該 Group 就會發生 Rebalance。

訂閱主題的分割槽數發生變更。Kafka 當前只能允許增加一個主題的分割槽數。當分割槽數增加時,就會觸發訂閱該主題的所有 Group 開啟 Rebalance。

下面來說說reblance“遭人恨”的地方,首先,Rebalance 過程對 Consumer Group 消費過程有極大的影響。如果你瞭解 JVM 的垃圾回收機制,你一定聽過萬物靜止的收集方式,即著名的 stop the world,簡稱 STW。在 STW 期間,所有應用執行緒都會停止工作,表現為整個應用程式僵在那邊一動不動。Rebalance 過程也和這個類似,在 Rebalance 過程中,所有 Consumer 例項都會停止消費,等待 Rebalance 完成。這是 Rebalance 為人詬病的一個方面。其次,目前 Rebalance 的設計是所有 Consumer 例項共同參與,全部重新分配所有分割槽。其實更高效的做法是儘量減少分配方案的變動。例如例項 A 之前負責消費分割槽 1、2、3,那麼 Rebalance 之後,如果可能的話,最好還是讓例項 A 繼續消費分割槽 1、2、3,而不是被重新分配其他的分割槽。這樣的話,例項 A 連線這些分割槽所在 Broker 的 TCP 連線就可以繼續用,不用重新建立連線其他 Broker 的 Socket 資源。最後,Rebalance 實在是太慢了。曾經,有個國外使用者的 Group 內有幾百個 Consumer 例項,成功 Rebalance 一次要幾個小時!這完全是不能忍受的。最悲劇的是,目前社群對此無能為力,至少現在還沒有特別好的解決方案。所謂“本事大不如不攤上”,也許最好的解決方案就是避免 Rebalance 的發生吧。

避免消費者組平衡

具體來講,Consumer 端應用程式在提交位移時,其實是向 Coordinator 所在的 Broker 提交位移。同樣地,當 Consumer 應用啟動時,也是向 Coordinator 所在的 Broker 傳送各種請求,然後由 Coordinator 負責執行消費者組的註冊、成員管理記錄等元資料管理操作。

所有 Broker 在啟動時,都會建立和開啟相應的 Coordinator 元件。也就是說,所有 Broker 都有各自的 Coordinator 元件。那麼,Consumer Group 如何確定為它服務的 Coordinator 在哪臺 Broker 上呢?答案就在Kafka 內部位移主題 __consumer_offsets 身上。

目前,Kafka 為某個 Consumer Group 確定 Coordinator 所在的 Broker 的演算法有 2 個步驟。

第 1 步:確定由位移主題的哪個分割槽來儲存該 Group 資料:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。

第 2 步:找出該分割槽 Leader 副本所在的 Broker,該 Broker 即為對應的 Coordinator。

好了,我們說回 Rebalance。既然我們今天要討論的是如何避免 Rebalance,那就說明 Rebalance 這個東西不好,或者說至少有一些弊端需要我們去規避。那麼,Rebalance 的弊端是什麼呢?總結起來有以下 3 點:

Rebalance 影響 Consumer 端 TPS。總之就是,在 Rebalance 期間,Consumer 會停下手頭的事情,什麼也幹不了。

Rebalance 很慢。如果你的 Group 下成員很多,就一定會有這樣的痛點。還記得那個國外使用者的例子吧?他的 Group 下有幾百個 Consumer 例項,Rebalance 一次要幾個小時。在那種場景下,Consumer Group 的 Rebalance 已經完全失控了。

Rebalance 效率不高。當前 Kafka 的設計機制決定了每次 Rebalance 時,Group 下的所有成員都要參與進來,而且通常不會考慮區域性性原理,但區域性性原理對提升系統性能是特別重要的。

關於第 3 點,我們來舉個簡單的例子。比如一個 Group 下有 10 個成員,每個成員平均消費 5 個分割槽。假設現在有一個成員退出了,此時就需要開啟新一輪的 Rebalance,把這個成員之前負責的 5 個分割槽“轉移”給其他成員。顯然,比較好的做法是維持當前 9 個成員消費分割槽的方案不變,然後將 5 個分割槽隨機分配給這 9 個成員,這樣能最大限度地減少 Rebalance 對剩餘 Consumer 成員的衝擊。遺憾的是,目前 Kafka 並不是這樣設計的。在預設情況下,每次 Rebalance 時,之前的分配方案都不會被保留。就拿剛剛這個例子來說,當 Rebalance 開始時,Group 會打散這 50 個分割槽(10 個成員 * 5 個分割槽),由當前存活的 9 個成員重新分配它們。顯然這不是效率很高的做法。基於這個原因,社群於 0.11.0.0 版本推出了 StickyAssignor,即有粘性的分割槽分配策略。所謂的有粘性,是指每次 Rebalance 時,該策略會盡可能地保留之前的分配方案,儘量實現分割槽分配的最小變動。不過有些遺憾的是,這個策略目前還有一些 bug,而且需要升級到 0.11.0.0 才能使用,因此在實際生產環境中用得還不是很多。

就我個人經驗而言,在真實的業務場景中,很多 Rebalance 都是計劃外的或者說是不必要的。我們應用的 TPS 大多是被這類 Rebalance 拖慢的,因此避免這類 Rebalance 就顯得很有必要了。下面我們就來說說如何避免 Rebalance。要避免 Rebalance,還是要從 Rebalance 發生的時機入手。

我們在前面說過,Rebalance 發生的時機有三個:

組成員數量發生變化

訂閱主題數量發生變化

訂閱主題的分割槽數發生變化

Consumer 程式時,實際上就向這個 Group 添加了一個新的 Consumer 例項。此時,Coordinator 會接納這個新例項,將其加入到組中,並重新分配分割槽。通常來說,增加 Consumer 例項的操作都是計劃內的,可能是出於增加 TPS 或提高伸縮性的需要。總之,它不屬於我們要規避的那類“不必要 Rebalance”。我們更在意的是 Group 下例項數減少這件事。如果你就是要停掉某些 Consumer 例項,那自不必說,關鍵是在某些情況下,Consumer 例項會被 Coordinator 錯誤地認為“已停止”從而被“踢出”Group。如果是這個原因導致的 Rebalance,我們就不能不管了。Coordinator 會在什麼情況下認為某個 Consumer 例項已掛從而要退組呢?這個絕對是需要好好討論的話題,我們來詳細說說。

當 Consumer Group 完成 Rebalance 之後,每個 Consumer 例項都會定期地向 Coordinator 傳送心跳請求,表明它還存活著。如果某個 Consumer 例項不能及時地傳送這些心跳請求,Coordinator 就會認為該 Consumer 已經“死”了,從而將其從 Group 中移除,然後開啟新一輪 Rebalance。Consumer 端有個引數,叫 session.timeout.ms,就是被用來表徵此事的。該引數的預設值是 10 秒,即如果 Coordinator 在 10 秒之內沒有收到 Group 下某 Consumer 例項的心跳,它就會認為這個 Consumer 例項已經掛了。可以這麼說,session.timeout.ms 決定了 Consumer 存活性的時間間隔。

除了這個引數,Consumer 還提供了一個允許你控制傳送心跳請求頻率的引數,就是 heartbeat.interval.ms。這個值設定得越小,Consumer 例項傳送心跳請求的頻率就越高。頻繁地傳送心跳請求會額外消耗頻寬資源,但好處是能夠更加快速地知曉當前是否開啟 Rebalance,因為,目前 Coordinator 通知各個 Consumer 例項開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標誌封裝進心跳請求的響應體中。除了以上兩個引數,Consumer 端還有一個引數,用於控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms 引數。它限定了 Consumer 端應用程式兩次呼叫 poll 方法的最大時間間隔。它的預設值是 5 分鐘,表示你的 Consumer 程式如果在 5 分鐘之內無法消費完 poll 方法返回的訊息,那麼 Consumer 會主動發起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。

kafka管理位移的主題

__consumer_offsets 在 Kafka 原始碼中有個更為正式的名字,叫位移主題,即 Offsets Topic。老版本 Consumer 的位移管理是依託於 Apache ZooKeeper 的,它會自動或手動地將位移資料提交到 ZooKeeper 中儲存。當 Consumer 重啟後,它能自動從 ZooKeeper 中讀取位移資料,從而在上次消費截止的地方繼續消費。這種設計使得 Kafka Broker 不需要儲存位移資料,減少了 Broker 端需要持有的狀態空間,因而有利於實現高伸縮性。

但是,ZooKeeper 其實並不適用於這種高頻的寫操作,因此,Kafka 社群自 0.8.2.x 版本開始,就在醞釀修改這種設計,並最終在新版本 Consumer 中正式推出了全新的位移管理機制,自然也包括這個新的位移主題。新版本 Consumer 的位移管理機制其實也很簡單,就是將 Consumer 的位移資料作為一條條普通的 Kafka 訊息,提交到 __consumer_offsets 中。可以這麼說,__consumer_offsets 的主要作用是儲存 Kafka 消費者的位移資訊。它要求這個提交過程不僅要實現高永續性,還要支援高頻的寫操作。顯然,Kafka 的主題設計天然就滿足這兩個條件,因此,使用 Kafka 主題來儲存位移這件事情,實際上就是一個水到渠成的想法了。

雖說位移主題是一個普通的 Kafka 主題,但它的訊息格式卻是 Kafka 自己定義的,使用者不能修改,也就是說你不能隨意地向這個主題寫訊息,因為一旦你寫入的訊息不滿足 Kafka 規定的格式,那麼 Kafka 內部無法成功解析,就會造成 Broker 的崩潰。事實上,Kafka Consumer 有 API 幫你提交位移,也就是向位移主題寫訊息。你千萬不要自己寫個 Producer 隨意向該主題傳送訊息。你可能會好奇,這個主題存的到底是什麼格式的訊息呢?所謂的訊息格式,你可以簡單地理解為是一個 KV 對。Key 和 Value 分別表示訊息的鍵值和訊息體,在 Kafka 中它們就是位元組陣列而已。

位移主題的 Key 中應該儲存 3 部分內容:<GROUP ID,主題名,分割槽號>。接下來,我們再來看看訊息體的設計。也許你會覺得訊息體應該很簡單,儲存一個位移值就可以了。實際上,社群的方案要複雜得多,比如訊息體還儲存了位移提交的一些其他元資料,諸如時間戳和使用者自定義的資料等。儲存這些元資料是為了幫助 Kafka 執行各種各樣後續的操作,比如刪除過期位移訊息等。但總體來說,我們還是可以簡單地認為訊息體就是儲存了位移值。

位移主題是 Kafka 自動建立的,那麼該主題的分割槽數是 50,副本數是 3。建立位移主題當然是為了用的,那麼什麼地方會用到位移主題呢?我們前面一直在說 Kafka Consumer 提交位移時會寫入該主題,那 Consumer 是怎麼提交位移的呢?目前 Kafka Consumer 提交位移的方式有兩種:自動提交位移和手動提交位移。

事實上,很多與 Kafka 整合的大資料框架都是禁用自動提交位移的,如 Spark、Flink 等。這就引出了另一種位移提交方式:手動提交位移,即設定 enable.auto.commit = false。一旦設定了 false,作為 Consumer 應用開發的你就要承擔起位移提交的責任。Kafka Consumer API 為你提供了位移提交的方法,如 consumer.commitSync 等。當呼叫這些方法時,Kafka 會向位移主題寫入相應的訊息。如果你選擇的是自動提交位移,那麼就可能存在一個問題:只要 Consumer 一直啟動著,它就會無限期地向位移主題寫入訊息。

Kafka 是怎麼刪除位移主題中的過期訊息的呢?答案就是 Compaction。國內很多文獻都將其翻譯成壓縮,我個人是有一點保留意見的。在英語中,壓縮的專有術語是 Compression,它的原理和 Compaction 很不相同,我更傾向於翻譯成壓實,或乾脆採用 JVM 垃圾回收中的術語:整理。

不管怎麼翻譯,Kafka 使用 Compact 策略來刪除位移主題中的過期訊息,避免該主題無限期膨脹。那麼應該如何定義 Compact 策略中的過期呢?對於同一個 Key 的兩條訊息 M1 和 M2,如果 M1 的傳送時間早於 M2,那麼 M1 就是過期訊息。Compact 的過程就是掃描日誌的所有訊息,剔除那些過期的訊息,然後把剩下的訊息整理在一起。我在這裡貼一張來自官網的圖片,來說明 Compact 過程。

Kafka 提供了專門的後臺執行緒定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除資料。這個後臺執行緒叫 Log Cleaner。很多實際生產環境中都出現過位移主題無限膨脹佔用過多磁碟空間的問題,如果你的環境中也有這個問題,我建議你去檢查一下 Log Cleaner 執行緒的狀態,通常都是這個執行緒掛掉了導致的。

consumer位移提交

Consumer 端有個位移的概念,它和訊息在分割槽中的位移不是一回事兒,雖然它們的英文都是 Offset。今天我們要聊的位移是 Consumer 的消費位移,它記錄了 Consumer 要消費的下一條訊息的位移。這可能和你以前瞭解的有些出入,不過切記是下一條訊息的位移,而不是目前最新消費訊息的位移。提交位移主要是為了表徵 Consumer 的消費進度,這樣當 Consumer 發生故障重啟之後,就能夠從 Kafka 中讀取之前提交的位移值,然後從相應的位移處繼續消費,從而避免整個消費過程重來一遍。換句話說,位移提交是 Kafka 提供給你的一個工具或語義保障,你負責維持這個語義保障,即如果你提交了位移 X,那麼 Kafka 會認為所有位移值小於 X 的訊息你都已經成功消費了。

這一點特別關鍵。因為位移提交非常靈活,你完全可以提交任何位移值,但由此產生的後果你也要一併承擔。假設你的 Consumer 消費了 10 條訊息,你提交的位移值卻是 20,那麼從理論上講,位移介於 11~19 之間的訊息是有可能丟失的;相反地,如果你提交的位移值是 5,那麼位移介於 5~9 之間的訊息就有可能被重複消費。所以,我想再強調一下,位移提交的語義保障是由你來負責的,Kafka 只會“無腦”地接受你提交的位移。你對位移提交的管理直接影響了你的 Consumer 所能提供的訊息語義保障。鑑於位移提交甚至是位移管理對 Consumer 端的巨大影響,Kafka,特別是 KafkaConsumer API,提供了多種提交位移的方法。從使用者的角度來說,位移提交分為自動提交和手動提交;從 Consumer 端的角度來說,位移提交分為同步提交和非同步提交。

我們先來說說自動提交和手動提交。所謂自動提交,就是指 Kafka Consumer 在後臺默默地為你提交位移,作為使用者的你完全不必操心這些事;而手動提交,則是指你要自己提交位移,Kafka Consumer 壓根不管。開啟自動提交位移的方法很簡單。Consumer 端有個引數 enable.auto.commit,把它設定為 true 或者壓根不設定它就可以了。因為它的預設值就是 true,即 Java Consumer 預設就是自動提交位移的。如果啟用了自動提交,Consumer 端還有個引數就派上用場了:auto.commit.interval.ms。它的預設值是 5 秒,表明 Kafka 每 5 秒會為你自動提交一次位移。為了把這個問題說清楚,我給出了完整的 Java 程式碼。這段程式碼展示了設定自動提交位移的方法。有了這段程式碼做基礎,今天后面的講解我就不再展示完整的程式碼了。

 1 Properties props = new Properties();
 2      props.put("bootstrap.servers", "localhost:9092");
 3      props.put("group.id", "test");
 4      props.put("enable.auto.commit", "true");
 5      props.put("auto.commit.interval.ms", "2000");
 6      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 7      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 8      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 9      consumer.subscribe(Arrays.asList("foo", "bar"));
10      while (true) {
11          ConsumerRecords<String, String> records = consumer.poll(100);
12          for (ConsumerRecord<String, String> record : records)
13              System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
14      }

上面的第 3、第 4 行程式碼,就是開啟自動提交位移的方法。總體來說,還是很簡單的吧。和自動提交相反的,就是手動提交了。開啟手動提交位移的方法就是設定 enable.auto.commit 為 false。但是,僅僅設定它為 false 還不夠,因為你只是告訴 Kafka Consumer 不要自動提交位移而已,你還需要呼叫相應的 API 手動提交位移。最簡單的 API 就是 KafkaConsumer#commitSync()。該方法會提交 KafkaConsumer#poll() 返回的最新位移。從名字上來看,它是一個同步操作,即該方法會一直等待,直到位移被成功提交才會返回。如果提交過程中出現異常,該方法會將異常資訊丟擲。下面這段程式碼展示了 commitSync() 的使用方法:

 1 while (true) {
 2             ConsumerRecords<String, String> records =
 3                         consumer.poll(Duration.ofSeconds(1));
 4             process(records); // 處理訊息
 5             try {
 6                         consumer.commitSync();
 7             } catch (CommitFailedException e) {
 8                         handle(e); // 處理提交失敗異常
 9             }
10 }

可見,呼叫 consumer.commitSync() 方法的時機,是在你處理完了 poll() 方法返回的所有訊息之後。如果你莽撞地過早提交了位移,就可能會出現消費資料丟失的情況。那麼你可能會問,自動提交位移就不會出現消費資料丟失的情況了嗎?它能恰到好處地把握時機進行位移提交嗎?為了搞清楚這個問題,我們必須要深入地瞭解一下自動提交位移的順序。一旦設定了 enable.auto.commit 為 true,Kafka 會保證在開始呼叫 poll 方法時,提交上次 poll 返回的所有訊息。從順序上來說,poll 方法的邏輯是先提交上一批訊息的位移,再處理下一批訊息,因此它能保證不出現消費丟失的情況。但自動提交位移的一個問題在於,它可能會出現重複消費。

在預設情況下,Consumer 每 5 秒自動提交一次位移。現在,我們假設提交位移之後的 3 秒發生了 Rebalance 操作。在 Rebalance 之後,所有 Consumer 從上一次提交的位移處繼續消費,但該位移已經是 3 秒前的位移資料了,故在 Rebalance 發生前 3 秒消費的所有資料都要重新再消費一次。雖然你能夠通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這麼做只能縮小重複消費的時間視窗,不可能完全消除它。這是自動提交機制的一個缺陷。反觀手動提交位移,它的好處就在於更加靈活,你完全能夠把控位移提交的時機和頻率。但是,它也有一個缺陷,就是在呼叫 commitSync() 時,Consumer 程式會處於阻塞狀態,直到遠端的 Broker 返回提交結果,這個狀態才會結束。在任何系統中,因為程式而非資源限制而導致的阻塞都可能是系統的瓶頸,會影響整個應用程式的 TPS。當然,你可以選擇拉長提交間隔,但這樣做的後果是 Consumer 的提交頻率下降,在下次 Consumer 重啟回來後,會有更多的訊息被重新消費。

鑑於這個問題,Kafka 社群為手動提交位移提供了另一個 API 方法:KafkaConsumer#commitAsync()。從名字上來看它就不是同步的,而是一個非同步操作。呼叫 commitAsync() 之後,它會立即返回,不會阻塞,因此不會影響 Consumer 應用的 TPS。由於它是非同步的,Kafka 提供了回撥函式(callback),供你實現提交之後的邏輯,比如記錄日誌或處理異常等。下面這段程式碼展示了呼叫 commitAsync() 的方法:

while (true) {
            ConsumerRecords<String, String> records = 
  consumer.poll(Duration.ofSeconds(1));
            process(records); // 處理訊息
            consumer.commitAsync((offsets, exception) -> {
  if (exception != null)
  handle(exception);
  });
}

commitAsync 是否能夠替代 commitSync 呢?答案是不能。commitAsync 的問題在於,出現問題時它不會自動重試。因為它是非同步操作,倘若提交失敗後自動重試,那麼它重試時提交的位移值可能早已經“過期”或不是最新值了。因此,非同步提交的重試其實沒有意義,所以 commitAsync 是不會重試的。顯然,如果是手動提交,我們需要將 commitSync 和 commitAsync 組合使用才能到達最理想的效果,原因有兩個:

我們可以利用 commitSync 的自動重試來規避那些瞬時錯誤,比如網路的瞬時抖動,Broker 端 GC 等。因為這些問題都是短暫的,自動重試通常都會成功,因此,我們不想自己重試,而是希望 Kafka Consumer 幫我們做這件事。

我們不希望程式總處於阻塞狀態,影響 TPS。

我們來看一下下面這段程式碼,它展示的是如何將兩個 API 方法結合使用進行手動提交。

 1    try {
 2            while(true) {
 3                         ConsumerRecords<String, String> records = 
 4                                     consumer.poll(Duration.ofSeconds(1));
 5                         process(records); // 處理訊息
 6                         commitAysnc(); // 使用非同步提交規避阻塞
 7             }
 8 } catch(Exception e) {
 9             handle(e); // 處理異常
10 } finally {
11             try {
12                         consumer.commitSync(); // 最後一次提交使用同步阻塞式提交
13   } finally {
14        consumer.close();
15 }
16 }

CommitFailedException異常

我相信用過 Kafka Java Consumer 客戶端 API 的你一定不會感到陌生。所謂 CommitFailedException,顧名思義就是 Consumer 客戶端在提交位移時出現了錯誤或異常,而且還是那種不可恢復的嚴重異常。如果異常是可恢復的瞬時錯誤,提交位移的 API 自己就能規避它們了,因為很多提交位移的 API 方法是支援自動錯誤重試的。

我們就來討論下該異常是什麼時候被丟擲的。從原始碼方面來說,CommitFailedException 異常通常發生在手動提交位移時,即使用者顯式呼叫 KafkaConsumer.commitSync() 方法時。從使用場景來說,有兩種典型的場景可能遭遇該異常。

場景一

我們先說說最常見的場景。當訊息處理的總時間超過預設的 max.poll.interval.ms 引數值時,Kafka Consumer 端會丟擲 CommitFailedException 異常。這是該異常最“正宗”的登場方式。你只需要寫一個 Consumer 程式,使用 KafkaConsumer.subscribe 方法隨意訂閱一個主題,之後設定 Consumer 端引數 max.poll.interval.ms=5 秒,最後在迴圈呼叫 KafkaConsumer.poll 方法之間,插入 Thread.sleep(6000) 和手動提交位移,就可以成功復現這個異常了。

如果要防止這種場景下丟擲異常,你需要簡化你的訊息處理邏輯。具體來說有 4 種方法。

1,縮短單條訊息處理的時間。比如,之前下游系統消費一條訊息的時間是 100 毫秒,優化之後成功地下降到 50 毫秒,那麼此時 Consumer 端的 TPS 就提升了一倍。

2,增加 Consumer 端允許下游系統消費一批訊息的最大時長。這取決於 Consumer 端引數 max.poll.interval.ms 的值。在最新版的 Kafka 中,該引數的預設值是 5 分鐘。如果你的消費邏輯不能簡化,那麼提高該引數值是一個不錯的辦法。值得一提的是,Kafka 0.10.1.0 之前的版本是沒有這個引數的,因此如果你依然在使用 0.10.1.0 之前的客戶端 API,那麼你需要增加 session.timeout.ms 引數的值。不幸的是,session.timeout.ms 引數還有其他的含義,因此增加該引數的值可能會有其他方面的“不良影響”,這也是社群在 0.10.1.0 版本引入 max.poll.interval.ms 引數,將這部分含義從 session.timeout.ms 中剝離出來的原因之一。

3,減少下游系統一次性消費的訊息總數。這取決於 Consumer 端引數 max.poll.records 的值。當前該引數的預設值是 500 條,表明呼叫一次 KafkaConsumer.poll 方法,最多返回 500 條訊息。可以說,該引數規定了單次 poll 方法能夠返回的訊息總數的上限。如果前兩種方法對你都不適用的話,降低此引數值是避免 CommitFailedException 異常最簡單的手段。

4,下游系統使用多執行緒來加速消費。這應該算是“最高階”同時也是最難實現的解決辦法了。具體的思路就是,讓下游系統手動建立多個消費執行緒處理 poll 方法返回的一批訊息。之前你使用 Kafka Consumer 消費資料更多是單執行緒的,所以當消費速度無法匹及 Kafka Consumer 訊息返回的速度時,它就會丟擲 CommitFailedException 異常。如果是多執行緒,你就可以靈活地控制執行緒數量,隨時調整消費承載能力,再配以目前多核的硬體條件,該方法可謂是防止 CommitFailedException 最高檔的解決之道。事實上,很多主流的大資料流處理框架使用的都是這個方法,比如 Apache Flink 在整合 Kafka 時,就是建立了多個 KafkaConsumerThread 執行緒,自行處理多執行緒間的資料消費。不過,凡事有利就有弊,這個方法實現起來並不容易,特別是在多個執行緒間如何處理位移提交這個問題上,更是極容易出錯。在專欄後面的內容中,我將著重和你討論一下多執行緒消費的實現方案。

除了調整 max.poll.interval.ms 之外,你還可以選擇調整 max.poll.records 值,減少每次 poll 方法返回的訊息數。還拿剛才的例子來說,你可以設定 max.poll.records 值為 150,甚至更少,這樣每批訊息的總消費時長不會超過 300 秒(150*2=300),即 max.poll.interval.ms 的預設值 5 分鐘。這種減少 max.poll.records 值的做法就屬於上面提到的方法 3。

場景二

Kafka Java Consumer 端還提供了一個名為 Standalone Consumer 的獨立消費者。它沒有消費者組的概念,每個消費者例項都是獨立工作的,彼此之間毫無聯絡。不過,你需要注意的是,獨立消費者的位移提交機制和消費者組是一樣的,因此獨立消費者的位移提交也必須遵守之前說的那些規定,比如獨立消費者也要指定 group.id 引數才能提交位移。你可能會覺得奇怪,既然是獨立消費者,為什麼還要指定 group.id 呢?沒辦法,誰讓社群就是這麼設計的呢?總之,消費者組和獨立消費者在使用之前都要指定 group.id。

現在問題來了,如果你的應用中同時出現了設定相同 group.id 值的消費者組程式和獨立消費者程式,那麼當獨立消費者程式手動提交位移時,Kafka 就會立即丟擲 CommitFailedException 異常,因為 Kafka 無法識別這個具有相同 group.id 的消費者例項,於是就向它返回一個錯誤,表明它不是消費者組內合法的成員。

消費者消費進度監控

對於 Kafka 消費者來說,最重要的事情就是監控它們的消費進度了,或者說是監控它們消費的滯後程度。這個滯後程度有個專門的名稱:消費者 Lag 或 Consumer Lag。所謂滯後程度,就是指消費者當前落後於生產者的程度。比方說,Kafka 生產者向某主題成功生產了 100 萬條訊息,你的消費者當前消費了 80 萬條訊息,那麼我們就說你的消費者滯後了 20 萬條訊息,即 Lag 等於 20 萬。通常來說,Lag 的單位是訊息數,而且我們一般是在主題這個級別上討論 Lag 的,但實際上,Kafka 監控 Lag 的層級是在分割槽上的。如果要計算主題級別的,你需要手動彙總所有主題分割槽的 Lag,將它們累加起來,合併成最終的 Lag 值。

既然消費進度這麼重要,我們應該怎麼監控它呢?簡單來說,有 3 種方法。

使用 Kafka 自帶的命令列工具 kafka-consumer-groups 指令碼。

使用 Kafka Java Consumer API 程式設計。

使用 Kafka 自帶的 JMX 監控指標。

接下來,我們分別來討論下這 3 種方法。

kafka自帶命令

我們先來了解下第一種方法:使用 Kafka 自帶的命令列工具 bin/kafka-consumer-groups.sh(bat)。kafka-consumer-groups 指令碼是 Kafka 為我們提供的最直接的監控消費者消費進度的工具。當然,除了監控 Lag 之外,它還有其他的功能。今天,我們主要討論如何使用它來監控 Lag。使用 kafka-consumer-groups 指令碼很簡單。該指令碼位於 Kafka 安裝目錄的 bin 子目錄下,我們可以通過下面的命令來檢視某個給定消費者的 Lag 值:

1 $ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker連線資訊> --describe --group <group名稱>

Kafka 連線資訊就是 < 主機名:埠 > 對,而 group 名稱就是你的消費者程式中設定的 group.id 值。我舉個實際的例子來說明具體的用法,請看下面這張圖的輸出:

在執行命令時,我指定了 Kafka 叢集的連線資訊,即 localhost:9092。另外,我還設定了要查詢的消費者組名:testgroup。kafka-consumer-groups 指令碼的輸出資訊很豐富。首先,它會按照消費者組訂閱主題的分割槽進行展示,每個分割槽一行資料;其次,除了主題、分割槽等資訊外,它會彙報每個分割槽當前最新生產的訊息的位移值(即 LOG-END-OFFSET 列值)、該消費者組當前最新消費訊息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前兩者的差值)、消費者例項 ID、消費者連線 Broker 的主機名以及消費者的 CLIENT-ID 資訊。

Kafka Java Consumer API

下面這段程式碼展示瞭如何利用 Consumer 端 API 監控給定消費者組的 Lag 值:

 1 public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
 2         Properties props = new Properties();
 3         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 4         try (AdminClient client = AdminClient.create(props)) {
 5             ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
 6             try {
 7                 Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
 8                 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移
 9                 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
10                 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
11                 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
12                 try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
13                     Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
14                     return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
15                             entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
16                 }
17             } catch (InterruptedException e) {
18                 Thread.currentThread().interrupt();
19                 // 處理中斷異常
20                 // ...
21                 return Collections.emptyMap();
22             } catch (ExecutionException e) {
23                 // 處理ExecutionException
24                 // ...
25                 return Collections.emptyMap();
26             } catch (TimeoutException e) {
27                 throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
28             }
29         }
30     }

這段程式碼送給你,你可以將 lagOf 方法直接應用於你的生產環境,以實現程式化監控消費者 Lag 的目的。不過請注意,這段程式碼只適用於 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中沒有 AdminClient.listConsumerGroupOffsets 方法。

Kafka JMX 監控指標

上面這兩種方式,都可以很方便地查詢到給定消費者組的 Lag 資訊。但在很多實際監控場景中,我們藉助的往往是現成的監控框架。如果是這種情況,以上這兩種辦法就不怎麼管用了,因為它們都不能整合進已有的監控框架中,如 Zabbix 或 Grafana。下面我們就來看第三種方法,使用 Kafka 預設提供的 JMX 監控指標來監控消費者的 Lag 值。當前,Kafka 消費者提供了一個名為 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指標,裡面有很多屬性。和我們今天所講內容相關的有兩組屬性:records-lag-max 和 records-lead-min,它們分別表示此消費者在測試視窗時間內曾經達到的最大的 Lag 值和最小的 Lead 值。

試想一下,監控到 Lag 越來越大,可能只會給你一個感受,那就是消費者程式變得越來越慢了,至少是追不上生產者程式了,除此之外,你可能什麼都不會做。畢竟,有時候這也是能夠接受的。但反過來,一旦你監測到 Lead 越來越小,甚至是快接近於 0 了,你就一定要小心了,這可能預示著消費者端要丟訊息了。

為什麼?我們知道 Kafka 的訊息是有留存時間設定的,預設是 1 周,也就是說 Kafka 預設刪除 1 周前的資料。倘若你的消費者程式足夠慢,慢到它要消費的資料快被 Kafka 刪除了,這時你就必須立即處理,否則一定會出現訊息被刪除,從而導致消費者程式重新調整位移值的情形。這可能產生兩個後果:一個是消費者從頭消費一遍資料,另一個是消費者從最新的訊息位移處開始消費,之前沒來得及消費的訊息全部被跳過了,從而造成丟訊息的假象。

總結

以後關於kafka系列的總結大部分來自Geek Time的課件,大家可以自行關鍵字搜尋。