
Kafka Source Code Analysis: LogSegment

This post analyzes the Kafka LogSegment source code.

After stepping through the LogManager and Log source code, you will find that the actual log operations all end up in LogSegment. Reading, writing, recovery, flushing, and deletion of a segment are implemented here. The LogSegment code also lives under the log directory of the source tree.

LogSegment is the smallest unit of operation on a log segment. It works directly on messages and is responsible for reading, writing, and appending the physical message data.

In effect, LogSegment is a delegate for the FileMessageSet class: everything LogSegment does is ultimately carried out by FileMessageSet. FileMessageSet in turn builds on ByteBufferMessageSet, the message-container class, and performs the actual reads and writes through a FileChannel.
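
As a rough mental model of the bottom of that stack: a file-backed message set boils down to writing a ByteBuffer through a FileChannel. The snippet below is a minimal standalone sketch of that idea using plain java.nio; it is not Kafka's actual code, and the file name is only an example.

import java.io.RandomAccessFile
import java.nio.ByteBuffer

object ChannelWriteSketch {
  def main(args: Array[String]): Unit = {
    // Open (or create) a segment-like file and get its FileChannel,
    // the object that FileMessageSet ultimately drives.
    val file = new RandomAccessFile("00000000000000000000.log", "rw")
    val channel = file.getChannel

    // A ByteBufferMessageSet is, at heart, a ByteBuffer full of encoded messages.
    val buffer = ByteBuffer.wrap("hello kafka".getBytes("UTF-8"))

    channel.position(channel.size()) // append at the current end of the file
    while (buffer.hasRemaining)
      channel.write(buffer)          // write the buffered bytes into the file

    file.close()
  }
}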

Let's look at the main methods.

  Initialization

 

class LogSegment(val log: FileMessageSet,     // this is the primary constructor
                 val index: OffsetIndex, 
                 val baseOffset: Long, 
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {
  
  var created = time.milliseconds

  /* the number of bytes since we last added an entry in the offset index */
  private var bytesSinceLastIndexEntry = 0
  // this is the constructor invoked from Log; the index and log files are created from the topic-partition directory and startOffset
  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) =
    this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), 
         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
         startOffset,
         indexIntervalBytes,
         rollJitterMs,
         time)
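
For reference, the file names produced by Log.logFilename and Log.indexFilename are simply the segment's base offset left-padded with zeros to 20 digits, plus a .log or .index suffix. The helper below is not Kafka's code, just a small sketch that reproduces that naming convention:

object SegmentFileNames {
  // Zero-pad an offset to 20 digits, the way Kafka names segment files.
  private def pad(offset: Long): String = "%020d".format(offset)

  def logFile(offset: Long): String   = pad(offset) + ".log"
  def indexFile(offset: Long): String = pad(offset) + ".index"

  def main(args: Array[String]): Unit = {
    // A segment with baseOffset 368769 is stored in these two files:
    println(logFile(368769L))   // 00000000000000368769.log
    println(indexFile(368769L)) // 00000000000000368769.index
  }
}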

 

  The append method: adding messages

 

def append(offset: Long, messages: ByteBufferMessageSet) {
    if (messages.sizeInBytes > 0) { // only write if the message set is non-empty
      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
      // append an entry to the index (if needed)
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      // append the messages
      log.append(messages) // delegate the write to FileMessageSet.append, which in turn uses ByteBufferMessageSet to operate on the message bytes
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }
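
The interesting detail here is that the offset index is sparse: an index entry is only written once at least indexIntervalBytes of message data has accumulated since the previous entry. The standalone sketch below mimics that bookkeeping; the batch sizes and the appendIndexEntry callback are made up for illustration.

object SparseIndexSketch {
  def main(args: Array[String]): Unit = {
    val indexIntervalBytes = 4096      // add an index entry roughly every 4 KB of messages
    var bytesSinceLastIndexEntry = 0
    var logSizeInBytes = 0

    def appendIndexEntry(offset: Long, position: Int): Unit =
      println(s"index entry: offset=$offset -> file position=$position")

    // Simulate appending batches of (offset, sizeInBytes), as LogSegment.append would see them.
    val batches = Seq((0L, 1500), (5L, 1800), (9L, 1200), (14L, 2000), (20L, 700))
    for ((offset, size) <- batches) {
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        appendIndexEntry(offset, logSizeInBytes)  // point the index at the current end of the file
        bytesSinceLastIndexEntry = 0
      }
      logSizeInBytes += size                      // "write" the batch
      bytesSinceLastIndexEntry += size
    }
  }
}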

 

  The flush method: flushing messages to disk

 

def flush() {
    LogFlushStats.logFlushTimer.time {
      log.flush()   // delegates to FileMessageSet.flush, which ultimately calls channel.force to sync to the storage device
      index.flush() // likewise for the offset index
    }
  }
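
What flush ultimately buys is durability: as the comment above notes, FileMessageSet.flush ends in a channel.force call, which asks the OS to push buffered writes down to the device. A tiny standalone illustration of that call (plain java.nio, not Kafka's classes; the file name is just an example):

import java.io.RandomAccessFile

object FlushSketch {
  def main(args: Array[String]): Unit = {
    val channel = new RandomAccessFile("00000000000000000000.log", "rw").getChannel
    // force(true) flushes file metadata (e.g. size) as well as the data pages;
    // force(false) would flush only the data.
    channel.force(true)
    channel.close()
  }
}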

 

  The read method: reading messages

 

def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
    if(maxSize < 0)
      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

    val logSize = log.sizeInBytes // this may change, need to save a consistent copy
    val startPosition = translateOffset(startOffset)  // find the file position corresponding to startOffset

    // if the start position is already off the end of the log, return null
    if(startPosition == null) // no position means startOffset is past the end of the log, so return null
      return null

    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position) // build the offset metadata for this fetch

    // if the size is zero, still return a log segment but with zero size
    if(maxSize == 0)  // a max read size of 0 returns an empty message set
      return FetchDataInfo(offsetMetadata, MessageSet.Empty)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    val length =   // compute the total number of bytes to read
      maxOffset match {
        case None => // no maxOffset given, so just use maxSize
          // no max offset, just use the max size they gave unmolested
          maxSize
        case Some(offset) => { // a maxOffset was given, so derive the read length from it
          // there is a max offset, translate it to a file position and use that to calculate the max read size
          if(offset < startOffset)  // a maxOffset smaller than startOffset is invalid
            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
          val mapping = translateOffset(offset, startPosition.position) // find the file position for maxOffset, starting the search from startPosition
          val endPosition = 
            if(mapping == null)
              logSize // the max offset is off the end of the log, use the end of the file
            else
              mapping.position
          min(endPosition - startPosition.position, maxSize) // the read length is the distance between the two positions, capped at maxSize
        }
      }
    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length)) // read that many bytes via FileMessageSet.read and wrap the result in a FetchDataInfo
  }
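
The length calculation above can be read in isolation: with no maxOffset the read is simply capped at maxSize, and with a maxOffset the read stops at whichever comes first, the maxOffset's file position or maxSize. Below is a small standalone sketch of just that computation; the byte positions are made-up numbers.

import scala.math.min

object ReadLengthSketch {
  // Mirror of the maxOffset branch in LogSegment.read.
  // startPos and maxOffsetPos are byte positions in the segment file.
  def readLength(startPos: Int, maxOffsetPos: Option[Int], maxSize: Int): Int =
    maxOffsetPos match {
      case None         => maxSize                          // no maxOffset: read up to maxSize bytes
      case Some(endPos) => min(endPos - startPos, maxSize)  // otherwise stop at maxOffset or maxSize
    }

  def main(args: Array[String]): Unit = {
    println(readLength(startPos = 1000, maxOffsetPos = None, maxSize = 4096))        // 4096
    println(readLength(startPos = 1000, maxOffsetPos = Some(3000), maxSize = 4096))  // 2000
    println(readLength(startPos = 1000, maxOffsetPos = Some(9000), maxSize = 4096))  // 4096
  }
}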

 

  The translateOffset helper: mapping an offset to a file position, which read uses to turn offsets into byte lengths

 

private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {  // maps a logical offset to a physical file position
    val mapping = index.lookup(offset) // look up the nearest preceding entry in the (sparse) offset index
    log.searchFor(offset, max(mapping.position, startingFilePosition)) // scan forward in the file from there via FileMessageSet to find the exact position
  }
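
Because the index is sparse, the lookup is a two-step process: the index returns the position of the nearest indexed offset at or before the target, and searchFor then scans the file forward from that position. The standalone sketch below shows the same idea over an in-memory "file"; the data structures are stand-ins, not Kafka's.

object TranslateOffsetSketch {
  // A made-up "segment": each entry is (offset, file position).
  val filePositions: Seq[(Long, Int)] =
    Seq((0L, 0), (1L, 120), (2L, 260), (3L, 400), (4L, 530), (5L, 700))

  // A sparse index holding only some of those entries.
  val sparseIndex: Seq[(Long, Int)] = Seq((0L, 0), (3L, 400))

  def translateOffset(target: Long): Option[Int] = {
    // Step 1: index lookup -> nearest indexed offset at or before the target.
    val (_, startPos) = sparseIndex.filter(_._1 <= target).last
    // Step 2: linear scan forward from that position for the exact offset.
    filePositions.find { case (offset, pos) => pos >= startPos && offset >= target }.map(_._2)
  }

  def main(args: Array[String]): Unit = {
    println(translateOffset(4L)) // Some(530): index points at offset 3, scan finds offset 4
    println(translateOffset(9L)) // None: past the end of the segment
  }
}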

 

  The recover method: the final delegate in the chain of calls made during Kafka's startup log checks

 

def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.decompress(entry.message).head.offset
          }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException => 
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }
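
recover rewinds the index, re-validates every message, rebuilds sparse index entries as it goes, and truncates the log to the last valid byte, returning how many bytes were dropped. The standalone sketch below mimics only the validity/truncation bookkeeping; the entry sizes and the corrupt entry are invented for illustration.

object RecoverSketch {
  def main(args: Array[String]): Unit = {
    // (sizeInBytes, isValid) for each message entry in a damaged segment.
    val entries = Seq((300, true), (450, true), (500, true), (250, false), (400, true))
    var validBytes = 0
    var corrupt = false

    val it = entries.iterator
    while (it.hasNext && !corrupt) {
      val (size, isValid) = it.next()
      if (!isValid) corrupt = true      // ensureValid() would throw InvalidMessageException here
      else validBytes += size           // message checks out, keep counting
    }

    val logSize = entries.map(_._1).sum
    val truncated = logSize - validBytes // bytes to chop off the end of the file
    println(s"valid=$validBytes, truncated=$truncated") // valid=1250, truncated=650
  }
}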

 

  The delete method: removing a segment

 

 def delete() {
    val deletedLog = log.delete()   // ultimately deletes the file and releases the in-memory buffer; implemented in FileMessageSet
    val deletedIndex = index.delete() // likewise for the offset index
    if(!deletedLog && log.file.exists)
      throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
    if(!deletedIndex && index.file.exists)
      throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
  }

That covers the main methods of LogSegment.