1. 程式人生 > >關於Kafka日誌留存策略的討論

關於Kafka日誌留存策略的討論

關於Kafka日誌留存(log retention)策略的介紹,網上已有很多文章。不過目前其策略已然發生了一些變化,故本文針對較新版本的Kafka做一次統一的討論。如果沒有顯式說明,本文一律以Kafka 1.0.0作為分析物件。

所謂日誌留存策略,就是Kafka儲存topic資料的規則,我將按照以下幾個方面分別介紹留存策略:

  • 留存策略型別
  • 留存機制及其工作原理

一、留存策略型別

目前,與日誌留存方式相關的策略型別主要有兩種:delete和compact。這兩種留存方式的機制完全不同。本文主要討論針對delete型別的留存策略。使用者可以通過設定broker端引數log.cleanup.policy來指定叢集上所有topic預設的策略型別。另外也可以通過topic級別引數cleanup.policy來為某些topic設定不同於預設值的策略型別。當前log.cleanup.policy引數的預設值是[delete,compact],這是一個list型別的引數,表示叢集上所有topic會同時開啟delete和compact兩種留存策略——這是0.10.1.0新引入的功能,在0.10.1.0之前,該引數只能兩選一,不能同時兼顧,但在實際使用中很多使用者都抱怨compact型別的topic存在過期key訊息未刪除的情況,故社群修改了該引數配置,允許一個topic同時開啟兩種留存策略。

再次強調下, 本文只討論delete型別的留存策略。

二、留存機制及其工作原理

在開始詳細介紹各種留存機制之前,先簡要說下Kafka是如何處理日誌留存的。每個Kafka broker啟動時,都會在後臺開啟一個定時任務,定期地去檢查並執行所有topic日誌留存,這個定時任務觸發的時間週期由broker端引數log.retention.check.interval.ms控制,預設是5分鐘,即每臺broker每5分鐘都會嘗試去檢查一下是否有可以刪除的日誌。因此如果你要縮短這個間隔,只需要調小log.retention.check.interval.ms即可。

鑑於日誌留存和日誌刪除實際上是一個問題的兩個方面,因而我們下面討論的是關於Kafka根據什麼規則來刪除日誌。但有一點要強調一下,待刪除的標的是日誌段,即LogSegment,也就是以.log結尾的一個個檔案,而非整個資料夾。另外還有一點也很重要,當前日誌段(active logsegment)是永遠不會被刪除的,不管使用者配置了哪種留存機制。

 

當前留存機制共有3種:

  1. 基於空間維度
  2. 基於時間維度
  3. 基於起始位移維度

前兩種策略相信大家已經耳熟能詳,而第三種策略由於新加入的時間不長,目前網上對其的介紹並不多。我們一個一個來看。

2.1 基於空間維度

也稱size-based retention,指的是Kafka定期為那些超過磁碟空間閾值的topic進行日誌段的刪除。這個閾值由broker端引數log.retention.bytes和topic級別引數retention.bytes控制,預設是-1,表示Kafka當前未開啟這個留存機制,即不管topic日誌量漲到多少,Kafka都不視其為“超過閾值”。如果使用者要開啟這種留存機制,必須顯式設定log.retention.bytes(或retention.bytes)。 

一旦使用者設定了閾值,那麼Kafka就會在定時任務中嘗試比較當前日誌量總大小是否超過閾值至少一個日誌段的大小。這裡所說的總大小是指所有日誌段檔案的大小,不包括索引檔案的大小!如果是則會嘗試從最老的日誌段檔案開始刪起。注意這裡的“超過閾值至少一個日誌段的大小”,這就是說超過閾值的部分必須要大於一個日誌段的大小,否則不會進行刪除的,原因就是因為刪除的標的是日誌段檔案——即檔案只能被當做一個整體進行刪除,無法刪除部分內容。

舉個例子來說明,假設日誌段大小是700MB,當前分割槽共有4個日誌段檔案,大小分別是700MB,700MB,700MB和1234B——顯然1234B那個檔案就是active日誌段。此時該分割槽總的日誌大小是3*700MB+1234B=2100MB+1234B,如果閾值設定為2000MB,那麼超出閾值的部分就是100MB+1234B,小於日誌段大小700MB,故Kafka不會執行任何刪除操作,即使總大小已經超過了閾值;反之如果閾值設定為1400MB,那麼超過閾值的部分就是700MB+1234B > 700MB,此時Kafka會刪除最老的那個日誌段檔案。

2.2 基於時間維度

