1. 程式人生 > >kafka 節點物理儲存機制

kafka 節點物理儲存機制

總覽:

儲採用了分割槽(partition)分段(LogSegment)稀疏索引這幾個手段來達到了高效性

分割槽:話題分割槽

分段:分割槽儲存檔案分段

稀疏索引:以分段對應索引採用相對偏移量進行檢索範圍的縮小(如一個分割槽分為五段,每個分段都會對應一個索引檔案.index,儲存訊息的相對偏移量和分割槽分段名稱)

術語解析:

Topic:主題

Partition:分割槽

Offset:偏移量

Segment:分段儲存物理檔案

Index:分段儲存物理索引檔案

Message:訊息

Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨立的。每個topic又可以分成幾個不同的partition(每個topic有幾個partition是在建立topic時指定的),每個partition儲存一部分Message。借用官方的一張圖,可以直觀地看到topic和partition的關係。

Partition的資料檔案

Partition中的每條Message由offset來表示它在這個partition中的偏移量,這個offset不是該Message在partition資料檔案中的實際儲存位置,而是邏輯上一個值,它唯一確定了partition中的一條Message。因此,可以認為offset是partition中Message的id。partition中的每條Message包含了以下三個屬性:

·        offset :偏移量 (long)

·        MessageSize:訊息大小(int32)

·        data::內容(和Kafka通訊協議中介紹的MessageSet格式是一致。)

Partition的資料檔案則包含了若干條上述格式的Message,按offset由小到大排列在一起。它的實現類為FileMessageSet,類圖如下:

它的主要方法如下:

·        append: 把給定的ByteBufferMessageSet中的Message寫入到這個資料檔案中。

·        searchFor: 從指定的startingPosition開始搜尋找到第一個Message其offset是大於或者等於指定的offset,並返回其在檔案中的位置Position。它的實現方式是從startingPosition開始讀取12個位元組,分別是當前MessageSet的offset和size。如果當前offset小於指定的offset,那麼將position向後移動LogOverHead+MessageSize(其中LogOverHead為offset+messagesize,為12個位元組)。

·        read:準確名字應該是slice,它擷取其中一部分返回一個新的FileMessageSet。它不保證擷取的位置資料的完整性。

·        sizeInBytes: 表示這個FileMessageSet佔有了多少位元組的空間。

·        truncateTo: 把這個檔案截斷,這個方法不保證截斷位置的Message的完整性。

·        readInto: 從指定的相對位置開始把檔案的內容讀取到對應的ByteBuffer中。

我們來思考一下,如果一個partition只有一個數據檔案會怎麼樣?

1.    新資料是新增在檔案末尾(呼叫FileMessageSet的append方法),不論檔案資料檔案有多大,這個操作永遠都是O(1)的。

2.    查詢某個offset的Message(呼叫FileMessageSet的searchFor方法)是順序查詢的。因此,如果資料檔案很大的話,查詢的效率就低。

那Kafka是如何解決查詢效率的的問題呢?有兩大法寶:1) 分段 2) 索引。

資料檔案的分段

Kafka解決查詢效率的手段之一是將資料檔案分段,比如有100條Message,它們的offset是從0到99。假設將資料檔案分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨的資料檔案裡面,資料檔案以該段中最小的offset命名。這樣在查詢指定offset的Message的時候,用二分查詢就可以定位到該Message在哪個段中

為資料檔案建索引

資料檔案分段使得可以在一個較小的資料檔案中查詢對應offset的Message了,但是這依然需要順序掃描才能找到對應offset的Message。為了進一步提高查詢的效率,Kafka為每個分段後的資料檔案建立了索引檔案,檔名與資料檔案的名字是一樣的,只是副檔名為.index。
索引檔案中包含若干個索引條目,每個條目表示資料檔案中一條Message的索引。索引包含兩個部分(均為4個位元組的數字),分別為相對offset和position。

