1. 程式人生 > >Kafka follower從leader複製資料

Kafka follower從leader複製資料

Kafka處理leaderAndIsr請求中提到follower會不停地從leader那裡複製資料,這次介紹的是如何複製。

  1. follower會根據自身擁有多少個需要同步的topicPartition來建立相對應的partitionFetchState,這個東西記錄了從leader的哪個offset開始獲取資料
  2. follower會根據leader的brokerId和topicPartition經過hash計算的partitioinId來建立複製執行緒ReplicaFetcherThread
  3. ReplicaFetcherThread會根據partitionFetchState提供的資訊不停地從leader獲取資料,每次成功複製後,都會更新partitionFetchState的fetchOffset
  4. 如果fetchOffset越界(fetchOffset < leaderStartOffset or fetchOffset > leaderEndOffset),則會對follower partition log進行truncate,然後從leader處重新獲取可用的offset
  5. 每次log的增加,都可能會觸發一個新的logSegment的產生,原因可以是log or index or timeindex的容量滿了,或者是離上次append log的時間相隔太久,超過了閾值

先看ReplicaFetcherThread的執行流程:
ReplicaFetcherThread flow graph

每個topicPartition都先從leader那裡,獲取對應自己當前leaderEpoch的offset。如果獲取不到就使用自身的highWatermark,如果獲取到就取leaderEpochOffset和自己logEndOffset的最小值作為truncatePoint(截斷點),然後截斷大於這個truncatePoint的記錄,再往後的記錄複製就從這個truncatePoint開始獲取。

maybeTruncate的流程
maybeTruncate flow graph

在複製partition資料的時候,如果出現了OFFSET_OUT_OF_RANGE的錯誤,就需要重新計算有效的offset。但是這個過程有可能連續出現多次,因為如果獲取的是leader的earliest offset,在下次複製資料時,leader的startOffset可能已經因為再一次的定時清理而不存在,因此有需要重新計算有效的offset。

handleOffsetOutOfRange的流程
handleOffsetOutOfRange

Log的截斷過程指的是對於指定的targetOffset,截斷其後的記錄,一共分為3種情況:
1. logEndOffset < targetOffset,沒記錄可截斷
2. targetOffset < log的startOffset(log的第一個segment的baseOffset),意味著整個log都要刪掉,走的是truncateFullyAndStartAt流程
3. targetOffset落在log中(處於log第一個segment的baseOffset 和 log最後一個segment的endOffset的中間),則需要把baseOffset大於targetOffset的segment都刪除,然後再檢查targetOffset是否還落在最後一個segment中,如果是則還需要對最後一個segment進行相應截斷。

Log的truncateTo流程
Log.truncateTo

Log的truncateFullyAndStartAt流程
Log.truncateFullyAndStartAt

Partition的appendRecordsToFollower流程
Partition.appendRecordsToFollower

追加記錄時,需要驗證記錄,可能需要分派offset(對於非複製的請求),如果記錄的版本和當前節點支援的最新版本不一致,還需要進行格式轉換。
另外,需要檢查當前啟用的segment(log的最後一個segment)是否能容納這批記錄,取決於以下幾個條件:
1. 距離上次追加記錄的時間超過了配置的時間
2. 啟用的segment的size+記錄的size超過了segment配置的最大size
3. segment的index,timeindex容量滿了(預設為10M,追加過程中需要插入index和timeindex)
4. 相對offset溢位(記錄的最大offset - segment的baseOffset),超過了Integer.MAX_VALUE(index中的offset是相對offset,其型別為int)

Log的append流程
Log.append

LogValidator的validateMessageAndAssignOffsets流程
LogValidator.validateMessageAndAssignOffsets

MemoryRecordsBuilder用於給記錄重新分派offset,併兼容遺留格式的記錄,內含了一塊ByteBuffer,其大小為記錄的size的大概值(對於沒有壓縮的記錄是準確值,對於壓縮了的是大概值)

MemoryRecordsBuilder的appendWithOffset流程
MemoryRecordsBuilder.appendWithOffset

計算所有記錄中的maxTimestamp和offsetOfMaxTimestamp

MemoryRecordsBuilder的get info流程
MemoryRecordsBuilder.info

roll會生成新的segment,並對log的其他segment進行flush。

Log的roll segment流程
Log.roll