1. 程式人生 > 其它 >Kafka訊息(儲存)格式及索引組織方式

Kafka訊息(儲存)格式及索引組織方式

要深入學習Kafka,理解Kafka的儲存機制是非常重要的。本文介紹Kafka儲存訊息的格式以及資料檔案和索引組織方式,以便更好的理解Kafka是如何工作的。

要深入學習Kafka,理解Kafka的儲存機制是非常重要的。本文介紹Kafka儲存訊息的格式以及資料檔案和索引組織方式,以便更好的理解Kafka是如何工作的。

Kafka訊息儲存格式

Kafka為了保證訊息的可靠性,服務端會將接收的訊息進行序列化並儲存到磁碟上(Kafka的多副本儲存機制),這裡涉及到訊息的儲存格式,即訊息編碼後落到磁碟檔案上的二進位制的資料格式。下圖是根據Kafka 3.0官方文件整理的訊息格式:

包含三個部分:BatchRecords、Record,以及Header的編碼格式。

BatchRecords是Kafka資料的儲存單元,一個BatchRecords中包含多個Record(即我們通常說的一條訊息)。BatchRecords中各個欄位的含義如下:

欄位名 含義
baseOffset 這批訊息的起始Offset
partitionLeaderEpoch 用於Partition的Recover時保護資料的一致性,具體場景可以見KIP101
batchLength BatchRecords的長度
magic 魔數字段,可以用於拓展儲存一些資訊,當前3.0版本的magic是2
crc crc校驗碼,包含從attributes開始到BatchRecords結束的資料的校驗碼
attributes int16,其中bit0~2中包含了使用的壓縮演算法,bit3是timestampType,bit4表示是否失誤,bit5表示是否是控制指令,bit6~15暫未使用
lastOffsetDelta BatchRecords中最後一個Offset,是相對baseOffset的值
firstTimestamp BatchRecords中最小的timestamp
maxTimestamp BatchRecords中最大的timestamp
producerId 傳送端的唯一ID,用於做訊息的冪等處理
producerEpoch 傳送端的Epoch,用於做訊息的冪等處理
baseSequence BatchRecords的序列號,用於做訊息的冪等處理
records 具體的訊息內容

一個BatchRecords中可以包含多條訊息,即上圖中的Record,而每條訊息又可以包含多個Header資訊,Header是Key-Value形式的。Record和Header的格式都非常簡單,就不對其中的欄位做解釋了。

Log Segment

在Kafka中,一個Topic會被分割成多個Partition,而Partition由多個更小的,稱作Segment的元素組成。

Kafka一個Partition下會包含類似上圖中的一些檔案,由log、index、timeindex三個檔案組成一個Segment,而檔名中的(0和35)表示的是一個Segment的起始Offset(Kafka會根據log.segment.bytes的配置來決定單個Segment檔案(log)的大小,當寫入資料達到這個大小時就會建立新的Segment)。log、index、timeindex中儲存的都是二進位制的資料(log中儲存的就是上一部分介紹的BatchRecords的內容,而index和timeindex分別是一些索引資訊。)

下圖是log檔案中資料解析後的示意圖(也就是本文第一部分BatchRecords格式)。其中16開頭的這一行表示一個第一條訊息的Offset是16的BatchRecord,而24開頭的這一行表示的是一個第一條訊息的Offset是24的BatchRecord。

索引

我們知道Kafka中每個Consumer消費一個Partition都會有一個關聯的Offset表示已經處理過的訊息的位置。通常Consumer會根據Offset連續的處理訊息。而通過Offset從儲存層中獲取訊息大致分為兩步:

  • 第一步,根據Offset找到所屬的Segment檔案

  • 第二步,從Segment中獲取對應Offset的訊息資料

其中第一步可以直接根據Segment的檔名進行查詢(上面已經介紹了Segment的檔案面就是它包含的資料的起始Offset)。第二步則需要一些索引資訊來快速定位目標資料在Segment中的位置,否則就要讀取整個Segment檔案了,這裡需要的索引資訊就是上面的index檔案儲存的內容。

index檔案中儲存的是Offset和Position(Offset對應的訊息在log檔案中的偏移量)的對應關係,這樣當有Offset時可以快速定位到Position讀取BatchRecord,然後再從BatchRecord中獲取某一條訊息。比如上述Offset25會被定位到24這個BatchRecord,然後再從這個BatchRecord中取出第二個Record(24這個BatchRecord包含了24、25兩個Record)。

注意,Kafka並不會為每個Record都儲存一個索引,而是根據log.index.interval.bytes等配置構建稀疏的索引資訊。

除了index索引檔案儲存Offset和Position的對映關係外,Kafka中還維護了timeindex,儲存了Timestamp和Offset的關係,用於應對一些場景需要根據timestamp來定位訊息。timeindex中的一個(timestampX,offsetY)元素的含義是所有建立時間大於timestampX的訊息的Offset都大於offsetY。

同樣的,timeindex也採用了稀疏索引的機制,使用和index相同的配置(log.index.interval.bytes),所以timeindex和index是一一對應的。

總結

本文首先介紹了Kafka訊息的儲存格式,然後介紹了Kafka是如何索引(index & timeindex)儲存的資料的。看完索引部分後遺留了一個疑問:每次讀取訊息都要先根據索引讀取Position資訊,然後再根據Position去讀資料,而索引又是稀疏索引(查詢索引也是要開銷的),這樣效率是否會比較低呢?如果利用Consumer總是順序讀取訊息的特性,每次讀取資料時都帶上一些上下文資訊(比如上一次Offset對應的Position資訊)直接去讀Log資料效率是否會更高?歡迎留言交流!

如果本文對您有幫助,點一下右下角的“推薦”