1. 程式人生 > >Jafka源碼分析——LogManager

Jafka源碼分析——LogManager

flush fontsize ger 一個 日誌 style sni 配置文件 article



Kafka中,LogManager負責管理broker上全部的Log(每個topic-partition為一個Log)。

通過閱讀源碼可知其詳細完畢的功能例如以下:

1. 依照預設規則對消息隊列進行清理。

2. 依照預設規則對消息隊列進行持久化(flush操作)。

3. 連接ZooKeeper進行brokertopicpartition相關的ZooKeeper操作。

4. 管理broker上全部的Log

以下一一對這些功能的實現進行具體的解析。

一、對於Log的管理

LogManager包括成員變量logslogskeytopicvaluePool<Integer,Log>

(該value又是一個Map。主鍵是partitionvalue是該partition所相應的Log)。因此LogManager通過logs保存該broker上全部的消息隊列。

private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

LogManager在初始化之後。須要依據配置文件配置的消息隊列根文件夾進行遍歷。

通過遍歷,查找並生成Log。該遍歷的詳細實如今方法load中:

① 獲取消息隊列根文件夾下的全部文件

② 對於根文件夾下的每個文件進行例如以下操作

1.假設是文件夾。則有可能是一個Log,否則不是並忽略

2.對於通過1的文件夾分析其文件名稱,文件夾的文件名稱由兩部分組成:topic-partition

3.對於通過2的文件夾。用文件夾、解析出的topic、解析出的partition生成Log

4.3生成的Log放入logs日誌池

5.最後,推斷文件夾解析的partition與配置文件裏配置的partition的大小,假設配置文件較小。則更新配置

技術分享

二、消息隊列清理

消息隊列的清理由Scheduler周期性的調用,詳細的調用在load函數中,基本的刪除實如今cleanLogs函數中。

消息隊列的清理分為兩種情況:一種是超過預設的時間則刪除。二是超過預設的大小則刪除。分別相應兩個函數cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一種情況比較簡單,由於每個segment相應一個文件,通過對照文件的lastModifiedTime和系統的如今時間來確定其是否超時,假設超時則刪除。對於另外一種情況,首先比較Log的大小與配置的大小。假設小於配置的大小則不刪除。假設大於了配置的大小,則計算超過配置大小的長度(定為差值);然後將小於該差值的segment刪除(這地方有點疑惑。這樣刪除會不會把一些最新的消息隊列給刪除了)。

 if (this.scheduler != null) {
 	this.scheduler.scheduleWithRate(new Runnable() {
                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }
            }, 60 * 1000, logCleanupIntervalMs);
}


三、對於消息隊列的持久化

對消息隊列的flush操作相同由單獨的線程來完畢。該線程通過比較Log上一次的flush時間和當前的系統時間來確定是否須要flush。假設須要則持久化到文件。

註意,消息的隊列的持久化在新增消息的時候也會推斷,假設一個Log保存的新增消息的條數超過了預設值則進行flush操作。


Kafka中,LogManager負責管理broker上全部的Log(每個topic-partition為一個Log)。通過閱讀源碼可知其詳細完畢的功能例如以下:

1. 依照預設規則對消息隊列進行清理。

2. 依照預設規則對消息隊列進行持久化(flush操作)。

3. 連接ZooKeeper進行brokertopicpartition相關的ZooKeeper操作。

4. 管理broker上全部的Log

以下一一對這些功能的實現進行具體的解析。

一、對於Log的管理

LogManager包括成員變量logslogskeytopicvaluePool<Integer,Log>(該value又是一個Map,主鍵是partitionvalue是該partition所相應的Log)。

因此LogManager通過logs保存該broker上全部的消息隊列。

private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

LogManager在初始化之後,須要依據配置文件配置的消息隊列根文件夾進行遍歷。通過遍歷,查找並生成Log。該遍歷的詳細實如今方法load中:

① 獲取消息隊列根文件夾下的全部文件

② 對於根文件夾下的每個文件進行例如以下操作

1.假設是文件夾。則有可能是一個Log。否則不是並忽略

2.對於通過1的文件夾分析其文件名稱,文件夾的文件名稱由兩部分組成:topic-partition

3.對於通過2的文件夾。用文件夾、解析出的topic、解析出的partition生成Log

4.3生成的Log放入logs日誌池

5.最後。推斷文件夾解析的partition與配置文件裏配置的partition的大小,假設配置文件較小,則更新配置


二、消息隊列清理

消息隊列的清理由Scheduler周期性的調用,詳細的調用在load函數中。基本的刪除實如今cleanLogs函數中。消息隊列的清理分為兩種情況:一種是超過預設的時間則刪除,二是超過預設的大小則刪除,分別相應兩個函數cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一種情況比較簡單,由於每個segment相應一個文件,通過對照文件的lastModifiedTime和系統的如今時間來確定其是否超時。假設超時則刪除。對於另外一種情況。首先比較Log的大小與配置的大小。假設小於配置的大小則不刪除;假設大於了配置的大小,則計算超過配置大小的長度(定為差值)。然後將小於該差值的segment刪除(這地方有點疑惑。這樣刪除會不會把一些最新的消息隊列給刪除了)。

 if (this.scheduler != null) {
 	this.scheduler.scheduleWithRate(new Runnable() {
                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }
            }, 60 * 1000, logCleanupIntervalMs);
}


三、對於消息隊列的持久化

對消息隊列的flush操作相同由單獨的線程來完畢。該線程通過比較Log上一次的flush時間和當前的系統時間來確定是否須要flush,假設須要則持久化到文件。註意,消息的隊列的持久化在新增消息的時候也會推斷,假設一個Log保存的新增消息的條數超過了預設值則進行flush操作。


Jafka源碼分析——LogManager