Kafka通過timestamp獲取offset的機制詳解
1、入口
Kafka Server 處理 Client 傳送來的請求的入口在
資料夾: core/src/main/scala/kafka/server
類:kafka.server.KafkaApis
方法: handle
處理offset請求的函式: handleOffsetRequest
2、處理邏輯
處理邏輯主要分為四步
- 獲取partition
- 從partition中獲取offset
- high water mark 處理(這一段的資料太少了)
- 異常處理
由於request中包含查詢多個partition的offset的請求。所以最終會返回一個map,儲存有每個partition對應的offset
這裡主要介紹從某一個partition中獲取offset的邏輯,程式碼位置
- kafka.log.Log#getOffsetsBefore(timestamp, maxNumOffsets)
從一個partition中獲取offset
1、建立offset與timestamp的對應關係,並儲存到資料中
//每個Partition由多個segment file組成。獲取當前partition中的segment列表 val segsArray = segments.view // 建立陣列 var offsetTimeArray: Array[(Long, Long)] =null if(segsArray.last.size >0) offsetTimeArray =new else offsetTimeArray =newArray[(Long, Long)](segsArray.length) // 將 offset 與 timestamp 的對應關係新增到陣列中 for(i <-0until segsArray.length) // 資料中的每個元素是一個二元組,(segment file 的起始 offset,segment file的最近修改時間) offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified) if(segsArray.last.size >0) // 如果最近一個 segment file 不為空,將(最近的 offset, 當前之間)也新增到該陣列中 offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds) |
通過這段邏輯,獲的一個數據 offsetTimeArray,每個元素是一個二元組,二元組內容是(offset, timestamp)
2、找到最近的最後一個滿足 timestamp < target_timestamp 的 index。
var startIndex = -1 timestamp match { // 需要查詢的 timestamp 是 -1 或者 -2時,特殊處理 caseOffsetRequest.LatestTime => // OffsetRequest.LatestTime = -1 startIndex = offsetTimeArray.length -1 caseOffsetRequest.EarliestTime => // OffsetRequest.EarliestTime = -2 startIndex =0 case_ => var isFound =false debug("Offset time array = "+ offsetTimeArray.foreach(o =>"%d, %d".format(o._1, o._2))) startIndex = offsetTimeArray.length -1// 從最後一個元素反向找 while(startIndex >=0&& !isFound) { // 找到滿足條件或者 if(offsetTimeArray(startIndex)._2 <= timestamp) // offsetTimeArray 的每個元素是二元組,第二個位置是 timestamp isFound =true else startIndex -=1 } } |
通過這段邏輯,實際找到的是 “最近修改時間早於目標timestamp的最近修改的segment file的起始offset”
但是獲取offset的邏輯並沒有結束,後續仍有處理
3、找到滿足該條件的offset陣列
實際上這個函式的功能是找到一組offset,而不是一個offset。第二個引數 maxNumOffsets 指定最多找幾個滿足條件的 offset。
獲取一組offset的邏輯
// 返回的資料的長度 = min(maxNumOffsets, startIndex + 1),startIndex是邏輯2中找到的index val retSize = maxNumOffsets.min(startIndex +1) val ret = newArray[Long](retSize) // 逐個將滿足條件的offset新增到返回的資料中 for(j <-0until retSize) { ret(j) = offsetTimeArray(startIndex)._1 startIndex -=1 } // 降序排序返回。offset 越大資料越新。 // ensure that the returned seq is in descending order of offsets ret.toSeq.sortBy(- _) |
最終返回這個陣列
3、注意事項
- 實際找到的offset並不是從目標timestamp開始的第一個offset。需要注意
- 當 timestamp 小於最老的資料檔案的最近修改時間時,返回值是一個空陣列。可能會導致使用時的問題。
- 調整segment file檔案拆分策略的配置時,需要注意可能會造成的影響。