Kafka LogManager詳解(六)
文章目錄
一、LogManager結構
logDir:表示使用者配置的日誌存放路徑,通過log.dir配置,可以配置多個。LogManager會維護一個LogDir的列表。
Log: 每個partition的日誌目錄,代表topic的一個分割槽副本。LogManager會維護本broker上所有的Log物件。
**LogSegment:**partition中的日誌段物件,每個Log都會有N個日誌段。這個日誌段包括了日誌檔案和對應的索引檔案。
二、LogManager的建立
LogManager,即日誌管理元件,在kafka啟動時會建立並啟動。
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { val defaultProps = KafkaServer.copyKafkaConfigToLog(config) val defaultLogConfig = LogConfig(defaultProps) //從zk獲取各個topic的相關配置 val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) => topic -> LogConfig.fromProps(defaultProps, configs) } // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor, ioBufferSize = config.logCleanerIoBufferSize, maxMessageSize = config.messageMaxBytes, maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond, backOffMs = config.logCleanerBackoffMs, enableCleaner = config.logCleanerEnable) new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, topicConfigs = configs, defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, ioThreads = config.numRecoveryThreadsPerDataDir, flushCheckMs = config.logFlushSchedulerIntervalMs, flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, scheduler = kafkaScheduler, brokerState = brokerState, time = time) }
LogManager建立後,會先後做兩件事
- 檢查日誌目錄
- 載入日誌目錄的檔案
檢查日誌目錄
private def createAndValidateLogDirs(dirs: Seq[File]) {
if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
for(dir <- dirs) {
if(!dir.exists) {
info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
val created = dir.mkdirs()
if(!created)
throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
}
if(!dir.isDirectory || !dir.canRead)
throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
}
}
- 配置的日誌目錄是否有重複的
- 日誌目錄不存在的話就新建一個日誌目錄
- 檢查日誌目錄是否可讀
載入日誌目錄的檔案
private def loadLogs(): Unit = {
info("Loading logs.")
val startMs = time.milliseconds
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
//logDirs和配置的日誌存放目錄路徑有關
for (dir <- this.logDirs) {
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
//檢查上一次關閉是否是正常關閉
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
if (cleanShutdownFile.exists) {
debug(
"Found clean shutdown file. " +
"Skipping recovery for all logs in data directory: " +
dir.getAbsolutePath)
} else {
// log recovery itself is being performed by `Log` class during initialization
brokerState.newState(RecoveringFromUncleanShutdown)
}
//讀取日誌檢查點
var recoveryPoints = Map[TopicPartition, Long]()
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
case e: Exception =>
warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
warn("Resetting the recovery checkpoint to 0")
}
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
CoreUtils.runnable {
debug("Loading log '" + logDir.getName + "'")
//根據目錄名解析partiton的資訊,比如test-0,解析等到的patition就是topic test下的0號分割槽
val topicPartition = Log.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
this.logsToBeDeleted.add(current)
} else {
val previous = this.logs.put(topicPartition, current)
//判斷是否有重複的分割槽資料目錄
if (previous != null) {
throw new IllegalArgumentException(
"Duplicate log directories found: %s, %s!".format(
current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
}
}
}
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
}
try {
for ((cleanShutdownFile, dirJobs) <- jobs) {
//等待所有任務完成
dirJobs.foreach(_.get)
cleanShutdownFile.delete()
}
} catch {
case e: ExecutionException => {
error("There was an error in one of the threads during logs loading: " + e.getCause)
throw e.getCause
}
} finally {
threadPools.foreach(_.shutdown())
}
info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
}
遍歷每個日誌目錄時,會先讀取日誌檢查點檔案,然後讀取日誌目錄下的所有檔案,然後建立相關的Log物件。需要注意的是,由於載入過程比較慢,對於每個日誌目錄都會建立一個執行緒來載入,最後等所有執行緒都載入完畢後才會退出loadLogs()
方法。
因此,建立LogManager的過程是阻塞的,當LogManager建立完成後,說明所有的分割槽目錄都載入進來了。
三、啟動LogManager
建立LogManager後,就會立馬呼叫startup()
方法啟動。
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs",
deleteLogs,
delay = InitialTaskDelayMs,
period = defaultConfig.fileDeleteDelayMs,
TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
LogManager的啟動其實就是提交了4個定時任務,以及根據配置而定開啟一個日誌清理元件。
4個定時任務
- 舊的日誌段刪除任務
- 刷盤任務
- 檢查點任務
- 分割槽目錄刪除任務
四、舊的日誌段刪除任務
在LogManager啟動後,會提交一個週期性的日誌段刪除任務,用來處理一些超過一定時間以及大小的日誌段。這個任務的執行週期和log.retention.check.interval.ms
有關係,預設值是300000,也就是每5分鐘執行一次刪除任務。執行的任務方法如下:
def cleanupLogs() {
debug("Beginning log cleanup...")
var total = 0
val startMs = time.milliseconds
for(log <- allLogs; if !log.config.compact) {
debug("Garbage collecting '" + log.name + "'")
//遍歷所有日誌,呼叫log元件的方法刪除日誌
total += log.deleteOldSegments()
}
debug("Log cleanup completed. " + total + " files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
}
def deleteOldSegments(): Int = {
if (!config.delete) return 0
//一種是根據時間過期的策略刪除日誌,一種是根據大小去刪除日誌。
deleteRetenionMsBreachedSegments() + deleteRetentionSizeBreachedSegments()
}
Kafka對於舊日誌段的處理方式有兩種
- 刪除:超過時間或大小閾值的舊 segment,直接進行刪除;
- 壓縮:不是直接刪除日誌分段,而是採用合併壓縮的方式進行。
Kafka刪除的檢查策略有兩種。一種根據時間過期的策略刪除過期的日誌,一種是根據日誌大小來刪除太大的日誌。
根據時間策略刪除相關日誌
該策略和配置retention.ms
有關係
//根據時間策略刪除相關日誌段
private def deleteRetenionMsBreachedSegments() : Int = {
if (config.retentionMs < 0) return 0
val startMs = time.milliseconds
//傳到deleteOldSegments方法中的引數是一個高階函式,後面的方法中,會遍歷所有的segment,並呼叫此方法
//一般從最舊的segment開始遍歷
deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
}
private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
lock synchronized {
//遍歷所有的segment,如果目標segment的largestTimestamp已經到達過期時間了,就標記要刪除
//另外,如果遍歷到的segment是最新的一個segment,並且該segment的大小是0,這個segment就不會被刪除
val deletable = deletableSegments(predicate)
val numToDelete = deletable.size
if (numToDelete > 0) {
//如果全部的segment都過期了,為了保證至少有一個segment在工作,我們需要新建一個segment
if (segments.size == numToDelete)
roll()
//非同步刪除日誌段
deletable.foreach(deleteSegment)
}
numToDelete
}
}
上面的程式碼把所有過期的日誌段刪除,config.retentionMs
取決於配置log.retention.hours
預設為168個小時,也就是7天。刪除時要注意兩點:
- 對於那些大小為0並且是正在使用中的日誌段不會被刪除
- 如果掃描完發現全部的日誌段都過期了,就要馬上新生成一個新的日誌段來處理後面的訊息
- 日誌段的刪除時非同步的,此處只會標記一下,往日誌段檔案後面加上
.delete
字尾,然後開啟一個定時任務刪除檔案。定時任務的延遲時間和file.delete.delay.ms
有關係。
根據日誌大小刪除相關日誌
該刪除策略和配置retention.bytes
有關係。該策略可以保證分割槽目錄的大小始終保持在一個限制的範圍內。
private def deleteRetentionSizeBreachedSegments() : Int = {
if (config.retentionSize < 0 || size < config.retentionSize) return 0
//diff表示超出限制的大小
var diff = size - config.retentionSize
//這是一個高階函式,後面的方法中,會遍歷所有的segment,並呼叫此方法
//一般從最舊的segment開始遍歷
def shouldDelete(segment: LogSegment) = {
if (diff - segment.size >= 0) {
diff -= segment.size
true
} else {
false
}
}
deleteOldSegments(shouldDelete)
}
//和時間過期策略呼叫的是同一個方法,只是傳入的predicate函式不一樣
private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
lock synchronized {
//遍歷所有的segment,如果目標segment的largestTimestamp已經到達過期時間了,就標記要刪除
//另外,如果遍歷到的segment是最新的一個segment,並且該segment的大小是0,這個segment就不會被刪除
val deletable = deletableSegments(predicate)
val numToDelete = deletable.size
if (numToDelete > 0) {
//如果全部的segment都過期了,為了保證至少有一個segment在工作,我們需要新建一個segment
if (segments.size == numToDelete)
roll()
//非同步刪除日誌段
deletable.foreach(deleteSegment)
}
numToDelete
}
}
這個策略的掃描邏輯大概是這樣的
- 通過
size-retentionSize
算出diff - 遍歷segment,對於大小超過diff的日誌段,就標記刪除。然後將diff的值設定為
diff-segment.size
使用這種策略,當分割槽目錄下只有一個日誌段時,無論該日誌段多大,都不會被刪除。另外,和時間策略一樣,這個刪除也是非同步刪除
五、刷盤任務
kafka在處理Producer請求時,只是將日誌寫到快取,並沒有執行flush()方法刷到磁碟。因此,logManager中開啟了一個刷盤任務,定期檢查各個目錄,根據刷盤策略執行flush操作。這個任務保證了每隔多久kafka會執行一次刷盤操作。
private def flushDirtyLogs() = {
debug("Checking for dirty logs to flush...")
for ((topicPartition, log) <- logs) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
debug("Checking if flush is needed on " + topicPartition.topic + " flush interval " + log.config.flushMs +
" last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= log.config.flushMs)
log.flush
} catch {
case e: Throwable =>
error("Error flushing topic " + topicPartition.topic, e)
}
}
}
當距離上次刷盤的時間超過了log.config.flushMs
時間就會執行一次刷盤,將快取中的內容持久化到磁碟。但是kafka官方給刷盤頻率設定的預設值是Long的最大值,也就是說,kafka官方的建議是把刷盤操作交給作業系統來控制。
另外,這個刷盤任務這是控制指定時間刷盤一次。kafka還有一個關於刷盤的策略是根據日誌的條數來控制刷盤頻率的,也就是配置flush.messages
。這個配置是在每次寫日誌完檢查的,當kafka處理Producer請求寫日誌到快取後,會檢查當前的offset和之前記錄的offset直接的差值,如果超過配置的值,就執行一次刷盤。不過flush.messages
的預設值也是Long的最大值。
六、日誌恢復檢查點任務
kafka的recovery-checkpoint(檢查點)記錄了最後一次重新整理的offset,表示多少日誌已經落盤到磁碟上,然後在異常關閉後恢復日誌。
任務執行的方法
def checkpointRecoveryPointOffsets() {
this.logDirs.foreach(checkpointLogsInDir)
}
private def checkpointLogsInDir(dir: File): Unit = {
val recoveryPoints = this.logsByDir.get(dir.toString)
if (recoveryPoints.isDefined) {
//recoveryPoint表示還未刷到磁碟的第一條offset,比如offset=100之前的訊息都刷到磁碟中了,那麼recoveryPoint就是101
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}
}
這個任務做的事情很簡單,就是遍歷所有的LogDir,然後將記憶體中維護的recovery-checkpoint寫到檔案上。
offset-checkpoint的儲存
每個LogDir日誌目錄下,都會有一個檔案recovery-point-offset-checkpoint,存放了各個Log(Partiton)當前的checkpoint是多少:
0
54
__consumer_offsets 22 0
__consumer_offsets 30 0
__consumer_offsets 8 0
__consumer_offsets 21 0
...
第一行的數字表示當前版本,第二行的數字表示該LogDir目錄下有多少個partition目錄。接著就是topic partition編號 recovery-checkpoint
。
何時重新整理recovery-checkpoint
kafka會在每次flush的時候更新對應Log的recovery-checkpoint。但是由於kafka的定時flush預設是交給作業系統來執行的。所以只有在新建一個新的segment時,以及對partition進行truncat時(如果replica的offset比leader還大,replica就要執行一次truncate,把超出的那些offset砍掉),才會更新recovery-checkpoint。
這種情況就會造成日誌落盤了很多,但是recovery-checkpoint一直沒更新的情況,不過由於recovery-checkpoint只是用來在broker啟動時恢復日誌用的,這一點倒無關緊要。另外,在正常關閉broker,kafka會保證將最新的offset寫入recovery-checkpoint檔案中。
如何利用recovery-checkpoint恢復日誌
首先,恢復點是異常關閉時用來恢復資料的。如果資料目錄下有.kafka_cleanshutdown
檔案就表示不是異常關閉,就用不上恢復點了。如果上一次關閉時異常關閉的,kafka就會利用checkpoint來修復日誌了。
日誌的恢復程式碼
//Log.scala
private def recoverLog() {
//如果上一次是正常關閉,重新設定一下checkpoint
if(hasCleanShutdownFile) {
this.recoveryPoint = activeSegment.nextOffset
return
}
// 根據recovery-checkpoint 找出那些需要恢復的segment
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
while(unflushed.hasNext) {
val curr = unflushed.next
info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
val truncatedBytes =
try {
//呼叫segment的recover()方法
curr.recover(config.maxMessageSize)
} catch {
case _: InvalidOffsetException =>
val startOffset = curr.baseOffset
warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
"creating an empty one with starting offset " + startOffset)
curr.truncateTo(startOffset)
}
if(truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
//只要有一條日誌出了問題,就要將這之後的所有segment都刪去
warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
unflushed.foreach(deleteSegment)
}
}
}
//LogSegment.scala
//檢查segment中的訊息是否合法
def recover(maxMessageSize: Int): Int = {
index.truncate()
index.resize(index.maxIndexSize)
timeIndex.truncate()
timeIndex.resize(timeIndex.maxIndexSize)
var validBytes = 0
var lastIndexEntry = 0
maxTimestampSoFar = Record.NO_TIMESTAMP
try {
//遍歷所有的shallow message
//這裡shallow message並不一定是我們理解的一條訊息,kafka可能會將多條訊息壓縮成一條訊息
//所以shallow message可能是一條訊息,也可能是多條訊息組裝成一條訊息
for (entry <- log.shallowEntries(maxMessageSize).asScala) {
val record = entry.record
record.ensureValid()
// The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages.
if (record.timestamp > maxTimestampSoFar) {
maxTimestampSoFar = record.timestamp
offsetOfMaxTimestamp = entry.offset
}
// Build offset index
if(validBytes - lastIndexEntry > indexIntervalBytes) {
val startOffset = entry.firstOffset
index.append(startOffset, validBytes)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
lastIndexEntry = validBytes
}
validBytes += entry.sizeInBytes()
}
} catch {
case e: CorruptRecordException =>
logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
.format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
val truncated = log.sizeInBytes - validBytes
log.truncateTo(validBytes)
index.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
timeIndex.trimToValidSize()
truncated
}
上面程式碼中的recoverLog()是kafka在啟動LogManager初試化一個個Log物件時,Log在初試化過程中會執行的一個方法。這個方法主要做了幾件事
- 通過檢查是否有.kafka_cleanshutdown檔案來判斷上一次是否是正常關閉,如果是的話,就不用恢復什麼了,直接更新recovery-checkpoint。
- 如果上次是非正常關閉,通過當前的recovery-checkpoint找出這個recovery-checkpoint之後的所有segment(包括recovery-checkpoint所在的segment)。然後遍歷這些segment,一條一條訊息檢查過去,並重建索引,之後如果有segment的訊息格式不正確,就執行非同步刪除操作,將後面的segment全部刪除掉。
**要注意的是,這些檢查的segment中,只要有一條訊息時invalid,kafka就會刪除所有檢查的segment。**這點是我一直想不通的地方,直到2.0版本也是這樣的邏輯,希望有知道原因的朋友告知一下。
七、分割槽目錄刪除任務
該任務執行的任務主要是刪除分割槽目錄,同時刪除底下的segment資料檔案。
private def deleteLogs(): Unit = {
try {
var failed = 0
//遍歷待刪除列表的元素,逐一刪除分割槽目錄
while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) {
val removedLog = logsToBeDeleted.take()
if (removedLog != null) {
try {
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
case e: Throwable =>
error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
failed = failed + 1
logsToBeDeleted.put(removedLog)
}
}
}
} catch {
case e: Throwable =>
error(s"Exception in kafka-delete-logs thread.", e)
}
做的事情主要就是遍歷logsToBeDeleted列表,然後遍歷刪除元素。
那麼什麼時候分割槽會被加到logsToBeDeleted中待刪除呢?
- LogManager啟動時會掃描所有分割槽目錄名結尾是’-delete’的分割槽,加入到logsToBeDeleted中
- 分割槽被刪除的時候走的都是非同步刪除策略,會先被加入到logsToBeDeleted中等待刪除。
在kafka中,要刪除分割槽需要往broker傳送StopReplica請求。broker收到StopReplica請求後,判斷是否需要刪除分割槽,如果要刪除就執行非同步刪除步驟,非同步刪除的程式碼主要如下
def asyncDelete(topicPartition: TopicPartition) = {
//從記憶體中刪去相關資料
val removedLog: Log = logCreationOrDeletionLock synchronized {
logs.remove(topicPartition)
}
if (removedLog != null) {
//We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
if (cleaner != null) {
cleaner.abortCleaning(topicPartition)
cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
//往分割槽目錄名稱的最後加上 '-delete',表示準備刪除
val dirName = Log.logDeleteDirName(removedLog.name)
//關閉分割槽目錄
removedLog.close()
val renamedDir = new File(removedLog.dir.getParent, dirName)
val renameSuccessful = removedLog.dir.renameTo(renamedDir)
if (renameSuccessful) {
removedLog.dir = renamedDir
// change the file pointers for log and index file
for (logSegment <- removedLog.logSegments) {
logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName))
logSegment.index.file = new File(renamedDir, logSegment.index.file.getName)
}
//加入待刪除列表
logsToBeDeleted.add(removedLog)
removedLog.removeLogMetrics()
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
} else {
throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
}
}
}
- 需要先把分割槽目錄標記一下,在後綴加上’-delete’表示該分割槽準備刪除了。這樣做可以防止如果刪除時間沒到就宕機,下次重啟時可以掃描’-delete’結尾的分割槽再刪除
- 把分割槽目錄新增到logsToBeDeleted中待刪除
八、多磁碟選擇機制
當配置了多個磁碟時,kafka是怎麼保證資料均勻分佈在各個磁碟呢?
這裡多磁碟只的是配置log.dirs
中配置了多個目錄。
這個問題和kafka建立一個新的partition時,如何選擇目錄有關係,下面是kafka建立partition的程式碼
def createLog(topicPartition: TopicPartition, config: LogConfig): Log = {
logCreationOrDeletionLock synchronized {
// create the log if it has not already been created in another thread
getLog(topicPartition).getOrElse {
//選擇新的partition要放在哪個資料目錄上
val dataDir = nextLogDir()
val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
dir.mkdirs()
val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic,
topicPartition.partition,
dataDir.getAbsolutePath,
config.originals.asScala.mkString(", ")))
log
}
}
}
private def nextLogDir(): File = {
if(logDirs.size == 1) {
logDirs(0)
} else {
//各個資料目錄的檔案數量
val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
//有一些資料目錄底下可能沒有partition目錄
val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
var dirCounts = (zeros ++ logCounts).toBuffer
//排序後,取當前檔案數量最小的那個資料目錄
val leastLoaded = dirCounts.sortBy(_._2).head
new File(leastLoaded._1)
}
}
從nextLogDir()
程式碼中可以看出,當新建一個新的partition目錄時,主要還是取partition檔案最少的那個資料目錄。
這樣在極端情況下可能會有一些問題,可能兩個資料目錄底下的partition檔案數一樣,但是其中一個數據目錄資料量非常大的情況(各個partition的資料量不一樣)。因此,在選擇多磁碟時也要注意一下,避免造成資源浪費。