Jafka源碼分析——LogManager
在Kafka中,LogManager負責管理broker上全部的Log(每個topic-partition為一個Log)。
通過閱讀源碼可知其詳細完畢的功能例如以下:
1. 依照預設規則對消息隊列進行清理。
2. 依照預設規則對消息隊列進行持久化(flush操作)。
3. 連接ZooKeeper進行broker、topic、partition相關的ZooKeeper操作。
4. 管理broker上全部的Log。
以下一一對這些功能的實現進行具體的解析。
一、對於Log的管理
LogManager包括成員變量logs。logs的key是topic,value是Pool<Integer,Log>
private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, Pool<Integer, Log>>();
LogManager在初始化之後。須要依據配置文件配置的消息隊列根文件夾進行遍歷。
通過遍歷,查找並生成Log。該遍歷的詳細實如今方法load中:
① 獲取消息隊列根文件夾下的全部文件
② 對於根文件夾下的每個文件進行例如以下操作
1.假設是文件夾。則有可能是一個Log,否則不是並忽略
2.對於通過1的文件夾分析其文件名稱,文件夾的文件名稱由兩部分組成:topic-partition
3.對於通過2的文件夾。用文件夾、解析出的topic、解析出的partition生成Log
4.將3生成的Log放入logs日誌池
5.最後,推斷文件夾解析的partition與配置文件裏配置的partition的大小,假設配置文件較小。則更新配置
二、消息隊列清理
消息隊列的清理由Scheduler周期性的調用,詳細的調用在load函數中,基本的刪除實如今cleanLogs函數中。
消息隊列的清理分為兩種情況:一種是超過預設的時間則刪除。二是超過預設的大小則刪除。分別相應兩個函數cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一種情況比較簡單,由於每個segment相應一個文件,通過對照文件的lastModifiedTime和系統的如今時間來確定其是否超時,假設超時則刪除。對於另外一種情況,首先比較Log的大小與配置的大小。假設小於配置的大小則不刪除。假設大於了配置的大小,則計算超過配置大小的長度(定為差值);然後將小於該差值的segment刪除(這地方有點疑惑。這樣刪除會不會把一些最新的消息隊列給刪除了)。
if (this.scheduler != null) { this.scheduler.scheduleWithRate(new Runnable() { public void run() { try { cleanupLogs(); } catch (IOException e) { logger.error("cleanup log failed.", e); } } }, 60 * 1000, logCleanupIntervalMs); }
三、對於消息隊列的持久化
對消息隊列的flush操作相同由單獨的線程來完畢。該線程通過比較Log上一次的flush時間和當前的系統時間來確定是否須要flush。假設須要則持久化到文件。
註意,消息的隊列的持久化在新增消息的時候也會推斷,假設一個Log保存的新增消息的條數超過了預設值則進行flush操作。
在Kafka中,LogManager負責管理broker上全部的Log(每個topic-partition為一個Log)。通過閱讀源碼可知其詳細完畢的功能例如以下:
1. 依照預設規則對消息隊列進行清理。
2. 依照預設規則對消息隊列進行持久化(flush操作)。
3. 連接ZooKeeper進行broker、topic、partition相關的ZooKeeper操作。
4. 管理broker上全部的Log。
以下一一對這些功能的實現進行具體的解析。
一、對於Log的管理
LogManager包括成員變量logs。logs的key是topic,value是Pool<Integer,Log>(該value又是一個Map,主鍵是partition。value是該partition所相應的Log)。
因此LogManager通過logs保存該broker上全部的消息隊列。
private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, Pool<Integer, Log>>();
LogManager在初始化之後,須要依據配置文件配置的消息隊列根文件夾進行遍歷。通過遍歷,查找並生成Log。該遍歷的詳細實如今方法load中:
① 獲取消息隊列根文件夾下的全部文件
② 對於根文件夾下的每個文件進行例如以下操作
1.假設是文件夾。則有可能是一個Log。否則不是並忽略
2.對於通過1的文件夾分析其文件名稱,文件夾的文件名稱由兩部分組成:topic-partition
3.對於通過2的文件夾。用文件夾、解析出的topic、解析出的partition生成Log
4.將3生成的Log放入logs日誌池
5.最後。推斷文件夾解析的partition與配置文件裏配置的partition的大小,假設配置文件較小,則更新配置
二、消息隊列清理
消息隊列的清理由Scheduler周期性的調用,詳細的調用在load函數中。基本的刪除實如今cleanLogs函數中。消息隊列的清理分為兩種情況:一種是超過預設的時間則刪除,二是超過預設的大小則刪除,分別相應兩個函數cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一種情況比較簡單,由於每個segment相應一個文件,通過對照文件的lastModifiedTime和系統的如今時間來確定其是否超時。假設超時則刪除。對於另外一種情況。首先比較Log的大小與配置的大小。假設小於配置的大小則不刪除;假設大於了配置的大小,則計算超過配置大小的長度(定為差值)。然後將小於該差值的segment刪除(這地方有點疑惑。這樣刪除會不會把一些最新的消息隊列給刪除了)。
if (this.scheduler != null) { this.scheduler.scheduleWithRate(new Runnable() { public void run() { try { cleanupLogs(); } catch (IOException e) { logger.error("cleanup log failed.", e); } } }, 60 * 1000, logCleanupIntervalMs); }
三、對於消息隊列的持久化
對消息隊列的flush操作相同由單獨的線程來完畢。該線程通過比較Log上一次的flush時間和當前的系統時間來確定是否須要flush,假設須要則持久化到文件。註意,消息的隊列的持久化在新增消息的時候也會推斷,假設一個Log保存的新增消息的條數超過了預設值則進行flush操作。
Jafka源碼分析——LogManager