
Kafka Source Code Analysis (Part 2): Log Analysis

The previous article covered how LogSegment and Log are initialized; this one walks through the main operations a Log supports. Broadly speaking, the common Log operations fall into four categories:

1. High watermark management
2. Log segment management
3. Key offset management
4. Read and write operations

Key offset management mainly concerns the Log Start Offset and the LEO.

## High Watermark

### Initializing the high watermark

The high watermark is defined through the LogOffsetMetadata class:

```scala
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
```

The initial value passed in is logStartOffset, which means that when the high watermark is first constructed it is set to the Log Start Offset. Let's take a look at the LogOffsetMetadata class:

```scala
case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

  // check if this offset is already on an older segment compared with the given offset
  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    this.segmentBaseOffset < that.segmentBaseOffset
  }
  ...
}
```

LogOffsetMetadata has three constructor parameters:

- messageOffset: the message offset itself;
- segmentBaseOffset: the base offset of the log segment containing the message offset, used to decide whether two offsets live in the same segment;
- relativePositionInSegment: the physical position, within that segment's file on disk, of the message offset.

The onOlderSegment method above shows that deciding which segment is older only requires comparing the segmentBaseOffset values.

### Setting and updating the high watermark

The setter is updateHighWatermarkMetadata:

```scala
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
  // the high watermark can never be negative
  if (newHighWatermark.messageOffset < 0)
    throw new IllegalArgumentException("High watermark offset should be non-negative")

  lock synchronized { // monitor lock protecting modifications of the Log object
    highWatermarkMetadata = newHighWatermark // assign the new high watermark
    // transaction related, ignored for now
    producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
    // transaction related, ignored for now
    maybeIncrementFirstUnstableOffset()
  }
  trace(s"Setting high watermark $newHighWatermark")
}
```

Setting the high watermark is simple: first check that the new value is not negative, then take the lock and assign it. There are two methods that update the high watermark, updateHighWatermark and maybeIncrementHighWatermark; let's look at them in turn.

**updateHighWatermark**

```scala
def updateHighWatermark(hw: Long): Long = {
  // if the given high watermark is smaller than logStartOffset, use logStartOffset instead
  val newHighWatermark = if (hw < logStartOffset) logStartOffset
  // if the given high watermark is larger than the LEO, use the LEO instead
  else if (hw > logEndOffset) logEndOffset
  else hw
  // wrap newHighWatermark in a LogOffsetMetadata and update the high watermark
  updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
  // return the new high watermark
  newHighWatermark
}
```

The logic here is equally concise: since the high watermark can never exceed the LEO and can never be smaller than logStartOffset, the incoming hw is clamped into that range and the result is handed to the setter above.
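
To make the clamping rule concrete, here is a minimal standalone sketch; the function clampHighWatermark and the plain Long parameters are inventions for this example, not Kafka code:

```scala
object HighWatermarkClampDemo {
  // Clamp a proposed high watermark into [logStartOffset, logEndOffset],
  // mirroring the bounds check performed by Log.updateHighWatermark.
  def clampHighWatermark(hw: Long, logStartOffset: Long, logEndOffset: Long): Long =
    if (hw < logStartOffset) logStartOffset
    else if (hw > logEndOffset) logEndOffset
    else hw

  def main(args: Array[String]): Unit = {
    // assume log start offset = 100 and LEO = 250
    println(clampHighWatermark(50, 100, 250))  // 100: too small, raised to the log start offset
    println(clampHighWatermark(180, 100, 250)) // 180: already in range, used as-is
    println(clampHighWatermark(300, 100, 250)) // 250: too large, lowered to the LEO
  }
}
```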

**maybeIncrementHighWatermark**

```scala
/**
 * Update the high watermark to a new value if and only if it is larger than the old value. It is
 * an error to update to a value which is larger than the log end offset.
 *
 * This method is intended to be used by the leader to update the high watermark after follower
 * fetch offsets have been updated.
 *
 * @return the old high watermark, if updated by the new value
 */
// only update when the new high watermark is larger than the old one; passing a value larger than the LEO is an error
// this method is used by the leader, after it has confirmed that followers have fetched the log
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  // a new high watermark beyond the LEO is an error
  if (newHighWatermark.messageOffset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
      s"log end offset $logEndOffsetMetadata")

  lock.synchronized {
    // fetch the old high watermark
    val oldHighWatermark = fetchHighWatermarkMetadata

    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
    // update only when the new value is larger than the old one, to keep the high watermark monotonically increasing,
    // or when the two values are equal but the new metadata sits on a newer segment
    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
      updateHighWatermarkMetadata(newHighWatermark)
      Some(oldHighWatermark) // return the old high watermark
    } else {
      None
    }
  }
}
```

I have kept this method's original English javadoc and restated it in the comments above the signature; the logic is straightforward and the comments should be enough to follow it.

The main difference between the two methods is who calls them: updateHighWatermark is used on a Follower replica to move its high watermark after it has fetched messages from the Leader, while maybeIncrementHighWatermark is used to advance the Leader replica's high watermark.
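
As a rough illustration of the leader-side path described in the javadoc, here is a simplified sketch of how a leader could derive a new high watermark from the fetch offsets of its in-sync replicas and apply a monotonic update. MiniLog, replicaLeos and every other name here are invented for the example; this is not the actual Partition/Log code:

```scala
import scala.collection.mutable

// Toy stand-in for the leader's view of a partition, not Kafka's real classes.
final class MiniLog(var logEndOffset: Long) {
  private var highWatermark: Long = 0L

  // Monotonic update in the spirit of Log.maybeIncrementHighWatermark:
  // only move forward, never backwards, and never beyond the LEO.
  def maybeIncrementHighWatermark(newHw: Long): Option[Long] = {
    require(newHw <= logEndOffset, s"high watermark $newHw cannot exceed LEO $logEndOffset")
    if (newHw > highWatermark) {
      val old = highWatermark
      highWatermark = newHw
      Some(old)
    } else None
  }

  def hw: Long = highWatermark
}

object LeaderHighWatermarkDemo {
  def main(args: Array[String]): Unit = {
    val leaderLog = new MiniLog(logEndOffset = 120)
    // fetch offsets reported by in-sync replicas (including the leader's own LEO)
    val replicaLeos = mutable.Map("leader" -> 120L, "follower-1" -> 95L, "follower-2" -> 110L)

    // the high watermark can only be as large as the smallest LEO in the ISR
    leaderLog.maybeIncrementHighWatermark(replicaLeos.values.min)
    println(leaderLog.hw) // 95

    // a later fetch moves follower-1 forward, so the high watermark can advance
    replicaLeos("follower-1") = 120L
    leaderLog.maybeIncrementHighWatermark(replicaLeos.values.min)
    println(leaderLog.hw) // 110
  }
}
```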

Both of these methods obtain the current high watermark by calling fetchHighWatermarkMetadata, so let's look at that next:

**fetchHighWatermarkMetadata**

```scala
private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
  // make sure the log has not been closed while we read
  checkIfMemoryMappedBufferClosed()

  val offsetMetadata = highWatermarkMetadata
  if (offsetMetadata.messageOffsetOnly) { // we only have the message offset, not the full high watermark metadata
    lock.synchronized {
      // read the log file to build the complete high watermark metadata
      val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
      updateHighWatermarkMetadata(fullOffset)
      fullOffset
    }
  } else {
    offsetMetadata
  }
}

private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
  // look up the corresponding log entry for the given offset
  val fetchDataInfo = read(offset,
    maxLength = 1,
    isolation = FetchLogEnd,
    minOneMessage = false)
  fetchDataInfo.fetchOffsetMetadata
}
```

Since this relies on the log's read method, let's jump ahead and see how read looks up data by offset.

## Log segment operations

### Reading the log

**read**

```scala
def read(startOffset: Long,
         maxLength: Int,
         isolation: FetchIsolation,
         minOneMessage: Boolean): FetchDataInfo = {
  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
    trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")

    // convertToOffsetMetadataOrThrow passes FetchLogEnd, so this is false on that path
    val includeAbortedTxns = isolation == FetchTxnCommitted

    // no lock is taken here, so cache the current nextOffsetMetadata in a local variable
    val endOffsetMetadata = nextOffsetMetadata
    val endOffset = endOffsetMetadata.messageOffset
    // find the segment whose base offset is the largest one not exceeding startOffset
    var segmentEntry = segments.floorEntry(startOffset)

    // return error on attempt to read beyond the log end offset or read below log start offset
    // the error cases are:
    // 1. the requested offset is larger than the LEO;
    // 2. no segment could be found for the requested offset;
    // 3. the requested offset is smaller than logStartOffset
    if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
        s"but we only have log segments in the range $logStartOffset to $endOffset.")

    // convertToOffsetMetadataOrThrow passes FetchLogEnd, so the upper bound is endOffsetMetadata on that path
    // determine the upper bound from the read isolation level:
    // a regular consumer can see messages in [Log Start Offset, LEO)
    // a transactional consumer can only see messages in [Log Start Offset, Log Stable Offset]; the LSO is an offset
    // no larger than the LEO and is used for Kafka transactions
    // a follower replica can see messages in [Log Start Offset, high watermark]
    val maxOffsetMetadata = isolation match {
      case FetchLogEnd => endOffsetMetadata
      case FetchHighWatermark => fetchHighWatermarkMetadata
      case FetchTxnCommitted => fetchLastStableOffsetMetadata
    }
    // if the requested offset equals maxOffsetMetadata, return right away
    if (startOffset == maxOffsetMetadata.messageOffset) {
      return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
    // if the requested offset is beyond maxOffsetMetadata, return an empty message set, because nothing can be read
    } else if (startOffset > maxOffsetMetadata.messageOffset) {
      val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
      return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
    }

    // iterate over the segment objects until something is read or the end of the log is reached
    while (segmentEntry != null) {
      val segment = segmentEntry.getValue

      // the maximum physical position within this segment that we may read up to
      val maxPosition = {
        if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
          maxOffsetMetadata.relativePositionInSegment
        } else {
          segment.size
        }
      }
      // read records from the segment based on the offset information
      val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
      // if nothing was read, move on to the segment with the next higher base offset
      if (fetchInfo == null) {
        segmentEntry = segments.higherEntry(segmentEntry.getKey)
      } else {
        return if (includeAbortedTxns)
          addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
        else
          fetchInfo
      }
    }

    // nothing was found in any segment, probably because all messages beyond the requested offset have been
    // deleted; return an empty result in that case
    FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
  }
}
```

The read method takes four parameters:

* startOffset: the offset to start reading from.
* maxLength: the maximum number of bytes to read.
* isolation: the fetch isolation level, mostly relevant for Kafka transactions.
* minOneMessage: whether to return at least one message. Imagine a message larger than maxLength: normally read would never be able to return it. With this flag set to true, read guarantees that at least one message is returned.

The code uses segments to look up log segments by offset:

```scala
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
```

Step by step, read does the following (see the lookup sketch after this list):

1. Since no lock is taken, cache the current nextOffsetMetadata in a local variable and use it as the upper bound (the LEO).
2. Look up, in the segment map, the segment whose base offset is the largest one that is less than or equal to startOffset.
3. Validate the error cases:
   1. startOffset is beyond the LEO;
   2. no segment in the map has a base offset less than or equal to startOffset;
   3. startOffset is below the Log Start Offset.
4. Determine the read upper bound (maxOffsetMetadata) from the isolation level.
5. If startOffset equals that upper bound, return an empty result.
6. If startOffset is beyond that upper bound, return an empty message set, because nothing can be read.
7. Iterate over the segment objects until something is read or the end of the log is reached:
   1. work out the maximum position that may be read within the current segment;
   2. read from the segment based on the offset information (this segment-level read method was covered in the previous article);
   3. if nothing was read, move on to the next segment in the map.
8. If nothing is found in any segment, probably because all messages beyond the requested offset have been deleted, return an empty result.
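
Step 2 relies on the skip-list-backed segment map. The following self-contained sketch, with made-up base offsets and a placeholder Segment case class, shows how ConcurrentSkipListMap.floorEntry and higherEntry pick the segment for a given offset, which is how read navigates the segments map:

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupDemo {
  // Placeholder for a real LogSegment: only the base offset matters here.
  final case class Segment(baseOffset: Long)

  def main(args: Array[String]): Unit = {
    val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]()
    // three segments starting at offsets 0, 100 and 200
    Seq(0L, 100L, 200L).foreach(base => segments.put(base, Segment(base)))

    // floorEntry returns the segment with the largest base offset <= the requested offset
    println(segments.floorEntry(150L).getValue) // Segment(100): offset 150 lives in the segment starting at 100
    println(segments.floorEntry(200L).getValue) // Segment(200): exact match on a base offset

    // higherEntry moves to the next segment, just as read does when a segment yields no data
    println(segments.higherEntry(100L).getValue) // Segment(200)

    // an offset below every base offset has no floor entry; read treats the null as out of range
    println(segments.floorEntry(-1L)) // null
  }
}
```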

In the read path above we can see that segments is used to locate the log data. Now let's focus on the delete operation.

### Deleting log segments

The entry point for deletion is **deleteOldSegments**:

```scala
// if topic deletion is enabled, delete segments that have expired or that push the log past its configured retention size;
// whether or not deletion is enabled, segments that lie before the log start offset are always deleted
def deleteOldSegments(): Int = {
  if (config.delete) {
    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
  } else {
    deleteLogStartOffsetBreachedSegments()
  }
}
```

deleteOldSegments checks whether the deletion policy is enabled. If it is, it calls, in turn:

- deleteRetentionMsBreachedSegments, which deletes segments whose largest timestamp is older than the configured retention time;
- deleteRetentionSizeBreachedSegments, which deletes segments once the log exceeds the configured retention size;
- deleteLogStartOffsetBreachedSegments, which deletes segments lying entirely before logStartOffset.

Here is how these three methods are implemented:

```scala
private def deleteRetentionMsBreachedSegments(): Int = {
  if (config.retentionMs < 0) return 0
  val startMs = time.milliseconds
  // call deleteOldSegments with a predicate that checks whether the segment's newest timestamp has exceeded the retention time
  deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
    reason = s"retention time ${config.retentionMs}ms breach")
}

private def deleteRetentionSizeBreachedSegments(): Int = {
  if (config.retentionSize < 0 || size < config.retentionSize) return 0
  var diff = size - config.retentionSize
  // check whether the log has grown past the configured retention size:
  // shouldDelete keeps subtracting segment sizes from diff while the remainder is still non-negative
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
    if (diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }

  deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
}

private def deleteLogStartOffsetBreachedSegments(): Int = {
  // shouldDelete checks whether the next segment's baseOffset is no larger than logStartOffset,
  // i.e. whether the current segment lies entirely before the log start offset
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
    nextSegmentOpt.exists(_.baseOffset <= logStartOffset)

  deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
}
```

This style of code is very flexible: each method plugs a different predicate function into the same machinery, achieving code reuse, and in the end they all call deleteOldSegments to do the actual deletion.
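
To isolate that higher-order-function pattern from the Kafka internals, here is a minimal self-contained sketch; FakeSegment, retentionMs, retentionSize and all other names are invented for the example:

```scala
object RetentionPredicateDemo {
  // Minimal stand-in for a LogSegment: just a base offset, a size and the newest timestamp.
  final case class FakeSegment(baseOffset: Long, sizeInBytes: Long, largestTimestamp: Long)

  // Generic driver: collect the leading run of segments matching the predicate, like Log.deleteOldSegments.
  def deleteOldSegments(segments: Vector[FakeSegment],
                        predicate: (FakeSegment, Option[FakeSegment]) => Boolean): Vector[FakeSegment] = {
    // pair each segment with its successor (None for the last one)
    val withNext = segments.zipWithIndex.map { case (seg, i) => (seg, segments.lift(i + 1)) }
    withNext.takeWhile { case (seg, next) => predicate(seg, next) }.map(_._1)
  }

  def main(args: Array[String]): Unit = {
    val now = 1000L
    val segments = Vector(
      FakeSegment(0, sizeInBytes = 100, largestTimestamp = 100),
      FakeSegment(50, sizeInBytes = 100, largestTimestamp = 500),
      FakeSegment(120, sizeInBytes = 100, largestTimestamp = 900))

    // time-based predicate: delete segments whose newest message is older than retentionMs
    val retentionMs = 300L
    println(deleteOldSegments(segments, (seg, _) => now - seg.largestTimestamp > retentionMs)
      .map(_.baseOffset)) // Vector(0, 50)

    // size-based predicate: keep deleting until the log fits within retentionSize again
    val retentionSize = 150L
    var diff = segments.map(_.sizeInBytes).sum - retentionSize // 300 - 150 = 150
    println(deleteOldSegments(segments, (seg, _) =>
      if (diff - seg.sizeInBytes >= 0) { diff -= seg.sizeInBytes; true } else false)
      .map(_.baseOffset)) // Vector(0)
  }
}
```

The pattern is the same as in the real code: the driver does not know why a segment is deletable, it only asks the predicate, so new retention rules can be added without touching the deletion logic.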

Now let's look at what the internal deleteOldSegments does.

**deleteOldSegments**

This deleteOldSegments takes different parameters from the public entry point above: it receives a predicate function that decides which segments may be deleted, plus a reason string describing why they are being deleted.

```scala
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
  // delete any segment that matches the predicate
  lock synchronized {
    val deletable = deletableSegments(predicate)
    if (deletable.nonEmpty)
      info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
    deleteSegments(deletable)
  }
}
```

This method relies on two main helpers: deletableSegments, which collects the segments that may be deleted, and deleteSegments, which actually deletes them.

**deletableSegments**

```scala
private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
  // if there are no segments at all, return right away
  if (segments.isEmpty) {
    Seq.empty
  } else {
    val deletable = ArrayBuffer.empty[LogSegment]
    var segmentEntry = segments.firstEntry
    // otherwise start from the first segment
    while (segmentEntry != null) {
      val segment = segmentEntry.getValue
      // look up the next segment
      val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
      val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
        (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
      else
        (null, logEndOffset, segment.size == 0)
      // if the segment's upper bound offset does not exceed the high watermark, the segment matches the predicate,
      // and it is not an empty last segment, add it to the deletable set and move on to the next segment
      if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
        deletable += segment
        segmentEntry = nextSegmentEntry
      } else {
        segmentEntry = null
      }
    }
    deletable
  }
}
```

The logic here is clear; it does the following (a sketch of the high watermark guard follows this list):

1. If the segment map is empty, return an empty collection right away.
2. Otherwise, start iterating from the first (lowest base offset) segment.
3. Decide whether the current segment may be deleted:
   1. its upper bound offset (the next segment's base offset, or the LEO for the last segment) must not exceed the high watermark;
   2. it must match the predicate function;
   3. it must not be an empty last segment.
4. Add every segment that passes these checks to the deletable collection and return it.
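
The key safety rule in step 3 is that a segment is only deletable when everything in it lies below the high watermark. Below is a small self-contained sketch of that guard; FakeSegment and the offsets are invented for the example, and the real code additionally consults the predicate and the empty-last-segment check:

```scala
object DeletableSegmentsDemo {
  final case class FakeSegment(baseOffset: Long, sizeInBytes: Long)

  // Collect the leading segments whose upper bound offset does not exceed the high watermark,
  // in the spirit of Log.deletableSegments (predicate checks omitted for brevity).
  def deletableSegments(segments: Vector[FakeSegment],
                        logEndOffset: Long,
                        highWatermark: Long): Vector[FakeSegment] = {
    segments.zipWithIndex
      .map { case (seg, i) =>
        // a segment's upper bound is the next segment's base offset, or the LEO for the active segment
        val upperBound = segments.lift(i + 1).map(_.baseOffset).getOrElse(logEndOffset)
        (seg, upperBound)
      }
      .takeWhile { case (_, upperBound) => highWatermark >= upperBound }
      .map(_._1)
  }

  def main(args: Array[String]): Unit = {
    val segments = Vector(FakeSegment(0, 100), FakeSegment(100, 100), FakeSegment(200, 100))
    // with HW = 150, only the first segment (offsets [0, 100)) lies entirely below the HW
    println(deletableSegments(segments, logEndOffset = 260, highWatermark = 150).map(_.baseOffset)) // Vector(0)
    // with HW = 260 (= LEO), every segment may be considered for deletion
    println(deletableSegments(segments, logEndOffset = 260, highWatermark = 260).map(_.baseOffset)) // Vector(0, 100, 200)
  }
}
```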

Next, deleteSegments is called:

```scala
private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
  maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
    val numToDelete = deletable.size
    if (numToDelete > 0) {
      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
      // at least one segment must always exist, so if every segment is about to be deleted,
      // call roll() to create a brand-new segment and close the currently active one
      if (segments.size == numToDelete)
        roll()
      lock synchronized {
        // make sure the Log object has not been closed
        checkIfMemoryMappedBufferClosed()
        // remove the segments for lookups
        // remove the given segment objects and delete the underlying physical files
        removeAndDeleteSegments(deletable, asyncDelete = true)
        // try to advance the log's Log Start Offset
        maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
      }
    }
    numToDelete
  }
}
```

### Writing to the log

There are two main entry points for writing to the log:

**appendAsLeader**

```scala
def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
                   interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
  append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
}
```

**appendAsFollower**

```scala
def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
  append(records,
    isFromClient = false,
    interBrokerProtocolVersion = ApiVersion.latestVersion,
    assignOffsets = false,
    leaderEpoch = -1)
}
```

appendAsLeader is used to write to the Leader replica, while appendAsFollower is used by Follower replicas for synchronization. Both of them delegate to the append method underneath.

**append**

```scala
private def append(records: MemoryRecords,
                   isFromClient: Boolean,
                   interBrokerProtocolVersion: ApiVersion,
                   assignOffsets: Boolean,
                   leaderEpoch: Int): LogAppendInfo = {
  maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
    // Step 1: analyze and validate the records to be appended and return the validation result
    val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

    // return if we have no valid messages or if this is a duplicate of the last appended entry
    // if there is nothing to write at all, return right away
    if (appendInfo.shallowCount == 0)
      return appendInfo

    // trim any invalid bytes or partial messages before appending it to the on-disk log
    // Step 2: trim the records, i.e. drop invalid bytes or partial messages
    var validRecords = trimInvalidBytes(records, appendInfo)

    // they are valid, insert them in the log
    lock synchronized {
      // make sure the Log object has not been closed
      checkIfMemoryMappedBufferClosed()
      // offsets need to be assigned
      if (assignOffsets) {
        // assign offsets to the message set
        // Step 3: use the current LEO as the offset of the first record in the set; nextOffsetMetadata is the LEO
        val offset = new LongRef(nextOffsetMetadata.messageOffset)
        appendInfo.firstOffset = Some(offset.value)
        val now = time.milliseconds
        val validateAndOffsetAssignResult = try {
          LogValidator.validateMessagesAndAssignOffsets(validRecords,
            topicPartition,
            offset,
            time,
            now,
            appendInfo.sourceCodec,
            appendInfo.targetCodec,
            config.compact,
            config.messageFormatVersion.recordVersion.value,
            config.messageTimestampType,
            config.messageTimestampDifferenceMaxMs,
            leaderEpoch,
            isFromClient,
            interBrokerProtocolVersion,
            brokerTopicStats)
        } catch {
          case e: IOException =>
            throw new KafkaException(s"Error validating messages while appending to log $name", e)
        }
        // update the LogAppendInfo validation result object
        validRecords = validateAndOffsetAssignResult.validatedRecords
        appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
        appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
        appendInfo.lastOffset = offset.value - 1
        appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
        if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
          appendInfo.logAppendTime = now

        // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
        // format conversion)
        // Step 4: re-validate the messages to make sure none of them exceeds the size limit
        if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
          for (batch <- validRecords.batches.asScala) {
            if (batch.sizeInBytes > config.maxMessageSize) {
              // we record the original message set size instead of the trimmed size
              // to be consistent with pre-compression bytesRejectedRate recording
              brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
              brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
              throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
            }
          }
        }
      // use the given offsets as they are, no offsets need to be assigned
      } else {
        // we are taking the offsets we are given
        if (!appendInfo.offsetsMonotonic) // make sure the offsets are monotonically increasing
          throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
            records.records.asScala.map(_.offset))

        if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
          // we may still be able to recover if the log is empty
          // one example: fetching from log start offset on the leader which is not batch aligned,
          // which may happen as a result of AdminClient#deleteRecords()
          val firstOffset = appendInfo.firstOffset match {
            case Some(offset) => offset
            case None => records.batches.asScala.head.baseOffset()
          }

          val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
          throw new UnexpectedAppendOffsetException(
            s"Unexpected offset in append to $topicPartition. $firstOrLast " +
            s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
            s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
            s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
            firstOffset, appendInfo.lastOffset)
        }
      }

      // update the epoch cache with the epoch stamped onto the message by the leader
      // Step 5: update the leader epoch cache
      validRecords.batches.asScala.foreach { batch =>
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
          maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
        } else {
          // In partial upgrade scenarios, we may get a temporary regression to the message format. In
          // order to ensure the safety of leader election, we clear the epoch cache so that we revert
          // to truncation by high watermark after the next leader election.
          leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
            warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
            cache.clearAndFlush()
          }
        }
      }

      // check messages set size may be exceed config.segmentSize
      // Step 6: make sure the record set is not larger than a whole segment
      if (validRecords.sizeInBytes > config.segmentSize) {
        throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
          s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
      }

      // maybe roll the log if this segment is full
      // Step 7: maybe roll the log. The active segment may not have enough room left for the new record set,
      // in which case a new segment has to be created to hold the records. The log is rolled when:
      // the active segment is full,
      // the first message in the segment has passed the maximum allowed age,
      // or the index is full
      val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

      val logOffsetMetadata = LogOffsetMetadata(
        messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
        segmentBaseOffset = segment.baseOffset,
        relativePositionInSegment = segment.size)

      // now that we have valid records, offsets assigned, and timestamps updated, we need to
      // validate the idempotent/transactional state of the producers and collect some metadata
      // Step 8: validate the producers' transactional state
      val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
        logOffsetMetadata, validRecords, isFromClient)

      maybeDuplicate.foreach { duplicate =>
        appendInfo.firstOffset = Some(duplicate.firstOffset)
        appendInfo.lastOffset = duplicate.lastOffset
        appendInfo.logAppendTime = duplicate.timestamp
        appendInfo.logStartOffset = logStartOffset
        return appendInfo
      }

      // Step 9: do the actual write, mainly by calling the log segment's append method
      segment.append(largestOffset = appendInfo.lastOffset,
        largestTimestamp = appendInfo.maxTimestamp,
        shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
        records = validRecords)

      // Increment the log end offset. We do this immediately after the append because a
      // write to the transaction index below may fail and we want to ensure that the offsets
      // of future appends still grow monotonically. The resulting transaction index inconsistency
      // will be cleaned up after the log directory is recovered. Note that the end offset of the
      // ProducerStateManager will not be updated and the last stable offset will not advance
      // if the append to the transaction index fails.
      // Step 10: update the LEO; the new LEO is the offset of the last appended message plus 1.
      // As mentioned before, the LEO always points to the next, not yet existing, message
      updateLogEndOffset(appendInfo.lastOffset + 1)

      // update the producer state
      // Step 11: update the transactional state
      for (producerAppendInfo <- updatedProducers.values) {
        producerStateManager.update(producerAppendInfo)
      }

      // update the transaction index with the true last stable offset. The last offset visible
      // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
      for (completedTxn <- completedTxns) {
        val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
        segment.updateTxnIndex(completedTxn, lastStableOffset)
        producerStateManager.completeTxn(completedTxn)
      }

      // always update the last producer id map offset so that the snapshot reflects the current offset
      // even if there isn't any idempotent data being written
      producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

      // update the first unstable offset (which is used to compute LSO)
      maybeIncrementFirstUnstableOffset()

      trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
        s"first offset: ${appendInfo.firstOffset}, " +
        s"next offset: ${nextOffsetMetadata.messageOffset}, " +
        s"and messages: $validRecords")

      // decide whether to flush to disk manually. Normally the broker parameter log.flush.interval.messages
      // is left unset and flushing is delegated to the operating system, but it can be set for stronger durability
      if (unflushedMessages >= config.flushInterval)
        flush()

      // Step 12: return the append result
      appendInfo
    }
  }
}
```

The main steps of the code above are summarized in the following diagram:

![image-20200621142323971](https://img.luozhiyun.com/blog20200621160217.png)
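
One detail worth calling out from steps 3 and 10 is how offsets are assigned starting at the current LEO and how the LEO then jumps to lastOffset + 1. The sketch below illustrates just that bookkeeping with a made-up MiniLog class and plain string records; it is not the real Log/LogValidator code:

```scala
import scala.collection.mutable.ArrayBuffer

object OffsetAssignmentDemo {
  // Toy log that only tracks offsets, to illustrate the LEO bookkeeping in Log.append.
  final class MiniLog {
    private val records = ArrayBuffer.empty[(Long, String)]
    private var leo: Long = 0L // log end offset: the offset of the next message to be appended

    // Leader-style append: assign offsets starting at the current LEO (append step 3),
    // then advance the LEO to lastOffset + 1 (append step 10).
    def appendAsLeader(batch: Seq[String]): (Long, Long) = {
      val firstOffset = leo
      batch.zipWithIndex.foreach { case (value, i) => records += ((firstOffset + i, value)) }
      val lastOffset = firstOffset + batch.size - 1
      leo = lastOffset + 1
      (firstOffset, lastOffset)
    }

    def logEndOffset: Long = leo
  }

  def main(args: Array[String]): Unit = {
    val log = new MiniLog
    println(log.appendAsLeader(Seq("a", "b", "c"))) // (0,2): the first batch gets offsets 0..2
    println(log.appendAsLeader(Seq("d")))           // (3,3): the next batch starts at the new LEO
    println(log.logEndOffset)                       // 4: the LEO always points one past the last message
  }
}
```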

Now let's see how analyzeAndValidateRecords validates the records:

**analyzeAndValidateRecords**

```scala
private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
  var shallowMessageCount = 0
  var validBytesCount = 0
  var firstOffset: Option[Long] = None
  var lastOffset = -1L
  var sourceCodec: CompressionCodec = NoCompressionCodec
  var monotonic = true
  var maxTimestamp = RecordBatch.NO_TIMESTAMP
  var offsetOfMaxTimestamp = -1L
  var readFirstMessage = false
  var lastOffsetOfFirstBatch = -1L

  for (batch <- records.batches.asScala) {
    // we only validate V2 and higher to avoid potential compatibility issues with older clients
    // for message format V2 and above, batches coming from clients must have a base offset of 0
    if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
      throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
        s"be 0, but it is ${batch.baseOffset}")

    // update the first offset if on the first message. For magic versions older than 2, we use the last offset
    // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
    // For magic version 2, we can get the first offset directly from the batch header.
    // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
    // case, validation will be more lenient.
    // Also indicate whether we have the accurate first offset or not
    if (!readFirstMessage) {
      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
        firstOffset = Some(batch.baseOffset) // update the firstOffset field
      lastOffsetOfFirstBatch = batch.lastOffset // update the lastOffsetOfFirstBatch field
      readFirstMessage = true
    }

    // check that offsets are monotonically increasing
    // if the current lastOffset is not smaller than the next batch's lastOffset, an earlier batch contains offsets
    // larger than those of a later batch, which violates offset monotonicity
    if (lastOffset >= batch.lastOffset)
      monotonic = false

    // update the last offset seen
    // update lastOffset with the last offset of the current batch
    lastOffset = batch.lastOffset

    // Check if the message sizes are valid.
    val batchSize = batch.sizeInBytes
    // check that the batch size does not exceed the broker parameter max.message.bytes
    if (batchSize > config.maxMessageSize) {
      brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
      brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
      throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
        s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
    }

    // check the validity of the message by checking CRC
    // validate the batch, including its format and its CRC
    if (!batch.isValid) {
      brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
      throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
    }

    // update maxTimestamp and offsetOfMaxTimestamp
    if (batch.maxTimestamp > maxTimestamp) {
      maxTimestamp = batch.maxTimestamp
      offsetOfMaxTimestamp = lastOffset
    }

    // increment the batch counter and accumulate the valid byte count, updating shallowMessageCount
    shallowMessageCount += 1
    validBytesCount += batchSize

    // extract the compression codec used by the batch
    val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
    if (messageCodec != NoCompressionCodec)
      sourceCodec = messageCodec
  }

  // Apply broker-side compression if any
  // look up the broker-side compression codec, i.e. the broker parameter compression.type;
  // its default value is "producer", meaning the target codec is whatever codec the producer used
  val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
  // finally build and return the LogAppendInfo object
  LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
    RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount,
    monotonic, lastOffsetOfFirstBatch)
}
```
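
The per-batch checks above boil down to a handful of rules: offsets must keep increasing across batches, no batch may exceed the configured maximum size, and corrupt batches are rejected. Here is a compact standalone sketch of those rules; FakeBatch, maxMessageSize and the sample values are invented for the example:

```scala
object BatchValidationDemo {
  // Minimal stand-in for a record batch: just the fields the checks need.
  final case class FakeBatch(baseOffset: Long, lastOffset: Long, sizeInBytes: Int, isValid: Boolean)

  final case class ValidationResult(shallowCount: Int, validBytes: Int, monotonic: Boolean, lastOffset: Long)

  def analyze(batches: Seq[FakeBatch], maxMessageSize: Int): ValidationResult = {
    var lastOffset = -1L
    var monotonic = true
    var shallowCount = 0
    var validBytes = 0
    for (batch <- batches) {
      // offsets must keep increasing from batch to batch
      if (lastOffset >= batch.lastOffset) monotonic = false
      lastOffset = batch.lastOffset
      // reject oversized batches, as the max.message.bytes check does
      if (batch.sizeInBytes > maxMessageSize)
        throw new IllegalArgumentException(s"batch of ${batch.sizeInBytes} bytes exceeds $maxMessageSize")
      // reject corrupt batches, as the CRC check does
      if (!batch.isValid)
        throw new IllegalArgumentException(s"corrupt batch at offset ${batch.baseOffset}")
      shallowCount += 1
      validBytes += batch.sizeInBytes
    }
    ValidationResult(shallowCount, validBytes, monotonic, lastOffset)
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      FakeBatch(0, 4, sizeInBytes = 120, isValid = true),
      FakeBatch(5, 9, sizeInBytes = 80, isValid = true))
    println(analyze(batches, maxMessageSize = 1000)) // ValidationResult(2,200,true,9)
  }
}
```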