1. 程式人生 > >Kafka LogManager詳解(六)

Kafka LogManager詳解(六)

文章目錄

一、LogManager結構

logDir:表示使用者配置的日誌存放路徑,通過log.dir配置,可以配置多個。LogManager會維護一個LogDir的列表。

Log: 每個partition的日誌目錄,代表topic的一個分割槽副本。LogManager會維護本broker上所有的Log物件。

**LogSegment:**partition中的日誌段物件,每個Log都會有N個日誌段。這個日誌段包括了日誌檔案和對應的索引檔案。

二、LogManager的建立

LogManager,即日誌管理元件,在kafka啟動時會建立並啟動。

private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
    val defaultLogConfig = LogConfig(defaultProps)
	//從zk獲取各個topic的相關配置
    val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
      topic -> LogConfig.fromProps(defaultProps, configs)
    }
    // read the log configurations from zookeeper
    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
                                      dedupeBufferSize = config.logCleanerDedupeBufferSize,
                                      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
                                      ioBufferSize = config.logCleanerIoBufferSize,
                                      maxMessageSize = config.messageMaxBytes,
                                      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
                                      backOffMs = config.logCleanerBackoffMs,
                                      enableCleaner = config.logCleanerEnable)
    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
                   topicConfigs = configs,
                   defaultConfig = defaultLogConfig,
                   cleanerConfig = cleanerConfig,
                   ioThreads = config.numRecoveryThreadsPerDataDir,
                   flushCheckMs = config.logFlushSchedulerIntervalMs,
                   flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
                   retentionCheckMs = config.logCleanupIntervalMs,
                   scheduler = kafkaScheduler,
                   brokerState = brokerState,
                   time = time)
  }

LogManager建立後,會先後做兩件事

  1. 檢查日誌目錄
  2. 載入日誌目錄的檔案

檢查日誌目錄

  private def createAndValidateLogDirs(dirs: Seq[File]) {
    if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
      throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
    for(dir <- dirs) {
      if(!dir.exists) {
        info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
        val created = dir.mkdirs()
        if(!created)
          throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
      }
      if(!dir.isDirectory || !dir.canRead)
        throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
    }
  }
  1. 配置的日誌目錄是否有重複的
  2. 日誌目錄不存在的話就新建一個日誌目錄
  3. 檢查日誌目錄是否可讀

