Kafka Source Code Analysis: LogSegment
阿新 · Posted 2018-12-01
This post analyzes the Kafka LogSegment source code.
If you trace through LogManager and Log step by step, you will find that every log operation ultimately lands in LogSegment: per-segment reads, writes, recovery, flushing, and deletion are all implemented here. The LogSegment code likewise lives under the log directory of the source tree.
LogSegment is the smallest unit of operation on a log segment. It acts directly on the messages themselves, handling physical reads, writes, appends, and so on.
In practice, LogSegment is a thin delegate over FileMessageSet: every LogSegment operation is ultimately carried out by FileMessageSet, which in turn builds on ByteBufferMessageSet, the message container class, and performs its I/O through a FileChannel.
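To make that layering concrete, here is a minimal sketch (not Kafka's actual code) of a segment-like wrapper that appends ByteBuffers through a FileChannel and forces them to disk, which is the same primitive FileMessageSet uses underneath. The MiniSegment name and API are invented for illustration:

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel

class MiniSegment(file: File) {
  private val channel: FileChannel = new RandomAccessFile(file, "rw").getChannel
  channel.position(channel.size) // start appending at the current end of the file

  def append(buffer: ByteBuffer): Int = {
    var written = 0
    while (buffer.hasRemaining)      // FileChannel.write may write only part of the buffer
      written += channel.write(buffer)
    written
  }

  def flush(): Unit = channel.force(true) // the same call FileMessageSet.flush bottoms out in

  def close(): Unit = channel.close()
}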
Let's walk through the main functions.
Initialization
class LogSegment(val log: FileMessageSet, // the primary constructor used in practice
                 val index: OffsetIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {

  var created = time.milliseconds

  /* the number of bytes since we last added an entry in the offset index */
  private var bytesSinceLastIndexEntry = 0

  // This is the constructor called from Log. Note that the index and log files
  // are created from the topic-partition directory and the startOffset.
  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) =
    this(new FileMessageSet(file = Log.logFilename(dir, startOffset)),
         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
         startOffset,
         indexIntervalBytes,
         rollJitterMs,
         time)
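As an aside, it helps to know what Log.logFilename and Log.indexFilename produce: Kafka names segment files after the base offset, zero-padded to 20 digits. A sketch of that naming scheme (the helper below is a stand-in, not the actual Log code):

def filenamePrefixFromOffset(offset: Long): String = {
  val nf = java.text.NumberFormat.getInstance()
  nf.setMinimumIntegerDigits(20)
  nf.setMaximumFractionDigits(0)
  nf.setGroupingUsed(false)
  nf.format(offset)
}

// e.g. the segment with baseOffset 368769 is stored as:
//   <topic-partition dir>/00000000000000368769.log
//   <topic-partition dir>/00000000000000368769.index
println(filenamePrefixFromOffset(368769) + ".log") // 00000000000000368769.log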
The append function: adding messages
def append(offset: Long, messages: ByteBufferMessageSet) {
  if (messages.sizeInBytes > 0) { // only act on a non-empty message set
    trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
    // append an entry to the index (if needed)
    if(bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(offset, log.sizeInBytes())
      this.bytesSinceLastIndexEntry = 0
    }
    // append the messages; FileMessageSet.append ultimately delegates to
    // ByteBufferMessageSet to write the actual message payload
    log.append(messages)
    this.bytesSinceLastIndexEntry += messages.sizeInBytes
  }
}
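The interesting detail here is the sparse index: an index entry is written only after at least indexIntervalBytes of message data has been appended since the last entry. A toy model of that bookkeeping (the names are invented; this is not the real OffsetIndex):

class SparseIndexModel(indexIntervalBytes: Int) {
  var entries = Vector.empty[(Long, Int)] // (logical offset, physical byte position)
  private var bytesSinceLastIndexEntry = 0
  private var position = 0

  def append(offset: Long, sizeInBytes: Int): Unit = {
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      entries = entries :+ (offset -> position) // record a sparse index entry
      bytesSinceLastIndexEntry = 0
    }
    position += sizeInBytes
    bytesSinceLastIndexEntry += sizeInBytes
  }
}

With indexIntervalBytes around 4096, a segment keeps roughly one index entry per 4 KB of log, which is what makes the index small enough to memory-map.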
The flush function: flushing messages to disk
def flush() {
  LogFlushStats.logFlushTimer.time {
    log.flush()   // FileMessageSet.flush ultimately calls channel.force to sync data to the storage device
    index.flush() // same for the index
  }
}
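LogFlushStats.logFlushTimer is a metrics timer; conceptually it just measures how long the flush takes. A minimal stand-in for that wrapper (Kafka actually uses its metrics library, so this is only a sketch):

def timed[A](label: String)(op: => A): A = {
  val start = System.nanoTime()
  try op
  finally println(s"$label took ${(System.nanoTime() - start) / 1000000} ms")
}

// usage: timed("log flush") { segment.flush() }  // `segment` is a hypothetical LogSegment instance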
The read function: reading messages
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
  if(maxSize < 0)
    throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

  val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  val startPosition = translateOffset(startOffset) // map the start offset to a file read position

  // if the start position is already off the end of the log, return null
  if(startPosition == null)
    return null

  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

  // if the size is zero, still return a log segment but with zero size
  if(maxSize == 0)
    return FetchDataInfo(offsetMetadata, MessageSet.Empty)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  val length =
    maxOffset match {
      case None =>
        // no max offset, just use the max size they gave unmolested
        maxSize
      case Some(offset) => {
        // there is a max offset, translate it to a file position and use that to calculate the max read size
        if(offset < startOffset)
          throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
        val mapping = translateOffset(offset, startPosition.position) // map maxOffset to a file position
        val endPosition =
          if(mapping == null)
            logSize // the max offset is off the end of the log, use the end of the file
          else
            mapping.position
        // the distance between the two positions bounds the read, capped at maxSize
        min(endPosition - startPosition.position, maxSize)
      }
    }
  // read `length` bytes from startPosition via FileMessageSet.read and wrap them in a FetchDataInfo
  FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
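To see the length computation in action, here is a worked example with hypothetical numbers: suppose startOffset maps to file position 1000, maxOffset maps to position 4200, and maxSize is 2048:

val startPosition = 1000
val endPosition   = 4200
val maxSize       = 2048
val length = math.min(endPosition - startPosition, maxSize)
// length == 2048: the fetch is capped by maxSize even though the offset range
// spans 3200 bytes, so the consumer needs a follow-up fetch for the remainder.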
The read function thus works by translating offsets into file positions and a byte length, letting a single call return messages spanning many offsets. The translation itself is done by translateOffset:
// Maps a logical offset to the physical position of the read pointer.
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
  val mapping = index.lookup(offset) // look up the nearest preceding entry in the index
  log.searchFor(offset, max(mapping.position, startingFilePosition)) // scan the FileMessageSet for the exact position
}
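The two-step lookup is worth spelling out: because the index is sparse, it can only return the nearest entry at or before the target offset, so a forward scan through the log finishes the job. A self-contained sketch using invented (offset, position) pairs, not Kafka's wire format:

// Step 1: find the greatest index entry whose offset is <= target.
// (OffsetIndex does this with a binary search over a memory-mapped file.)
def lookup(index: Vector[(Long, Int)], target: Long): (Long, Int) =
  index.takeWhile(_._1 <= target).lastOption.getOrElse((0L, 0))

// Step 2: scan forward from that position for the first record whose offset
// is >= target, analogous to FileMessageSet.searchFor.
def searchFor(records: Seq[(Long, Int)], target: Long, startPos: Int): Option[(Long, Int)] =
  records.find { case (offset, pos) => pos >= startPos && offset >= target }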
The recover function: the final delegate at the bottom of the call chain used by Kafka's startup log check.
def recover(maxMessageSize: Int): Int = {
  index.truncate()
  index.resize(index.maxIndexSize)
  var validBytes = 0
  var lastIndexEntry = 0
  val iter = log.iterator(maxMessageSize)
  try {
    while(iter.hasNext) {
      val entry = iter.next
      entry.message.ensureValid()
      if(validBytes - lastIndexEntry > indexIntervalBytes) {
        // we need to decompress the message, if required, to get the offset of the first uncompressed message
        val startOffset =
          entry.message.compressionCodec match {
            case NoCompressionCodec =>
              entry.offset
            case _ =>
              ByteBufferMessageSet.decompress(entry.message).head.offset
          }
        index.append(startOffset, validBytes)
        lastIndexEntry = validBytes
      }
      validBytes += MessageSet.entrySize(entry.message)
    }
  } catch {
    case e: InvalidMessageException =>
      logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
  }
  val truncated = log.sizeInBytes - validBytes
  log.truncateTo(validBytes) // truncate the log to the last valid byte
  index.trimToValidSize()    // trim the index to its valid size
  truncated                  // return how many bytes were truncated
}
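The pattern in recover generalizes: walk the entries, validate each one, count the bytes known to be good, and stop at the first corruption; everything past that point is truncated. A condensed sketch of that scan (the isValid predicate stands in for message.ensureValid):

def recoverBytes(entries: Iterator[Array[Byte]], isValid: Array[Byte] => Boolean): Int = {
  var validBytes = 0
  var corrupt = false
  while (entries.hasNext && !corrupt) {
    val e = entries.next()
    if (isValid(e)) validBytes += e.length // entry checks out, extend the valid prefix
    else corrupt = true                    // analogous to catching InvalidMessageException
  }
  validBytes // the caller truncates the log to this size, exactly as recover does
}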
The delete function: deleting a segment
def delete() {
  val deletedLog = log.delete()     // ultimately deletes the file and closes the in-memory buffers; implemented in FileMessageSet
  val deletedIndex = index.delete() // same for the index
  if(!deletedLog && log.file.exists)
    throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
  if(!deletedIndex && index.file.exists)
    throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
}
That covers the main functions of LogSegment.