也稱time-based retention,指的是Kafka定期未那些超過時間閾值的topic進行日誌段刪除操作。這個閾值由broker端引數log.retention.ms、log.retention.mintues、log.retention.hours以及topic級別引數retention.ms控制。如果同時設定了log.retention.ms、log.retention.mintues、log.retention.hours,以log.retention.ms優先順序為最高,log.retention.mintues次之,log.retention.hours最次。當前這三個引數的預設值依次是null, null和168,故Kafka為每個topic預設儲存7天的日誌。

這裡需要討論下這“7天”是如何界定的?在0.10.0.0之前,Kafka每次檢查時都會將當前時間與每個日誌段檔案的最新修改時間做比較,如果兩者的差值超過了上面設定的閾值(比如上面說的7天),那麼Kafka就會嘗試刪除該檔案。不過這種界定方法是有問題的,因為檔案的最新修改時間是可變動的——比如使用者在終端通過touch命令檢視該日誌段檔案或Kafka對該檔案切分時都可能導致最新修改時間的變化從而擾亂了該規則的判定,因此自0.10.0.0版本起,Kafka在訊息體中引入了時間戳欄位(當然不是單純為了修復這個問題),並且為每個日誌段檔案都維護一個最大時間戳欄位。通過將當前時間與該最大時間戳欄位進行比較來判定是否過期。使用當前最大時間戳欄位的好處在於它對使用者是透明的,使用者在外部無法直接修改它,故不會造成判定上的混亂。

最大時間戳欄位的更新機制也很簡單,每次日誌段寫入新的訊息時,都會嘗試更新該欄位。因為訊息時間戳通常是遞增的,故每次寫入操作時都會保證最大時間戳欄位是會被更新的,而一旦一個日誌段寫滿了被切分之後它就不再接收任何新的訊息,其最大時間戳欄位的值也將保持不變。倘若該值距離當前時間超過了設定的閾值,那麼該日誌段檔案就會被刪除。

2.3 基於起始位移維度

使用者對前兩種留存機制實際上是相當熟悉的,下面我們討論下第三種留存機制:基於日誌起始位移(log start offset)。這實際上是0.11.0.0版本新增加的功能。其實增加這個功能的初衷主要是為了Kafka流處理應用——在流處理應用中存在著大量的中間訊息,這些訊息可能已經被處理過了,但依然儲存在topic日誌中,佔用了大量的磁碟空間。如果通過設定基於時間維度的機制來刪除這些訊息就需要使用者設定很小的時間閾值,這可能導致這些訊息尚未被下游操作運算元(operator)處理就被刪除;如果設定得過大,則極大地增加了空間佔用。故社群在0.11.0.0引入了第三種留存機制:基於起始位移

 

所謂起始位移,就是指分割槽日誌的當前起始位移——注意它是分割槽級別的值,而非日誌段級別。故每個分割槽都只維護一個起始位移值。該值在初始化時被設定為最老日誌段檔案的基礎位移(base offset),隨著日誌段的不斷刪除,該值會被更新當前最老日誌段的基礎位移。另外Kafka提供提供了一個指令碼命令幫助使用者設定指定分割槽的起始位移:kafka-delete-records.sh。

 

該留存機制是預設開啟的,不需要使用者任何配置。Kafka會為每個日誌段做這樣的檢查:1. 獲取日誌段A的下一個日誌段B的基礎位移;2. 如果該值小於分割槽當前起始位移則刪除此日誌段A。

依然拿例子還說明,假設我有一個topic,名字是test,該topic只有1個分割槽,該分割槽下有5個日誌段檔案,分別是A1.log, A2.log, A3.log, A4.log和A5.log,其中A5.log是active日誌段。這5個日誌段檔案中訊息範圍分別是0~9999,10000~19999,20000~29999,30000~39999和40000~43210(A5未寫滿)。如果此時我確信前3個日誌段檔案中的訊息已經被處理過了,於是想刪除這3個日誌段,此時我應該怎麼做呢?由於我無法預知這些日誌段檔案產生的速度以及被消費的速度,因此不管是基於時間的刪除機制還是基於空間的刪除機制都是不適用的。此時我便可以使用kafka-delete-records.sh指令碼將該分割槽的起始位移設定為A4.log的起始位移,即40000。為了做到這點,我需要首先建立一個JSON檔案a.json,內容如下:

{"partitions":[{"topic": "test", "partition": 0,"offset": 40000}],"version":1}

然後執行下列命令:

bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file a.json 

如果一切正常,應該可以看到類似於這樣的輸出:

Executing records delete operation

Records delete operation completed:

partition: test-0 low_watermark: 40000

此時test的分割槽0的起始位移被手動調整為40000,那麼理論上所有最大訊息位移< 40000的日誌段都可以被刪除了。有了這個機制,使用者可以實現更為靈活的留存策略。

 

以上就是關於當前Kafka針對於delete留存型別的topic的3種留存機制。也許在未來社群會增加更多的留存策略,我們拭目以待~