1. 程式人生 > >Kafka日誌儲存詳解

Kafka日誌儲存詳解

        在前面的文章中,我們對kafka的基本使用方式和架構原理進行了介紹,本文則主要介紹kafka中日誌的儲存原理,主要內容包括kafka日誌儲存格式、日誌檔案的管理方式、日誌索引檔案的格式和日誌壓縮等功能。

        作為一款訊息系統,日誌就是將訊息持久化到磁碟上的資料,這份資料的儲存方式將會極大的影響其吞吐量和擴充套件性,而kafka日誌由於其優秀的設計,為其實現這些特性提供了不可忽略的作用。總結來說,kafka日誌主要具有如下特點:

  • 極高的壓縮比例。kafka日誌不僅會對其key和value進行壓縮,而且還會對每條訊息的偏移量、時間戳等等元資料資訊進行壓縮;
  • batch的方式儲存資料。在儲存上,kafka日誌是以批次的方式進行資料的儲存,每個批次的大小預設為4KB,每個批次的元資料中會儲存其起始偏移量、時間戳和訊息長度等資訊;
  • 追加的方式寫入資料。由於kafka日誌都是寫入磁碟的,而磁碟的順序寫入效率是非常高的,kafka寫入採用的就是追加的方式寫入訊息,這樣可以避免磁頭的隨機移動,從而提升寫入速率;
  • 使用索引檔案提升查詢效能。kafka不僅會儲存訊息日誌檔案,還會為每個訊息日誌檔案建立一個索引檔案,而且索引都是以batch為單位進行儲存的,也即只會為batch的起始位移和時間戳建立索引,而不會為每條訊息都建立索引。

1. 日誌儲存格式

        最新版本的kafka日誌是以批為單位進行日誌儲存的,所謂的批指的是kafka會將多條日誌壓縮到同一個batch中,然後以batch為單位進行後續的諸如索引的建立和訊息的查詢等工作。對於每個批次而言,其預設大小為4KB,並且儲存了整個批次的起始位移和時間戳等元資料資訊,而對於每條訊息而言,其位移和時間戳等元資料儲存的則是相對於整個批次的元資料的增量,通過這種方式,kafka能夠減少每條訊息中資料佔用的磁碟空間。這裡我們首先展示一下每個批次的資料格式:

批次格式

        圖中訊息批次的每個元資料都有固定的長度大小,而只有最後面的訊息個數的是可變的。如下是batch中主要的屬性的含義:

  • 起始位移:佔用8位元組,其儲存了當前batch中第一條訊息的位移;
  • 長度:佔用了4個位元組,其儲存了整個batch所佔用的磁碟空間的大小,通過該欄位,kafka在進行訊息遍歷的時候,可以快速的跳躍到下一個batch進行資料讀取;
  • 分割槽leader版本號:記錄了當前訊息所在分割槽的leader的伺服器版本,主要用於進行一些資料版本的校驗和轉換工作;
  • CRC:對當前整個batch的資料的CRC校驗碼,主要是用於對資料進行差錯校驗的;
  • 屬性:佔用2個位元組,這個欄位的最低3位記錄了當前batch中訊息的壓縮方式,現在主要有GZIP、LZ4和Snappy三種。第4位記錄了時間戳的型別,第5和6位記錄了新版本引入的事務型別和控制型別;
  • 最大位移增量:最新的訊息的位移相對於第一條訊息的唯一增量;
  • 起始時間戳:佔用8個位元組,記錄了batch中第一條訊息的時間戳;
  • 最大時間戳:佔用8個位元組,記錄了batch中最新的一條訊息的時間戳;
  • PID、producer epoch和起始序列號:這三個引數主要是為了實現事務和冪等性而使用的,其中PID和producer epoch用於確定當前producer是否合法,而起始序列號則主要用於進行訊息的冪等校驗;
  • 訊息個數:佔用4個位元組,記錄當前batch中所有訊息的個數;

        通過上面的介紹可以看出,每個batch的頭部資料中佔用的位元組數固定為61個位元組,可變部分主要是與具體的訊息有關,下面我們來看一下batch中每條訊息的格式:

