Kafka 簡介
好記憶不如爛筆頭,能夠記錄點什麼,就趕緊記錄點 , 方便知識的積累 !
Apache Kafka 是一款流行的分散式資料流平臺,它已經廣泛地被諸如 New Relic(資料智慧平臺)、Uber、Square(移動支付公司)等大型公司用來構建可擴充套件的、高吞吐量的、且高可靠的實時資料流系統。
例如,在 New Relic 的生產環境中,Kafka 群集每秒能夠處理超過 1500 萬條訊息,而且其資料聚合率接近 1Tbps。
可見,Kafka 大幅簡化了對於資料流的處理,因此它也獲得了眾多應用開發人員和資料管理專家的青睞。
然而,在大型系統中 Kafka 的應用會比較複雜。如果您的 Consumers 無法跟上資料流的話,各種訊息往往在未被檢視之前就已經消失掉了。
同時,它在自動化資料保留方面的限制,高流量的釋出+訂閱(publish-subscribe,pub/sub)模式等,可能都會影響到您系統的效能。
可以毫不誇張地說,如果那些存放著資料流的系統無法按需擴容、或穩定性不可靠的話,估計您經常會寢食難安。
為了減少上述複雜性,我在此分享 New Relic 公司為 Kafka 叢集在應對高吞吐量方面的 20 項最佳實踐。
我將從如下四個方面進行展開:
-
Partitions(分割槽)
-
Consumers(消費者)
-
Producers(生產者)
-
Brokers(代理)
快速瞭解 Kafka 的概念與架構
Kafka 是一種高效的分散式訊息系統。在效能上,它具有內建的資料冗餘度與彈性,也具有高吞吐能力和可擴充套件性。
在功能上,它支援自動化的資料儲存限制,能夠以“流”的方式為應用提供資料轉換,以及按照“鍵-值(key-value)”的建模關係“壓縮”資料流。
要了解各種最佳實踐,您需要首先熟悉如下關鍵術語:
Message(訊息)
Kafka 中的一條記錄或資料單位。每條訊息都有一個鍵和對應的一個值,有時還會有可選的訊息頭。
Producer(生產者)
Producer 將訊息釋出到 Kafka 的 topics 上。Producer 決定向 topic 分割槽的釋出方式,如:輪詢的隨機方法、或基於訊息鍵(key)的分割槽演算法。
Broker(代理)
Kafka 以分散式系統或叢集的方式執行。那麼群集中的每個節點稱為一個 Broker。
Topic(主題)
Topic 是那些被髮布的資料記錄或訊息的一種類別。消費者通過訂閱Topic,來讀取寫給它們的資料。
Topic Partition(主題分割槽)
不同的 Topic 被分為不同的分割槽,而每一條訊息都會被分配一個 Offset,通常每個分割槽都會被複制至少一到兩次。
每個分割槽都有一個 Leader 和存放在各個 Follower 上的一到多個副本(即:資料的副本),此法可防止某個 Broker 的失效。
群集中的所有 Broker 都可以作為 Leader 和 Follower,但是一個 Broker 最多隻能有一個 Topic Partition 的副本。Leader 可被用來進行所有的讀寫操作。
Offset(偏移量)
單個分割槽中的每一條訊息都被分配一個 Offset,它是一個單調遞增的整型數,可用來作為分割槽中訊息的唯一識別符號。
Consumer(消費者)
Consumer 通過訂閱 Topic partition,來讀取 Kafka 的各種 Topic 訊息。然後,消費類應用處理會收到訊息,以完成指定的工作。
Consumer group(消費組)
Consumer 可以按照 Consumer group 進行邏輯劃分。Topic Partition 被均衡地分配給組中的所有 Consumers。
因此,在同一個 Consumer group 中,所有的 Consumer 都以負載均衡的方式運作。
換言之,同一組中的每一個 Consumer 都能看到每一條訊息。如果某個 Consumer 處於“離線”狀態的話,那麼該分割槽將會被分配給同組中的另一個 Consumer。這就是所謂的“再均衡(rebalance)”。
當然,如果組中的 Consumer 多於分割槽數,則某些 Consumer 將會處於閒置的狀態。
相反,如果組中的 Consumer 少於分割槽數,則某些 Consumer 會獲得來自一個以上分割槽的訊息。
Lag(延遲)
當 Consumer 的速度跟不上訊息的產生速度時,Consumer 就會因為無法從分割槽中讀取訊息,而產生延遲。
延遲表示為分割槽頭後面的 Offset 數量。從延遲狀態(到“追趕上來”)恢復正常所需要的時間,取決於 Consumer 每秒能夠應對的訊息速度。
其公式如下:time = messages / (consume rate per second - produce rate per second)
針對 Partitions 的最佳實踐
①瞭解分割槽的資料速率,以確保提供合適的資料儲存空間
此處所謂“分割槽的資料速率”是指資料的生成速率。換言之,它是由“平均訊息大小”乘以“每秒訊息數”得出的資料速率決定了在給定時間內,所能保證的資料儲存空間的大小(以位元組為單位)。
如果您不知道資料速率的話,則無法正確地計算出滿足基於給定時間跨度的資料,所需要儲存的空間大小。
同時,資料速率也能夠標識出單個 Consumer 在不產生延時的情況下,所需要支援的最低效能值。
②除非您有其他架構上的需要,否則在寫 Topic 時請使用隨機分割槽
在您進行大型操作時,各個分割槽在資料速率上的參差不齊是非常難以管理的。
其原因來自於如下三個方面:
-
首先,“熱”(有較高吞吐量)分割槽上的 Consumer 勢必會比同組中的其他 Consumer 處理更多的訊息,因此很可能會導致出現在處理上和網路上的瓶頸。
-
其次,那些為具有最高資料速率的分割槽,所配置的最大保留空間,會導致Topic 中其他分割槽的磁碟使用量也做相應地增長。
-
第三,根據分割槽的 Leader 關係所實施的最佳均衡方案,比簡單地將 Leader 關係分散到所有 Broker 上,要更為複雜。在同一 Topic 中,“熱”分割槽會“承載”10 倍於其他分割槽的權重。
有關 Topic Partition 的使用,可以參閱《Kafka Topic Partition的各種有效策略》網頁連結。
針對 Consumers 的最佳實踐
③如果 Consumers 執行的是比 Kafka 0.10 還要舊的版本,那麼請馬上升級
在 0.8.x 版中,Consumer 使用 Apache ZooKeeper 來協調 Consumer group,而許多已知的 Bug 會導致其長期處於再均衡狀態,或是直接導致再均衡演算法的失敗(我們稱之為“再均衡風暴”)。
因此在再均衡期間,一個或多個分割槽會被分配給同一組中的每個 Consumer。
而在再均衡風暴中,分割槽的所有權會持續在各個 Consumers 之間流轉,這反而阻礙了任何一個 Consumer 去真正獲取分割槽的所有權。
④調優 Consumer 的套接字緩衝區(socket buffers),以應對資料的高速流入
在 Kafka 的 0.10.x 版本中,引數 receive.buffer.bytes 的預設值為 64KB。而在 Kafka 的 0.8.x 版本中,引數 socket.receive.buffer.bytes 的預設值為 100KB。
這兩個預設值對於高吞吐量的環境而言都太小了,特別是如果 Broker 和 Consumer 之間的網路頻寬延遲積(bandwidth-delay product)大於區域網(local areanetwork,LAN)時。
對於延遲為 1 毫秒或更多的高頻寬的網路(如 10Gbps 或更高),請考慮將套接字緩衝區設定為 8 或 16MB。
如果您的記憶體不足,也至少考慮設定為 1MB。當然,您也可以設定為 -1,它會讓底層作業系統根據網路的實際情況,去調整緩衝區的大小。
但是,對於需要啟動“熱”分割槽的 Consumers 來說,自動調整可能不會那麼快。
⑤設計具有高吞吐量的 Consumers,以便按需實施背壓(back-pressure)
通常,我們應該保證系統只去處理其能力範圍內的資料,而不要超負荷“消費”,進而導致程序中斷“掛起”,或出現 Consume group 的溢位。
如果是在 Java 虛擬機器(JVM)中執行,Consumers 應當使用固定大小的緩衝區,而且最好是使用堆外記憶體(off-heap)。請參見 Disruptor 模式:網頁連結
固定大小的緩衝區能夠阻止 Consumer 將過多的資料拉到堆疊上,以至於 JVM 花費掉其所有的時間去執行垃圾回收,進而無法履行其處理訊息的本質工作。
⑥在 JVM 上執行各種 Consumers 時,請警惕垃圾回收對它們可能產生的影響
例如,長時間垃圾回收的停滯,可能導致 ZooKeeper 的會話被丟棄、或 Consumer group 處於再均衡狀態。
對於 Broker 來說也如此,如果垃圾回收停滯的時間太長,則會產生叢集掉線的風險。
針對 Producers 的最佳實踐
⑦配置 Producer,以等待各種確認
籍此 Producer 能夠獲知訊息是否真正被髮送到了 Broker 的分割槽上。在 Kafka 的 0.10.x 版本上,其設定是 Acks;而在 0.8.x 版本上,則為 request.required.acks。
Kafka 通過複製,來提供容錯功能,因此單個節點的故障、或分割槽 Leader 關係的更改不會影響到系統的可用性。
如果您沒有用 Acks 來配置 Producer(或稱“fireand forget”)的話,則訊息可能會悄然丟失。
⑧為各個 Producer 配置 Retries
其預設值為 3,當然是非常低的。不過,正確的設定值取決於您的應用程式,即:就那些對於資料丟失零容忍的應用而言,請考慮設定為 Integer.MAX_VALUE(有效且最大)。
這樣將能夠應對 Broker 的 Leader 分割槽出現無法立刻響應 Produce 請求的情況。
⑨為高吞吐量的 Producer,調優緩衝區的大小
特別是 buffer.memory 和 batch.size(以位元組為單位)。由於 batch.size 是按照分割槽設定的,而 Producer 的效能和記憶體的使用量,都可以與 Topic 中的分割槽數量相關聯。
因此,此處的設定值將取決於如下幾個因素:
-
Producer 資料速率(訊息的大小和數量)
-
要生成的分割槽數
-
可用的記憶體量
請記住,將緩衝區調大並不總是好事,如果 Producer 由於某種原因而失效了(例如,某個 Leader 的響應速度比確認還要慢),那麼在堆內記憶體(on-heap)中的緩衝的資料量越多,其需要回收的垃圾也就越多。
⑩檢測應用程式,以跟蹤諸如生成的訊息數、平均訊息大小、以及已使用的訊息數等指標
針對 Brokers 的最佳實踐
⑪在各個 Brokers 上,請壓縮 Topics 所需的記憶體和 CPU 資源。
日誌壓縮(請參見網頁連結)需要各個 Broker 上的堆疊(記憶體)和 CPU 週期都能成功地配合實現而如果讓那些失敗的日誌壓縮資料持續增長的話,則會給 Brokers 分割槽帶來風險。
您可以在 Broker 上調整 log.cleaner.dedupe.buffer.size 和 log.cleaner.threads 這兩個引數,但是請記住,這兩個值都會影響到各個 Brokers 上的堆疊使用。
如果某個 Broker 丟擲 OutOfMemoryError 異常,那麼它將會被關閉、並可能造成資料的丟失。
而緩衝區的大小和執行緒的計數,則取決於需要被清除的 Topic Partition 數量、以及這些分割槽中訊息的資料速率與金鑰的大小。
對於 Kafka 的 0.10.2.1 版本而言,通過 ERROR 條目來監控日誌清理程式的日誌檔案,是檢測其執行緒可能出現問題的最可靠方法。
⑫通過網路吞吐量來監控 Brokers
請監控發向(transmit,TX)和收向(receive,RX)的流量,以及磁碟的 I/O、磁碟的空間、以及 CPU 的使用率,而且容量規劃是維護群集整體效能的關鍵步驟。
⑬在群集的各個 Brokers 之間分配分割槽的 Leader 關係
Leader 通常會需要大量的網路 I/O 資源。例如,當我們將複製因子(replication factor)配置為 3、並執行起來時。
Leader 必須首先獲取分割槽的資料,然後將兩套副本傳送給另兩個 Followers,進而再傳輸到多個需要該資料的 Consumers 上。
因此在該例子中,單個 Leader 所使用的網路 I/O,至少是 Follower 的四倍。而且,Leader 還可能需要對磁碟進行讀操作,而 Follower 只需進行寫操作。
⑭不要忽略監控 Brokers 的 in-sync replica(ISR)shrinks、under-replicatedpartitions 和 unpreferred leaders
這些都是叢集中潛在問題的跡象。例如,單個分割槽頻繁出現 ISR 收縮,則暗示著該分割槽的資料速率超過了 Leader 的能力,已無法為 Consumer 和其他副本執行緒提供服務了。
⑮按需修改 Apache Log4j 的各種屬性
詳細內容可以參考:網頁連結
Kafka 的 Broker 日誌記錄會耗費大量的磁碟空間,但是我們卻不能完全關閉它。
因為有時在發生事故之後,需要重建事件序列,那麼 Broker 日誌就會是我們最好的、甚至是唯一的方法。
⑯禁用 Topic 的自動建立,或針對那些未被使用的 Topics 建立清除策略
例如,在設定的 x 天內,如果未出現新的訊息,您應該考慮該 Topic 是否已經失效,並將其從群集中予以刪除。此舉可避免您花時間去管理群集中被額外建立的元資料。
⑰對於那些具有持續高吞吐量的 Brokers,請提供足夠的記憶體,以避免它們從磁碟子系統中進行讀操作
我們應儘可能地直接從作業系統的快取中直接獲取分割槽的資料。然而,這就意味著您必須確保自己的 Consumers 能夠跟得上“節奏”,而對於那些延遲的 Consumer 就只能強制 Broker 從磁碟中讀取了。
⑱對於具有高吞吐量服務級別目標(service level objectives,SLOs)的大型群集,請考慮為 Brokers 的子集隔離出不同的 Topic
至於如何確定需要隔離的 Topics,則完全取決於您自己的業務需要。例如,您有一些使用相同群集的聯機事務處理(multipleonline transaction processing,OLTP)系統。
那麼將每個系統的 Topics 隔離到不同 Brokers 子集中,則能夠有助於限制潛在事件的影響半徑。
⑲在舊的客戶端上使用新的 Topic 訊息格式。應當代替客戶端,在各個 Brokers 上載入額外的格式轉換服務
當然,最好還是要儘量避免這種情況的發生。
⑳不要錯誤地認為在本地主機上測試好 Broker,就能代表生產環境中的真實效能了
要知道,如果使用複製因子為 1,並在環回介面上對分割槽所做的測試,是與大多數生產環境截然不同的。
在環回介面上網路延遲幾乎可以被忽略的,而在不涉及到複製的情況下,接收 Leader 確認所需的時間則同樣會出現巨大的差異。
總結:
希望上述各項建議能夠有助於您更有效地去使用 Kafka。如果您想提高自己在 Kafka 方面的專業知識,請進一步查閱 Kafka 配套文件中的“操作”部分,其中包含了有關操作群集等實用資訊。