載入日誌目錄的檔案

 private def loadLogs(): Unit = {
    info("Loading logs.")
    val startMs = time.milliseconds
    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
    val jobs = mutable.Map.empty[File, Seq[Future[_]]]
    //logDirs和配置的日誌存放目錄路徑有關
    for (dir <- this.logDirs) {
      val pool = Executors.newFixedThreadPool(ioThreads)
      threadPools.append(pool)
      //檢查上一次關閉是否是正常關閉
      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)

      if (cleanShutdownFile.exists) {
        debug(
          "Found clean shutdown file. " +
          "Skipping recovery for all logs in data directory: " +
          dir.getAbsolutePath)
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        brokerState.newState(RecoveringFromUncleanShutdown)
      }
      //讀取日誌檢查點
      var recoveryPoints = Map[TopicPartition, Long]()
      try {
        recoveryPoints = this.recoveryPointCheckpoints(dir).read
      } catch {
        case e: Exception =>
          warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
          warn("Resetting the recovery checkpoint to 0")
      }

      val jobsForDir = for {
        dirContent <- Option(dir.listFiles).toList
        logDir <- dirContent if logDir.isDirectory
      } yield {
        CoreUtils.runnable {
          debug("Loading log '" + logDir.getName + "'")
          //根據目錄名解析partiton的資訊,比如test-0,解析等到的patition就是topic test下的0號分割槽
          val topicPartition = Log.parseTopicPartitionName(logDir)
          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)

          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
          if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
            this.logsToBeDeleted.add(current)
          } else {
            val previous = this.logs.put(topicPartition, current)
            //判斷是否有重複的分割槽資料目錄
            if (previous != null) {
              throw new IllegalArgumentException(
                "Duplicate log directories found: %s, %s!".format(
                  current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
        }
      }

      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
    }


    try {
      for ((cleanShutdownFile, dirJobs) <- jobs) {
        //等待所有任務完成
        dirJobs.foreach(_.get)
        cleanShutdownFile.delete()
      }
    } catch {
      case e: ExecutionException => {
        error("There was an error in one of the threads during logs loading: " + e.getCause)
        throw e.getCause
      }
    } finally {
      threadPools.foreach(_.shutdown())
    }

    info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
  }

遍歷每個日誌目錄時,會先讀取日誌檢查點檔案,然後讀取日誌目錄下的所有檔案,然後建立相關的Log物件。需要注意的是,由於載入過程比較慢,對於每個日誌目錄都會建立一個執行緒來載入,最後等所有執行緒都載入完畢後才會退出loadLogs()方法。

因此,建立LogManager的過程是阻塞的,當LogManager建立完成後,說明所有的分割槽目錄都載入進來了。

三、啟動LogManager

建立LogManager後,就會立馬呼叫startup()方法啟動。

def startup() {
    /* Schedule the cleanup task to delete old logs */
    if(scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention",
                         cleanupLogs,
                         delay = InitialTaskDelayMs,
                         period = retentionCheckMs,
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher", 
                         flushDirtyLogs, 
                         delay = InitialTaskDelayMs, 
                         period = flushCheckMs, 
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointRecoveryPointOffsets,
                         delay = InitialTaskDelayMs,
                         period = flushCheckpointMs,
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-delete-logs",
                         deleteLogs,
                         delay = InitialTaskDelayMs,
                         period = defaultConfig.fileDeleteDelayMs,
                         TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
}

LogManager的啟動其實就是提交了4個定時任務,以及根據配置而定開啟一個日誌清理元件。

4個定時任務

  1. 舊的日誌段刪除任務
  2. 刷盤任務
  3. 檢查點任務
  4. 分割槽目錄刪除任務

四、舊的日誌段刪除任務

在LogManager啟動後,會提交一個週期性的日誌段刪除任務,用來處理一些超過一定時間以及大小的日誌段。這個任務的執行週期和log.retention.check.interval.ms有關係,預設值是300000,也就是每5分鐘執行一次刪除任務。執行的任務方法如下:

def cleanupLogs() {
    debug("Beginning log cleanup...")
    var total = 0
    val startMs = time.milliseconds
    for(log <- allLogs; if !log.config.compact) {
      debug("Garbage collecting '" + log.name + "'")
      //遍歷所有日誌,呼叫log元件的方法刪除日誌
      total += log.deleteOldSegments()
    }
    debug("Log cleanup completed. " + total + " files deleted in " +
                  (time.milliseconds - startMs) / 1000 + " seconds")
}

def deleteOldSegments(): Int = {
    if (!config.delete) return 0
    //一種是根據時間過期的策略刪除日誌,一種是根據大小去刪除日誌。
    deleteRetenionMsBreachedSegments() + deleteRetentionSizeBreachedSegments()
}

Kafka對於舊日誌段的處理方式有兩種

  1. 刪除:超過時間或大小閾值的舊 segment,直接進行刪除;
  2. 壓縮:不是直接刪除日誌分段,而是採用合併壓縮的方式進行。

Kafka刪除的檢查策略有兩種。一種根據時間過期的策略刪除過期的日誌,一種是根據日誌大小來刪除太大的日誌。

根據時間策略刪除相關日誌

該策略和配置retention.ms有關係

//根據時間策略刪除相關日誌段
private def deleteRetenionMsBreachedSegments() : Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds
    //傳到deleteOldSegments方法中的引數是一個高階函式,後面的方法中,會遍歷所有的segment,並呼叫此方法
    //一般從最舊的segment開始遍歷
    deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
}
private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
    lock synchronized {
      //遍歷所有的segment,如果目標segment的largestTimestamp已經到達過期時間了,就標記要刪除
      //另外,如果遍歷到的segment是最新的一個segment,並且該segment的大小是0,這個segment就不會被刪除
      val deletable = deletableSegments(predicate)
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        //如果全部的segment都過期了,為了保證至少有一個segment在工作,我們需要新建一個segment
        if (segments.size == numToDelete)
          roll()
        //非同步刪除日誌段
        deletable.foreach(deleteSegment)
      }
      numToDelete
    }
}

上面的程式碼把所有過期的日誌段刪除,config.retentionMs取決於配置log.retention.hours預設為168個小時,也就是7天。刪除時要注意兩點:

  1. 對於那些大小為0並且是正在使用中的日誌段不會被刪除
  2. 如果掃描完發現全部的日誌段都過期了,就要馬上新生成一個新的日誌段來處理後面的訊息
  3. 日誌段的刪除時非同步的,此處只會標記一下,往日誌段檔案後面加上.delete字尾,然後開啟一個定時任務刪除檔案。定時任務的延遲時間和file.delete.delay.ms有關係。