訊息格式

        這裡的訊息的頭部資料就與batch的大不相同,可以看到,其大部分資料的長度都是可變的。既然是可變的,這裡我們需要強調兩個問題:

  • 對於數字的儲存,kafka採用的是Zig-Zag的儲存方式,也即負數並不會使用補碼的方式進行編碼,而是將其轉換為對應的正整數,比如-1對映為11對映為2-2對映為32對映為4,關係圖如下所示:

    通過圖可以看出,在對資料反編碼的時候,我們只需要將對應的整數轉換成其原始值即可;

  • 在使用Zig-Zag編碼方式的時候,每個位元組最大為128,而其中一半要儲存正數,一半要儲存負數,還有一個0,也就是說每個位元組能夠表示的最大整數為64,此時如果有大於64的數字,kafka就會使用多個位元組進行儲存,而這多個位元組的表徵方式是通過將每個位元組的最大位作為保留位來實現的,如果最高位為1,則表示需要與後續位元組共同表徵目標數字,如果最高位為0,則表示當前位即可表示目標數字。

        kafka使用這種編碼方式的優點在於,大部分的資料增量都是非常小的數字,因此使用一個位元組即可儲存,這比直接使用原始型別的資料要節約大概七倍的記憶體。

        對於上面的每條訊息的格式,除了訊息key和value相關的欄位,其還有屬性欄位和header,屬性欄位的主要作用是儲存當前訊息key和value的壓縮方式,而header則供給使用者進行新增一些動態的屬性,從而實現一些定製化的工作。        通過對kafka訊息日誌的儲存格式我們可以看出,其使用batch的方式將一些公共資訊進行提取,從而保證其只需要儲存一份,雖然看起來每個batch的頭部資訊比較多,但其平攤到每條訊息上之後使用的位元組更少了;在訊息層面,kafka使用了資料增量的方式和Zig-Zag編碼方式對資料進行的壓縮,從而極大地減少其佔用的位元組數。總體而言,這種儲存方式極大的減少了kafka佔用的磁碟空間大小。

2. 日誌儲存方式

        在使用kafka時,訊息都是推送到某個topic中,然後由producer計算當前訊息會發送到哪個partition,在partition中,kafka會為每條訊息設定一個偏移量,也就是說,如果要唯一定位一條訊息,使用<topic, partition, offset>三元組即可。基於kafka的架構模式,其會將各個分割槽平均分配到每個broker中,也就是說每個broker會被分配用來提供一個或多個分割槽的日誌儲存服務。在broker伺服器上,kafka的日誌也是按照partition進行儲存的,其會在指定的日誌儲存目錄中為每個topic的partition分別建立一個目錄,目錄中儲存的就是這些分割槽的日誌資料,而目錄的名稱則會以<topic-patition>的格式進行建立。如下是kafka日誌的儲存目錄示意圖:

        這裡我們需要注意的是,圖中對於分割槽日誌的儲存,當前broker只會儲存分配給其的分割槽的日誌,比如圖中的connect-status就只有分割槽1和分割槽4的目錄,而沒有分割槽2和分割槽3的目錄,這是因為這些分割槽被分配在了叢集的其他節點上。在每個分割槽日誌目錄中,存在有三種類型的日誌檔案,即字尾分別為log、index和timeindex的檔案。其中log檔案就是真正儲存訊息日誌的檔案,index檔案儲存的是訊息的位移索引資料,而timeindex檔案則儲存的是時間索引資料。如下圖所示為一個分割槽的訊息日誌資料:

        從圖中可以看出,每種型別的日誌檔案都是分段的,這裡關於分段的規則主要有如下幾點需要說明:

  • 在為日誌進行分段時,每個檔案的檔名都是以該段中第一條訊息的位移的偏移量來命名的;
  • kafka會在每個log檔案的大小達到1G的時候關閉該檔案,而新開一個檔案進行資料的寫入。可以看到,圖中除了最新的log檔案外,其餘的log檔案的大小都是1G;
  • 對於index檔案和timeindex檔案,在每個log檔案進行分段之後,這兩個索引檔案也會進行分段,這也就是它們的檔名與log檔案一致的原因;
  • kafka日誌的留存時間預設是7天,也就是說,kafka會刪除儲存時間超過7天的日誌,但是對於某些檔案,其部分日誌儲存時間未達到7天,部分達到了7天,此時還是會保留該檔案,直至其所有的訊息都超過留存時間;

