漫遊Kafka實現篇之訊息和日誌
訊息格式
日誌
一個叫做“my_topic”且有兩個分割槽的的topic,它的日誌有兩個資料夾組成,my_topic_0和my_topic_1,每個資料夾裡放著具體的資料檔案,每個資料檔案都是一系列的日誌實體,每個日誌實體有一個4個位元組的整數N標註訊息的長度,後邊跟著N個位元組的訊息。
每個訊息都可以由一個64位的整數offset標註,offset標註了這條訊息在傳送到這個分割槽的訊息流中的起始位置。每個日誌檔案的名稱都是這個檔案第一條日誌的offset.所以第一個日誌檔案的名字就是00000000000.kafka.所以每相鄰的兩個檔名字的差就是一個數字S,S差不多就是配置檔案中指定的日誌檔案的最大容量。
訊息的格式都由一個統一的介面維護,所以訊息可以在producer,broker和consumer之間無縫的傳遞。儲存在硬碟上的訊息格式如下所示:
訊息長度: 4 bytes (value: 1+4+n)
版本號: 1 byte
CRC校驗碼: 4 bytes
具體的訊息: n bytes
寫操作
訊息被不斷的追加到最後一個日誌的末尾,當日志的大小達到一個指定的值時就會產生一個新的檔案。對於寫操作有兩個引數,一個規定了訊息的數量達到這個值時必須將資料重新整理到硬碟上,另外一個規定了重新整理到硬碟的時間間隔,這對資料的永續性是個保證,在系統崩潰的時候只會丟失一定數量的訊息或者一個時間段的訊息。
讀操作
讀操作需要兩個引數:一個64位的offset和一個S位元組的最大讀取量。S通常比單個訊息的大小要大,但在一些個別訊息比較大的情況下,S會小於單個訊息的大小。這種情況下讀操作會不斷重試,每次重試都會將讀取量加倍,直到讀取到一個完整的訊息。可以配置單個訊息的最大值,這樣伺服器就會拒絕大小超過這個值的訊息。也可以給客戶端指定一個嘗試讀取的最大上限,避免為了讀到一個完整的訊息而無限次的重試。
在實際執行讀取操縱時,首先需要定位資料所在的日誌檔案,然後根據offset計算出在這個日誌中的offset(前面的的offset是整個分割槽的offset),然後在這個offset的位置進行讀取。定位操作是由二分查詢法完成的,Kafka在記憶體中為每個檔案維護了offset的範圍。
下面是傳送給consumer的結果的格式:
MessageSetSend (fetch result) total length : 4 bytes error code : 2 bytes message 1 : x bytes ... message n : x bytes
MultiMessageSetSend (multiFetch result) total length : 4 bytes error code : 2 bytes messageSetSend 1 ... messageSetSend n
刪除
日誌管理器允許定製刪除策略。目前的策略是刪除修改時間在N天之前的日誌(按時間刪除),也可以使用另外一個策略:保留最後的N GB資料的策略(按大小刪除)。為了避免在刪除時阻塞讀操作,採用了copy-on-write形式的實現,刪除操作進行時,讀取操作的二分查詢功能實際是在一個靜態的快照副本上進行的,這類似於Java的CopyOnWriteArrayList。
可靠性保證
日誌檔案有一個可配置的引數M,快取超過這個數量的訊息將被強行重新整理到硬碟。一個日誌矯正執行緒將迴圈檢查最新的日誌檔案中的訊息確認每個訊息都是合法的。合法的標準為:所有檔案的大小的和最大的offset小於日誌檔案的大小,並且訊息的CRC32校驗碼與儲存在訊息實體中的校驗碼一致。如果在某個offset發現不合法的訊息,從這個offset到下一個合法的offset之間的內容將被移除。
有兩種情況必須考慮:1,當發生崩潰時有些資料塊未能寫入。2,寫入了一些空白資料塊。第二種情況的原因是,對於每個檔案,作業系統都有一個inode(inode是指在許多“類Unix檔案系統”中的一種資料結構。每個inode儲存了檔案系統中的一個檔案系統物件,包括檔案、目錄、大小、裝置檔案、socket、管道, 等等),但無法保證更新inode和寫入資料的順序,當inode儲存的大小資訊被更新了,但寫入資料時發生了崩潰,就產生了空白資料塊。CRC校驗碼可以檢查這些塊並移除,當然因為崩潰而未寫入的資料塊也就丟失了。