根據日誌大小刪除相關日誌

該刪除策略和配置retention.bytes有關係。該策略可以保證分割槽目錄的大小始終保持在一個限制的範圍內。

private def deleteRetentionSizeBreachedSegments() : Int = {
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    //diff表示超出限制的大小
    var diff = size - config.retentionSize
    //這是一個高階函式,後面的方法中,會遍歷所有的segment,並呼叫此方法
    //一般從最舊的segment開始遍歷
    def shouldDelete(segment: LogSegment) = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }
    deleteOldSegments(shouldDelete)
}
//和時間過期策略呼叫的是同一個方法,只是傳入的predicate函式不一樣
private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
    lock synchronized {
      //遍歷所有的segment,如果目標segment的largestTimestamp已經到達過期時間了,就標記要刪除
      //另外,如果遍歷到的segment是最新的一個segment,並且該segment的大小是0,這個segment就不會被刪除
      val deletable = deletableSegments(predicate)
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        //如果全部的segment都過期了,為了保證至少有一個segment在工作,我們需要新建一個segment
        if (segments.size == numToDelete)
          roll()
        //非同步刪除日誌段
        deletable.foreach(deleteSegment)
      }
      numToDelete
    }
}

這個策略的掃描邏輯大概是這樣的

  1. 通過size-retentionSize算出diff
  2. 遍歷segment,對於大小超過diff的日誌段,就標記刪除。然後將diff的值設定為diff-segment.size

使用這種策略,當分割槽目錄下只有一個日誌段時,無論該日誌段多大,都不會被刪除。另外,和時間策略一樣,這個刪除也是非同步刪除

五、刷盤任務

kafka在處理Producer請求時,只是將日誌寫到快取,並沒有執行flush()方法刷到磁碟。因此,logManager中開啟了一個刷盤任務,定期檢查各個目錄,根據刷盤策略執行flush操作。這個任務保證了每隔多久kafka會執行一次刷盤操作。

  private def flushDirtyLogs() = {
    debug("Checking for dirty logs to flush...")

    for ((topicPartition, log) <- logs) {
      try {
        val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
        debug("Checking if flush is needed on " + topicPartition.topic + " flush interval  " + log.config.flushMs +
              " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
        if(timeSinceLastFlush >= log.config.flushMs)
          log.flush
      } catch {
        case e: Throwable =>
          error("Error flushing topic " + topicPartition.topic, e)
      }
    }
  }

當距離上次刷盤的時間超過了log.config.flushMs時間就會執行一次刷盤,將快取中的內容持久化到磁碟。但是kafka官方給刷盤頻率設定的預設值是Long的最大值,也就是說,kafka官方的建議是把刷盤操作交給作業系統來控制。

另外,這個刷盤任務這是控制指定時間刷盤一次。kafka還有一個關於刷盤的策略是根據日誌的條數來控制刷盤頻率的,也就是配置flush.messages。這個配置是在每次寫日誌完檢查的,當kafka處理Producer請求寫日誌到快取後,會檢查當前的offset和之前記錄的offset直接的差值,如果超過配置的值,就執行一次刷盤。不過flush.messages的預設值也是Long的最大值。

六、日誌恢復檢查點任務

kafka的recovery-checkpoint(檢查點)記錄了最後一次重新整理的offset,表示多少日誌已經落盤到磁碟上,然後在異常關閉後恢復日誌。

任務執行的方法

  def checkpointRecoveryPointOffsets() {
    this.logDirs.foreach(checkpointLogsInDir)
  }
  private def checkpointLogsInDir(dir: File): Unit = {
    val recoveryPoints = this.logsByDir.get(dir.toString)
    if (recoveryPoints.isDefined) {
        //recoveryPoint表示還未刷到磁碟的第一條offset,比如offset=100之前的訊息都刷到磁碟中了,那麼recoveryPoint就是101
   this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
    }
  }

這個任務做的事情很簡單,就是遍歷所有的LogDir,然後將記憶體中維護的recovery-checkpoint寫到檔案上。

offset-checkpoint的儲存

每個LogDir日誌目錄下,都會有一個檔案recovery-point-offset-checkpoint,存放了各個Log(Partiton)當前的checkpoint是多少:

0
54
__consumer_offsets 22 0
__consumer_offsets 30 0
__consumer_offsets 8 0
__consumer_offsets 21 0
...

第一行的數字表示當前版本,第二行的數字表示該LogDir目錄下有多少個partition目錄。接著就是topic partition編號 recovery-checkpoint

何時重新整理recovery-checkpoint

kafka會在每次flush的時候更新對應Log的recovery-checkpoint。但是由於kafka的定時flush預設是交給作業系統來執行的。所以只有在新建一個新的segment時,以及對partition進行truncat時(如果replica的offset比leader還大,replica就要執行一次truncate,把超出的那些offset砍掉),才會更新recovery-checkpoint。

這種情況就會造成日誌落盤了很多,但是recovery-checkpoint一直沒更新的情況,不過由於recovery-checkpoint只是用來在broker啟動時恢復日誌用的,這一點倒無關緊要。另外,在正常關閉broker,kafka會保證將最新的offset寫入recovery-checkpoint檔案中。

如何利用recovery-checkpoint恢復日誌

首先,恢復點是異常關閉時用來恢復資料的。如果資料目錄下有.kafka_cleanshutdown檔案就表示不是異常關閉,就用不上恢復點了。如果上一次關閉時異常關閉的,kafka就會利用checkpoint來修復日誌了。

日誌的恢復程式碼

//Log.scala  
private def recoverLog() {
    //如果上一次是正常關閉,重新設定一下checkpoint
    if(hasCleanShutdownFile) {
      this.recoveryPoint = activeSegment.nextOffset
      return
    }

    // 根據recovery-checkpoint 找出那些需要恢復的segment
    val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
    while(unflushed.hasNext) {
      val curr = unflushed.next
      info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
      val truncatedBytes =
        try {
            //呼叫segment的recover()方法
          curr.recover(config.maxMessageSize)
        } catch {
          case _: InvalidOffsetException =>
            val startOffset = curr.baseOffset
            warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
                 "creating an empty one with starting offset " + startOffset)
            curr.truncateTo(startOffset)
        }
      if(truncatedBytes > 0) {
        // we had an invalid message, delete all remaining log
          //只要有一條日誌出了問題,就要將這之後的所有segment都刪去
        warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
        unflushed.foreach(deleteSegment)
      }
    }
  }