·        相對offset:因為資料檔案分段以後,每個資料檔案的起始offset不為0,相對offset表示這條Message相對於其所屬資料檔案中最小的offset的大小。舉例,分段後的一個數據檔案的offset是從20開始,那麼offset為25的Message在index檔案中的相對offset就是25-20 = 5。儲存相對offset可以減小索引檔案佔用的空間。

·        position,表示該條Message在資料檔案中的絕對位置。只要開啟檔案並移動檔案指標到這個position就可以讀取對應的Message了。

index檔案中並沒有為資料檔案中的每條Message建立索引,而是採用了稀疏儲存的方式,每隔一定位元組的資料建立一條索引。這樣避免了索引檔案佔用過多的空間,從而可以將索引檔案保留在記憶體中。但缺點是沒有建立索引的Message也不能一次定位到其在資料檔案的位置,從而需要做一次順序掃描,但是這次順序掃描的範圍就很小了。

在Kafka中,索引檔案的實現類為OffsetIndex,它的類圖如下:

主要的方法有:

·        append方法,新增一對offset和position到index檔案中,這裡的offset將會被轉成相對的offset。

·        lookup, 用二分查詢的方式去查詢小於或等於給定offset的最大的那個offset

小結

我們以幾張圖來總結一下Message是如何在Kafka中儲存的,以及如何查詢指定offset的Message的。

Message是按照topic來組織,每個topic可以分成多個的partition,比如:有5個partition的名為為page_visits的topic的目錄結構為:

partition是分段的,每個段叫Log Segment,包括了一個數據檔案和一個索引檔案,下圖是某個partition目錄下的檔案:

可以看到,這個partition有4個LogSegment。

借用博主@lizhitao部落格上的一張圖來展示是如何查詢Message的。

比如:要查詢絕對offset為7的Message:

1.    首先是用二分查詢確定它是在哪個LogSegment中,自然是在第一個Segment中。

2.    開啟這個Segment的index檔案,也是用二分查詢找到offset小於或者等於指定offset的索引條目中最大的那個offset。自然offset為6的那個索引是我們要找的,通過索引檔案我們知道offset為6的Message在資料檔案中的位置為9807。

3.    開啟資料檔案,從位置為9807的那個地方開始順序掃描直到找到offset為7的那條Message。

這套機制是建立在offset是有序的。索引檔案被對映到記憶體中,所以查詢的速度還是很快的。

一句話,KafkaMessage儲存採用了分割槽(partition),分段(LogSegment)和稀疏索引這幾個手段來達到了高效性。

其餘檔案

timeindex:時間戳檔案,基於時間戳檢索的index檔案,對應上面基於偏移量做的索引

基於時間戳的功能

1 根據時間戳來定位訊息:之前的索引檔案是根據offset資訊的,從邏輯語義上並不方便使用,引入了時間戳之後,Kafka支援根據時間戳來查詢定位訊息

2 基於時間戳的日誌切分策略

3 基於時間戳的日誌清除策略

個人認為,第2,3點其實是引入時間戳的初衷,而第1點可以看做是時間戳的另一個功能應用。

基於時間戳的訊息定位

自0.10.0.1開始,Kafka為每個topic分割槽增加了新的索引檔案:基於時間的索引檔案:<segment基礎位移>.timeindex,索引項間隔由index.interval.bytes確定。

具體的格式是時間戳+位移

時間戳記錄的是該日誌段當前記錄的最大時間戳

位移資訊記錄的是插入新的索引項時的訊息位移資訊

該索引檔案中的每一行元組(時間戳T,位移offset)表示:該日誌段中比T晚的所有訊息的位移都比offset大。

由於建立了額外的索引檔案,所需的作業系統檔案控制代碼平均要增加1/3(原來需要2個檔案,現在需要3個),所以有可能需要調整檔案控制代碼的引數。

Snapshot:快照檔案,暫不做分析

leader-epoch-checkpoint:leader儲存檔案,暫不做分析