1. 程式人生 > >Kafka通過timestamp獲取offset的機制詳解

Kafka通過timestamp獲取offset的機制詳解

1、入口

Kafka Server 處理 Client 傳送來的請求的入口在

資料夾:  core/src/main/scala/kafka/server

類:kafka.server.KafkaApis

方法: handle

處理offset請求的函式: handleOffsetRequest

2、處理邏輯

處理邏輯主要分為四步

  • 獲取partition
  • 從partition中獲取offset
  • high water mark 處理(這一段的資料太少了)
  • 異常處理

由於request中包含查詢多個partition的offset的請求。所以最終會返回一個map,儲存有每個partition對應的offset

這裡主要介紹從某一個partition中獲取offset的邏輯,程式碼位置

  • kafka.log.Log#getOffsetsBefore(timestamp, maxNumOffsets)

從一個partition中獲取offset

1、建立offset與timestamp的對應關係,並儲存到資料中

//每個Partition由多個segment file組成。獲取當前partition中的segment列表

val segsArray = segments.view

// 建立陣列

var offsetTimeArray: Array[(Long, Long)] =null

if(segsArray.last.size >0)

offsetTimeArray =new

Array[(Long, Long)](segsArray.length +1)

else

offsetTimeArray =newArray[(Long, Long)](segsArray.length)

// 將 offset 與 timestamp 的對應關係新增到陣列中

for(i <-0until segsArray.length)

// 資料中的每個元素是一個二元組,(segment file 的起始 offset,segment file的最近修改時間)

offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified)

if(segsArray.last.size >0)

// 如果最近一個 segment file 不為空,將(最近的 offset, 當前之間)也新增到該陣列中

offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds)

通過這段邏輯,獲的一個數據 offsetTimeArray,每個元素是一個二元組,二元組內容是(offset, timestamp)

2、找到最近的最後一個滿足 timestamp < target_timestamp 的 index。

var startIndex = -1

timestamp match {

// 需要查詢的 timestamp 是 -1 或者 -2時,特殊處理

caseOffsetRequest.LatestTime =>  // OffsetRequest.LatestTime = -1

startIndex = offsetTimeArray.length -1

caseOffsetRequest.EarliestTime => // OffsetRequest.EarliestTime = -2

startIndex =0

case_ =>

var isFound =false

debug("Offset time array = "+ offsetTimeArray.foreach(o =>"%d, %d".format(o._1, o._2)))

startIndex = offsetTimeArray.length -1// 從最後一個元素反向找

while(startIndex >=0&& !isFound) {    // 找到滿足條件或者

if(offsetTimeArray(startIndex)._2 <= timestamp)  // offsetTimeArray 的每個元素是二元組,第二個位置是 timestamp

isFound =true

      else

startIndex -=1

    }

}

通過這段邏輯,實際找到的是 “最近修改時間早於目標timestamp的最近修改的segment file的起始offset”

但是獲取offset的邏輯並沒有結束,後續仍有處理

3、找到滿足該條件的offset陣列

實際上這個函式的功能是找到一組offset,而不是一個offset。第二個引數 maxNumOffsets 指定最多找幾個滿足條件的 offset。

獲取一組offset的邏輯

// 返回的資料的長度 = min(maxNumOffsets, startIndex + 1),startIndex是邏輯2中找到的index

val retSize = maxNumOffsets.min(startIndex +1)

val ret = newArray[Long](retSize)

// 逐個將滿足條件的offset新增到返回的資料中

for(j <-0until retSize) {

ret(j) = offsetTimeArray(startIndex)._1

startIndex -=1

}

// 降序排序返回。offset 越大資料越新。

// ensure that the returned seq is in descending order of offsets

ret.toSeq.sortBy(- _)

最終返回這個陣列

3、注意事項

  • 實際找到的offset並不是從目標timestamp開始的第一個offset。需要注意
  • 當 timestamp 小於最老的資料檔案的最近修改時間時,返回值是一個空陣列。可能會導致使用時的問題。
  • 調整segment file檔案拆分策略的配置時,需要注意可能會造成的影響。
引用公司董董wiki分享