//LogSegment.scala
//檢查segment中的訊息是否合法
  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    timeIndex.truncate()
    timeIndex.resize(timeIndex.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    maxTimestampSoFar = Record.NO_TIMESTAMP
    try {
        //遍歷所有的shallow message
        //這裡shallow message並不一定是我們理解的一條訊息,kafka可能會將多條訊息壓縮成一條訊息
        //所以shallow message可能是一條訊息,也可能是多條訊息組裝成一條訊息
      for (entry <- log.shallowEntries(maxMessageSize).asScala) {
        val record = entry.record
        record.ensureValid()

        // The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages.
        if (record.timestamp > maxTimestampSoFar) {
          maxTimestampSoFar = record.timestamp
          offsetOfMaxTimestamp = entry.offset
        }

        // Build offset index
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          val startOffset = entry.firstOffset
          index.append(startOffset, validBytes)
          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
          lastIndexEntry = validBytes
        }
        validBytes += entry.sizeInBytes()
      }
    } catch {
      case e: CorruptRecordException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
          .format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
    timeIndex.trimToValidSize()
    truncated
  }

上面程式碼中的recoverLog()是kafka在啟動LogManager初試化一個個Log物件時,Log在初試化過程中會執行的一個方法。這個方法主要做了幾件事

  1. 通過檢查是否有.kafka_cleanshutdown檔案來判斷上一次是否是正常關閉,如果是的話,就不用恢復什麼了,直接更新recovery-checkpoint。
  2. 如果上次是非正常關閉,通過當前的recovery-checkpoint找出這個recovery-checkpoint之後的所有segment(包括recovery-checkpoint所在的segment)。然後遍歷這些segment,一條一條訊息檢查過去,並重建索引,之後如果有segment的訊息格式不正確,就執行非同步刪除操作,將後面的segment全部刪除掉。

**要注意的是,這些檢查的segment中,只要有一條訊息時invalid,kafka就會刪除所有檢查的segment。**這點是我一直想不通的地方,直到2.0版本也是這樣的邏輯,希望有知道原因的朋友告知一下。

七、分割槽目錄刪除任務

