1. 程式人生 > >Kafka訊息時間戳(kafka message timestamp)

Kafka訊息時間戳(kafka message timestamp)

最近碰到了訊息時間戳的問題,於是花了一些功夫研究了一下,特此記錄一下。 Kafka訊息的時間戳 在訊息中增加了一個時間戳欄位和時間戳型別。目前支援的時間戳型別有兩種: CreateTime 和 LogAppendTime 前者表示producer建立這條訊息的時間;後者表示broker接收到這條訊息的時間(嚴格來說,是leader broker將這條訊息寫入到log的時間) 為什麼要加入時間戳? 引入時間戳主要解決3個問題:
  • 日誌儲存(log retention)策略:Kafka目前會定期刪除過期日誌(log.retention.hours,預設是7天)。判斷的依據就是比較日誌段檔案(log segment file)的最新修改時間(last modification time)。倘若最近一次修改發生於7天前,那麼就會視該日誌段檔案為過期日誌,執行清除操作。但如果topic的某個分割槽曾經發生過分割槽副本的重分配(replica reassigment),那麼就有可能會在一個新的broker上建立日誌段檔案,並把該檔案的最新修改時間設定為最新時間,這樣設定的清除策略就無法執行了,儘管該日誌段中的資料其實已經滿足可以被清除的條件了。
  • 日誌切分(log rolling)策略:與日誌儲存是一樣的道理。當前日誌段檔案會根據規則對當前日誌進行切分——即,建立一個新的日誌段檔案,並設定其為當前啟用(active)日誌段。其中有一條規則就是基於時間的(log.roll.hours,預設是7天),即當前日誌段檔案的最新一次修改發生於7天前的話,就建立一個新的日誌段檔案,並設定為active日誌段。所以,它也有同樣的問題,即最近修改時間不是固定的,一旦發生分割槽副本重分配,該值就會發生變更,導致日誌無法執行切分。(注意:log.retention.hours及其家族與log.rolling.hours及其家族不會衝突的,因為Kafka不會清除當前啟用日誌段檔案)
  • 流式處理(Kafka streaming):流式處理中需要用到訊息的時間戳
訊息格式的變化 1 增加了timestamp欄位,表示時間戳 2 增加了timestamp型別欄位,儲存在attribute屬性低位的第四個位元上,0表示CreateTime;1表示LogAppendTime(低位前三個位元儲存訊息壓縮型別) 客戶端訊息格式的變化 ProducerRecord:增加了timestamp欄位,允許producer指定訊息的時間戳,如果不指定的話使用producer客戶端的當前時間 ConsumerRecord:增加了timestamp欄位,允許消費訊息時獲取到訊息的時間戳 ProducerResponse: 增加了timestamp欄位,如果是CreateTime返回-1;如果是LogAppendTime,返回寫入該條訊息時broker的本地時間 如何使用時間戳?
Kafka broker config提供了一個引數:log.message.timestamp.type來統一指定叢集中的所有topic使用哪種時間戳型別。使用者也可以為單個topic設定不同的時間戳型別,具體做法是建立topic時覆蓋掉全域性配置: 
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1 --config message.timestamp.type=LogAppendTime
另外, producer在建立ProducerRecord時可以指定時間戳: 
record = new ProducerRecord<String, String>("my-topic", null, System.currentTimeMillis(), "key", "value");
Kafka內部如何處理時間戳?  說起來太麻煩,直接上圖吧:   值得一提的是上圖中的”指定閾值“ —— 有時候我們需要實現這樣的場景:比如某條訊息如果在5分鐘內還不能被創建出來那麼就不再需要建立了,直接丟棄之。Kafka提供了log.message.timestamp.difference.max.ms和message.timestamp.difference.max.ms引數來實現這樣的需求,當然只對CreateTime型別的時間戳有效,如果是LogAppendTime則該引數無效。 基於時間戳的功能 1 根據時間戳來定位訊息:之前的索引檔案是根據offset資訊的,從邏輯語義上並不方便使用,引入了時間戳之後,Kafka支援根據時間戳來查詢定位訊息 2 基於時間戳的日誌切分策略 3 基於時間戳的日誌清除策略 個人認為,第2,3點其實是引入時間戳的初衷,而第1點可以看做是時間戳的另一個功能應用。 基於時間戳的訊息定位 自0.10.0.1開始,Kafka為每個topic分割槽增加了新的索引檔案:基於時間的索引檔案:<segment基礎位移>.timeindex,索引項間隔由index.interval.bytes確定。 具體的格式是時間戳+位移 時間戳記錄的是該日誌段當前記錄的最大時間戳 位移資訊記錄的是插入新的索引項時的訊息位移資訊 該索引檔案中的每一行元組(時間戳T,位移offset)表示:該日誌段中比T晚的所有訊息的位移都比offset大。 由於建立了額外的索引檔案,所需的作業系統檔案控制代碼平均要增加1/3(原來需要2個檔案,現在需要3個),所以有可能需要調整檔案控制代碼的引數。