3. 索引檔案

        kafka主要有兩種型別的索引檔案:位移索引檔案和時間戳索引檔案。位移索引檔案中儲存的是訊息的位移與該位移所對應的訊息的實體地址;時間戳索引檔案中則儲存的是訊息的時間戳與該訊息的位移值。也就是說,如果需要通過時間戳查詢訊息記錄,那麼其首先會通過時間戳索引檔案查詢該時間戳對應的位移值,然後通過位移值在位移索引檔案中查詢訊息具體的實體地址。關於位移索引檔案,這裡有兩點需要說明:

  • 由於kafka訊息都是以batch的形式進行儲存,因而索引檔案中索引元素的最小單元是batch,也就是說,通過位移索引檔案能夠定位到訊息所在的batch,而沒法定位到訊息在batch中的具體位置,查詢訊息的時候,還需要進一步對batch進行遍歷;

  • 位移索引檔案中記錄的位移值並不是訊息真正的位移值,而是該位移相對於該位移索引檔案的起始位移的偏移量,通過這種方式能夠極大的減小位移索引檔案的大小。如下圖所示為一個位移索引檔案的格式示意圖:

    如下則是具體的位移索引檔案的示例:

        關於時間戳索引檔案,由於時間戳的變化比位移的變化幅度要大一些,其即使採用了增量的方式儲存時間戳索引,但也沒法有效地使用Zig-Zag方式對資料進行編碼,因而時間戳索引檔案是直接儲存的訊息的時間戳資料,但是對於時間戳索引檔案中儲存的位移資料,由於其變化幅度不大,因而其還是使用相對位移的方式進行的儲存,並且這種儲存方式也可以直接對映到位移索引檔案中而無需進行計算。如下圖所示為時間戳索引檔案的格式圖:

        如下則是時間戳索引檔案的一個儲存示例:

        可以看到,如果需要通過時間戳來定位訊息,就需要首先在時間戳索引檔案中定位到具體的位移,然後通過位移在位移索引檔案中定位到訊息的具體實體地址。

4. 日誌壓縮

        所謂的日誌壓縮功能,其主要是針對這樣的場景的,比如對某個使用者的郵箱資料進行修改,其總共修改了三次,修改過程如下:

[email protected]
[email protected]
[email protected]

        在這麼進行修改之後,很明顯,我們主要需要關心的是最後一次修改,因為其是最終資料記錄,但是如果我們按順序處理上述訊息,則需要處理三次訊息。kafka的日誌壓縮就是為了解決這個問題而存在的,對於使用相同key的訊息,其會只保留最新的一條訊息的記錄,而中間過程的訊息都會被kafka cleaner給清理掉。但是需要注意的是,kafka並不會清理當前處於活躍狀態的日誌檔案中的訊息記錄。所謂當前處於活躍狀態的日誌檔案,也就是當前正在寫入資料的日誌檔案。如下圖所示為一個kafka進行日誌壓縮的示例圖:

        圖中K1的資料有V1、V3和V4,經過壓縮之後只有V4保留了下來,K2的資料則有V2、V6和V10,壓縮之後也只有V10保留了下來;同理可推斷其他的Key的資料。另外需要注意的是,kafka開啟日誌壓縮使用的是log.cleanup.policy,其預設值為delete,也即我們正常使用的策略,如果將其設定為compaction,則開啟了日誌壓縮策略,但是需要注意的是,開啟了日誌壓縮策略並不代表kafka會清理歷史資料,只有將log.cleaner.enable設定為true才會定時清理歷史資料。

        在kafka中,其本身也在使用日誌壓縮策略,主要體現在kafka訊息的偏移量儲存。在舊版本中,kafka將每個consumer分組當前消費的偏移量資訊儲存在zookeeper中,但是由於zookeeper是一款分散式協調工具,其對於讀操作具有非常高的效能,但是對於寫操作效能比較低,而consumer的位移提交動作是非常頻繁的,這勢必會導致zookeeper稱為kafka訊息消費的瓶頸。因而在最新版本中,kafka將分組消費的位移資料儲存在了一個特殊的topic中,即__consumer_offsets,由於每個分組group的位移資訊都會提交到該topic,因而kafka預設為其設定了非常多的分割槽,也即50個分割槽。另外,consumer在提交位移時,使用的key為groupId+topic+partition,而值則為當前提交的位移,也就是說,對於每一個分組所消費的topic的partition,其都只會保留最新的位移。如果consumer需要讀取位移,那麼只需要按照上述格式組裝key,然後在該topic中讀取最新的訊息資料即可。

5. 小結

        本文首先對kafka的日誌設計的優點進行了介紹,然後著重講解了日誌儲存格式、日誌目錄規劃、日誌索引檔案的格式以及日