該任務執行的任務主要是刪除分割槽目錄,同時刪除底下的segment資料檔案。

  private def deleteLogs(): Unit = {
    try {
      var failed = 0
        //遍歷待刪除列表的元素,逐一刪除分割槽目錄
      while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) {
        val removedLog = logsToBeDeleted.take()
        if (removedLog != null) {
          try {
            removedLog.delete()
            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
          } catch {
            case e: Throwable =>
              error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
              failed = failed + 1
              logsToBeDeleted.put(removedLog)
          }
        }
      }
    } catch {
      case e: Throwable => 
        error(s"Exception in kafka-delete-logs thread.", e)
}

做的事情主要就是遍歷logsToBeDeleted列表,然後遍歷刪除元素。

那麼什麼時候分割槽會被加到logsToBeDeleted中待刪除呢?

  1. LogManager啟動時會掃描所有分割槽目錄名結尾是’-delete’的分割槽,加入到logsToBeDeleted中
  2. 分割槽被刪除的時候走的都是非同步刪除策略,會先被加入到logsToBeDeleted中等待刪除。

在kafka中,要刪除分割槽需要往broker傳送StopReplica請求。broker收到StopReplica請求後,判斷是否需要刪除分割槽,如果要刪除就執行非同步刪除步驟,非同步刪除的程式碼主要如下

  def asyncDelete(topicPartition: TopicPartition) = {
    //從記憶體中刪去相關資料
    val removedLog: Log = logCreationOrDeletionLock synchronized {
        logs.remove(topicPartition)
    }
    if (removedLog != null) {
      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
      if (cleaner != null) {
        cleaner.abortCleaning(topicPartition)
        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
      }
      //往分割槽目錄名稱的最後加上 '-delete',表示準備刪除
      val dirName = Log.logDeleteDirName(removedLog.name)
      //關閉分割槽目錄
      removedLog.close()
      val renamedDir = new File(removedLog.dir.getParent, dirName)
      val renameSuccessful = removedLog.dir.renameTo(renamedDir)
      if (renameSuccessful) {
        removedLog.dir = renamedDir
        // change the file pointers for log and index file
        for (logSegment <- removedLog.logSegments) {
          logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName))
          logSegment.index.file = new File(renamedDir, logSegment.index.file.getName)
        }
        //加入待刪除列表
        logsToBeDeleted.add(removedLog)
        removedLog.removeLogMetrics()
        info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
      } else {
        throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
      }
    }
  }
  1. 需要先把分割槽目錄標記一下,在後綴加上’-delete’表示該分割槽準備刪除了。這樣做可以防止如果刪除時間沒到就宕機,下次重啟時可以掃描’-delete’結尾的分割槽再刪除
  2. 把分割槽目錄新增到logsToBeDeleted中待刪除

八、多磁碟選擇機制

當配置了多個磁碟時,kafka是怎麼保證資料均勻分佈在各個磁碟呢?

這裡多磁碟只的是配置log.dirs中配置了多個目錄。

這個問題和kafka建立一個新的partition時,如何選擇目錄有關係,下面是kafka建立partition的程式碼

  def createLog(topicPartition: TopicPartition, config: LogConfig): Log = {
    logCreationOrDeletionLock synchronized {
      // create the log if it has not already been created in another thread
      getLog(topicPartition).getOrElse {
          //選擇新的partition要放在哪個資料目錄上
        val dataDir = nextLogDir()
        val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
        dir.mkdirs()
        val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
        logs.put(topicPartition, log)
        info("Created log for partition [%s,%d] in %s with properties {%s}."
          .format(topicPartition.topic,
            topicPartition.partition,
            dataDir.getAbsolutePath,
            config.originals.asScala.mkString(", ")))
        log
      }
    }
  }

  private def nextLogDir(): File = {
    if(logDirs.size == 1) {
      logDirs(0)
    } else {
        //各個資料目錄的檔案數量
      val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
        //有一些資料目錄底下可能沒有partition目錄
      val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
      var dirCounts = (zeros ++ logCounts).toBuffer
    
      //排序後,取當前檔案數量最小的那個資料目錄
      val leastLoaded = dirCounts.sortBy(_._2).head
      new File(leastLoaded._1)
    }
  }

nextLogDir()程式碼中可以看出,當新建一個新的partition目錄時,主要還是取partition檔案最少的那個資料目錄。

這樣在極端情況下可能會有一些問題,可能兩個資料目錄底下的partition檔案數一樣,但是其中一個數據目錄資料量非常大的情況(各個partition的資料量不一樣)。因此,在選擇多磁碟時也要注意一下,避免造成資源浪費。