Kafka 日誌儲存
在進行詳解之前,我想先宣告一下,本次我們進行講解說明的是 Kafka 訊息儲存的資訊檔案內容,不是所謂的 Kafka 伺服器執行產生的日誌檔案,這一點希望大家清楚。
Kafka 訊息是以主題為單位進行歸類,各個主題之間是彼此獨立的,互不影響。每個主題又可以分為一個或多個分割槽。每個分割槽各自存在一個記錄訊息資料的日誌檔案。也就是該文要著重關注的內容。我們根據如下的圖進行進一步說明:
圖中,建立了一個 demo-topic 主題,其存在 7 個 Parition,對應的每個 Parition 下存在一個 [Topic-Parition] 命名的訊息日誌檔案。在理想情況下,資料流量分攤到各個 Parition 中,實現了負載均衡的效果。在分割槽日誌檔案中,你會發現很多型別的檔案,比如:.index、.timestamp、.log、.snapshot 等,其中,檔名一致的檔案集合就稱為 LogSement。我們先留有這樣的一個整體的日誌結構概念,接下來我們一一的進行詳細的說明其中的設計。
LogSegment
我們已經知道分割槽日誌檔案中包含很多的 LogSegment ,Kafka 日誌追加是順序寫入的,LogSegment 可以減小日誌檔案的大小,進行日誌刪除的時候和資料查詢的時候可以快速定位。同時,ActiveLogSegment 也就是活躍的日誌分段擁有檔案擁有寫入許可權,其餘的 LogSegment 只有只讀的許可權。
日誌檔案存在多種字尾檔案,重點需要關注 .index、.timestamp、.log 三種類型。其他的日誌型別功能作用,請查詢下面圖表:
每個 LogSegment 都有一個基準偏移量,用來表示當前 LogSegment 中第一條訊息的 offset。偏移量是一個 64 位的長整形數,固定是20位數字,長度未達到,用 0 進行填補,索引檔案和日誌檔案都由該作為檔名命名規則(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。特別說明一下,如果日誌檔名為 00000000000000000121.log ,則當前日誌檔案的一條資料偏移量就是 121,偏移量是從 0 開始的。
如果想要檢視相應檔案內容可以通過 kafka-run-class.sh 指令碼檢視 .log :
/data/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
2.0 中可以使用 kafka-dump-log.sh 查 看.index 檔案
/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
日誌與索引檔案
偏移量索引檔案用於記錄訊息偏移量與實體地址之間的對映關係。時間戳索引檔案則根據時間戳查詢對應的偏移量。
Kafka 中的索引檔案是以稀疏索引的方式構造訊息的索引,他並不保證每一個訊息在索引檔案中都有對應的索引項。每當寫入一定量的訊息時,偏移量索引檔案和時間戳索引檔案分別增加一個偏移量索引項和時間戳索引項,通過修改 log.index.interval.bytes 的值,改變索引項的密度。
切分檔案
從上文中可知,日誌檔案和索引檔案都會存在多個檔案,組成多個 SegmentLog,那麼其切分的規則是怎樣的呢?
當滿足如下幾個條件中的其中之一,就會觸發檔案的切分:
- 當前日誌分段檔案的大小超過了 broker 端引數 log.segment.bytes 配置的值。log.segment.bytes 引數的預設值為 1073741824,即 1GB。
- 當前日誌分段中訊息的最大時間戳與當前系統的時間戳的差值大於 log.roll.ms 或 log.roll.hours 引數配置的值。如果同時配置了 log.roll.ms 和 log.roll.hours 引數,那麼 log.roll.ms 的優先順序高。預設情況下,只配置了 log.roll.hours 引數,其值為168,即 7 天。
- 偏移量索引檔案或時間戳索引檔案的大小達到 broker 端引數 log.index.size.max.bytes 配置的值。log.index.size.max.bytes 的預設值為 10485760,即 10MB。
- 追加的訊息的偏移量與當前日誌分段的偏移量之間的差值大於 Integer.MAX_VALUE,即要追加的訊息的偏移量不能轉變為相對偏移量。
為什麼是 Integer.MAX_VALUE ?
在偏移量索引檔案中,每個索引項共佔用 8 個位元組,並分為兩部分。相對偏移量和實體地址。
相對偏移量:表示訊息相對與基準偏移量的偏移量,佔 4 個位元組
實體地址:訊息在日誌分段檔案中對應的物理位置,也佔 4 個位元組
4 個位元組剛好對應 Integer.MAX_VALUE ,如果大於 Integer.MAX_VALUE ,則不能用 4 個位元組進行表示了。
索引檔案切分過程
索引檔案會根據 log.index.size.max.bytes 值進行預先分配空間,即檔案建立的時候就是最大值,當真正的進行索引檔案切分的時候,才會將其裁剪到實際資料大小的檔案。這一點是跟日誌檔案有所區別的地方。其意義降低了程式碼邏輯的複雜性。
查詢訊息
offset 查詢
偏移量索引由相對偏移量和實體地址組成。
可以通過如下命令解析.index 檔案
/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index offset:0 position:0 offset:20 position:320 offset:43 position:1220
注意:offset 與 position 沒有直接關係哦,由於存在資料刪除和日誌清理。
e.g. 如何檢視 偏移量為 23 的訊息?
Kafka 中存在一個 ConcurrentSkipListMap 來儲存在每個日誌分段,通過跳躍表方式,定位到在 00000000000000000000.index ,通過二分法在偏移量索引檔案中找到不大於 23 的最大索引項,即 offset 20 那欄,然後從日誌分段檔案中的物理位置為320 開始順序查詢偏移量為 23 的訊息。
時間戳方式查詢
在上文已經有所提及,通過時間戳方式進行查詢訊息,需要通過查詢時間戳索引和偏移量索引兩個檔案。
時間戳索引索引格式
e.g. 查詢時間戳為 1557554753430 開始的訊息?
- 將 1557554753430 和每個日誌分段中最大時間戳 largestTimeStamp 逐一對比,直到找到不小於 1557554753430 所對應的日誌分段。日誌分段中的 largestTimeStamp 的計算是先查詢該日誌分段所對應時間戳索引檔案,找到最後一條索引項,若最後一條索引項的時間戳欄位值大於 0 ,則取該值,否則去該日誌分段的最近修改時間。
- 找到相應日誌分段之後,使用二分法進行定位,與偏移量索引方式類似,找到不大於 1557554753430 最大索引項,也就是 [1557554753420 430]。
- 拿著偏移量為 430 到偏移量索引檔案中使用二分法找到不大於 430 最大索引項,即 [20,320] 。
- 日誌檔案中從 320 的物理位置開始查詢不小於 1557554753430 資料。
注意:timestamp檔案中的 offset 與 index 檔案中的 relativeOffset 不是一一對應的哦。因為資料的寫入是各自追加。
在偏移量索引檔案中,索引資料都是順序記錄 offset ,但時間戳索引檔案中每個追加的索引時間戳必須大於之前追加的索引項,否則不予追加。在 Kafka 0.11.0.0 以後,訊息資訊中存在若干的時間戳資訊。如果 broker 端引數 log.message.timestamp.type 設定為 LogAppendTIme ,那麼時間戳必定能保持單調增長。反之如果是 CreateTime 則無法保證順序。
日誌清理
日誌清理,不是日誌刪除哦,這還是有所區別的,日誌刪除會在下文進行說明。
Kafka 提供兩種日誌清理策略:
日誌刪除:按照一定的刪除策略,將不滿足條件的資料進行資料刪除
日誌壓縮:針對每個訊息的 Key 進行整合,對於有相同 Key 的不同 Value 值,只保留最後一個版本。
Kafka 提供 log.cleanup.policy 引數進行相應配置,預設值:delete,還可以選擇 compact。
是否支援針對具體的 Topic 進行配置?
答案是肯定的,主題級別的配置項是 cleanup.policy 。
日誌刪除
Kafka 會週期性根據相應規則進行日誌資料刪除,保留策略有 3 種:基於時間的保留策略、基於日誌大小的保留策略和基於日誌其實偏移量的保留策略。
基於時間
日誌刪除任務會根據 log.retention.hours/log.retention.minutes/log.retention.ms 設定日誌保留的時間節點。如果超過該設定值,就需要進行刪除。預設是 7 天,log.retention.ms 優先順序最高。
如何查詢日誌分段檔案中已經過去的資料呢?
Kafka 依據日誌分段中最大的時間戳進行定位,首先要查詢該日誌分段所對應的時間戳索引檔案,查詢時間戳索引檔案中最後一條索引項,若最後一條索引項的時間戳欄位值大於 0,則取該值,否則取最近修改時間。
為什麼不直接選最近修改時間呢?
因為日誌檔案可以有意無意的被修改,並不能真實的反應日誌分段的最大時間資訊。
刪除過程
- 從日誌物件中所維護日誌分段的跳躍表中移除待刪除的日誌分段,保證沒有執行緒對這些日誌分段進行讀取操作。
- 這些日誌分段所有檔案新增 上 .delete 字尾。
- 交由一個以 "delete-file" 命名的延遲任務來刪除這些 .delete 為字尾的檔案。延遲執行時間可以通過 file.delete.delay.ms 進行設定
如果活躍的日誌分段中也存在需要刪除的資料時?
Kafka 會先切分出一個新的日誌分段作為活躍日誌分段,然後執行刪除操作。
基於日誌大小
日誌刪除任務會檢查當前日誌的大小是否超過設定值。設定項為 log.retention.bytes ,單個日誌分段的大小由 log.regment.bytes 進行設定。
刪除過程
- 計算需要被刪除的日誌總大小 (當前日誌檔案大小-retention值)。
- 從日誌檔案第一個 LogSegment 開始查詢可刪除的日誌分段的檔案集合。
- 執行刪除。
基於日誌起始偏移量
基於日誌起始偏移量的保留策略的判斷依據是某日誌分段的下一個日誌分段的起始偏移量是否大於等於日誌檔案的起始偏移量,若是,則可以刪除此日誌分段。
注意:日誌檔案的起始偏移量並不一定等於第一個日誌分段的基準偏移量,存在資料刪除,可能與之相等的那條資料已經被刪除了。
刪除過程
- 從頭開始變了每一個日誌分段,日誌分段 1 的下一個日誌分段的起始偏移量為 11,小於 logStartOffset,將 日誌分段 1 加入到刪除佇列中
- 日誌分段 2 的下一個日誌分段的起始偏移量為 23,小於 logStartOffset,將 日誌分段 2 加入到刪除佇列中
- 日誌分段 3 的下一個日誌分段的起始偏移量為 30,大於 logStartOffset,則不進行刪除。
- 如果想學習Java工程化、高效能及分散式、深入淺出。微服務、Spring,MyBatis,Netty原始碼分析的朋友可以加我的Java高階交流:787707172,群裡有阿里大牛直播講解技術,以及Java大型網際網路技術的視訊免費分