kafka工作流程及檔案儲存機制
1、Kafka工作流程
kafka中訊息是以topic進行分類的,生產者生產訊息,消費者消費訊息,都是面向topic的
topic是邏輯上的概念,而partition是物理上的概念,每個partition對應一個log檔案,該log檔案中儲存的就是producer生產的資料。producer生產的資料會被不斷追加到log檔案的末端,且每條資料都有自己的offset
offset是一個long型的數字,通過這個offset可以確定一條在該partition下的唯一訊息。在partition下是保證有序的,但是在topic下面沒有保證有序性
消費者組中的每個消費者,都會實時記錄自己消費到哪個offset以便出錯恢復,從上次的位置繼續消費
2、檔案儲存機制
2.1 儲存機制
由於生產者生產的訊息會不斷追加到log檔案末端,為防止log檔案過大導致資料定位效率低,kafka採取了分片和索引機制,將每個partition分為多個segment(邏輯上的概念,index+log檔案)
每個partition(目錄)相當於一個巨型檔案被平均分配到多個大小相等的segment(片段)資料檔案中(每個segment檔案中訊息數量不一定相等),這種特性也方便old segment的刪除,即方便已被消費的訊息的清理,提高磁碟的利用率。每個partition只需要支援順序讀寫就行,segment的檔案生命週期由服務端配置引數(log.segment.bytes,log.roll.{ms,hours}等若干引數)決定
每個segment對應兩個檔案----“.index”和“.log”檔案。分別表示為segment索引檔案和資料檔案(引入索引檔案的目的就是便於利用二分查詢快速定位message位置)。這兩個檔案的命名規則為:
partition全域性的第一個segment從0開始,後續每個segment檔名以當前segment的第一條訊息的offset命名,數值大小為64位,20位數字字元長度,沒有數字用0填充。
這些檔案位於一個資料夾下(partition目錄),改資料夾的命名規則:topic名+分割槽序號。例如,first這個topic有三個分割槽,則其對應的資料夾為first-0,first-1,first-2
[root@bigdata-02 kafka-logs]# tree
.
# partition目錄(topic名稱+分割槽序號)
├── analyze-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
# 0.8版本之前的kafka沒有timeindex檔案,這是kafka的具體時間日誌
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── analyze-1
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── analyze-2
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── cleaner-offset-checkpoint
├── __consumer_offsets-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── __consumer_offsets-1
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
。
。
。
├── log-start-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
└── replication-offset-checkpoint
index和log檔案以當前segment的第一條訊息的offset命名。
2.2 index和log檔案
.index 索引檔案儲存大量的索引資訊,.log資料檔案儲存大量訊息資料(message),索引檔案中的元資料指向對應資料檔案中message的物理偏移地址。以index索引檔案中的元資料3497為例,依次在資料檔案中表示第三個message(在全域性Partition中表示第368772個message),以及該訊息的物理偏移地址為497
2.3 message的結構
Segment的Log檔案由多個Message組成,下面詳細說明Message的物理結構,如圖:
引數說明:
2.4 如何通過offset查詢message
先二分查詢獲取對應index索引檔案,獲取到對應的物理offset,拿著物理offset去log資料檔案順序查詢對應訊息,返回查詢到的訊息。
例如:讀取offset=368776的Message,需要通過如下兩個步驟。
第一步:查詢segment File
00000000000000000000.index表示最開始的檔案,起始偏移量(offset)為0;第二個檔案00000000000000368770.index的起始偏移量為368770,依次類推。以起始偏移量命名並排序這些檔案,只要根據offset二分查詢檔案列表,就可以快速定位到具體檔案。
當offset=368776時,定位到00000000000000368770.index|log。
第二步:通過segment File查詢Message
通過第一步定位到Segment File,當offset=368776時,依次定位到00000000000000368770.index的元資料物理位置和00000000000000368770.log的物理偏移地址,然後再通過00000000000000368770.log順序查詢,直到offset=368776為止。
segment index file採取稀疏索引儲存方式,可以減少索引檔案大小,通過Linux mmap介面可以直接進行記憶體操作。稀疏索引為資料檔案的每個對應message設定一個元資料指標,它比稠密索引節省了更多的儲存空間,但查詢起來需要消耗更多的時間
3、資料目錄結構
向主題topic-log中傳送一定量的訊息,某一時刻topic-log-0目錄中的佈局如下所示。
示例中第2個LogSegment對應的基準位移是133,也說明了該LogSegment中的第一條訊息的偏移量為133,同時可以反映出第一個LogSegment中共有133條訊息(偏移量從0至132的訊息)。
注意每個LogSegment中不只包含“.log”“.index”“.timeindex”這3種檔案,還可能包含 “.deleted”“.cleaned”“.swap”等臨時檔案,以及可能的“.snapshot”“.txnindex”“leader-epoch-checkpoint”等檔案。
Kafka 中的檔案不只上面提及的這些檔案,比如還有一些檢查點檔案,當一個Kafka服務第一次啟動的時候,預設的根目錄下就會建立以下5個檔案:
├── cleaner-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
├── replication-offset-checkpoint
├── log-start-offset-checkpoint
kafka0.8之後消費者提交的位移是儲存在 Kafka 內部的主題__consumer_offsets中的,初始情況下這個主題並不存在,當第一次有消費者消費訊息時會自動建立這個主題。
在某一時刻,Kafka 中的檔案目錄佈局如圖 所示。每一個根目錄都會包含最基本的 4個檢查點檔案(xxx-checkpoint)和 meta.properties 檔案。在建立主題的時候,如果當前 broker中不止配置了一個根目錄,那麼會挑選分割槽數最少的那個根目錄來完成本次建立任務。