RocketMQ訊息儲存(三) - MappedFileQueue
RocketMQ訊息儲存(三) - MappedFileQueue
上一篇 講解了 MappedFile 類, 其底層實際上是通過 MappedByteBuffer採用零拷貝的方式 來管理 檔案的讀寫 。
既然 MappedFile 是管理單個檔案的類, 那麼就會存在用來管理 這些 MappedFile的類:MappedFileQueue。
我們可以把他們之間的關係形象的理解成 : 檔案(MappedFile) 和 目錄(MappedFileQueue)
想要分析 MappedFileQueue ,剛開始會感覺很抽象。 這裡我考慮了一下,還是先把圖放在文章的開頭, 在腦子裡有個大概的 印象, 後面分析原始碼的時候,可以參考該圖來幫助理解:
1.屬性
那麼直接來看程式碼吧,老規矩,先分析屬性。 下面我貼了幾個核心的屬性:
// 該MappedFileQueue 所管理的目錄路徑 // 1. CommitLog檔案目錄路徑為: ../store/commit/log // 2. ConsumeQueue檔案目錄路徑為: ../store/xxx_topic/x private final String storePath; // 目錄下每個檔案大小 // 1. commitLog檔案 預設1g // 2. consumeQueue檔案 預設600w位元組) private final int mappedFileSize; // 目錄下所管理的所有 MappedFile 集合 private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>(); // 建立 MappedFile 的服務, 內部有自己的執行緒。 (通過該類能夠非同步建立 MappedFile) private final AllocateMappedFileService allocateMappedFileService; // 目錄的刷盤位點 // (最後一個MappedFile.fileName + 最後一個MappedFile.flushPosition) private long flushedWhere = 0; // 當前目錄下最後一條msg儲存時間 private volatile long storeTimestamp = 0;
上述屬性基本上都很簡單, 這裡需要強調其中一個 屬性 flushedWhere , 請結合上面的圖片來理解,
MappedFileQueue目錄中的 MappedFile檔案 是順序寫的, 當檔案寫滿了之後,才回去建立新的MappedFile , 其中MappedFile的檔名為 物理偏移量。
簡單舉個例子(僅作說明使用 ):假設 每個檔案大小為 64bytes 第一個檔名為 00000 , 當該檔案寫滿了 則需要建立第二個檔案,那麼這第二個檔案的檔名為 00064 , 此時寫也只能向第二個檔案中寫,那麼當寫了 32bytes後 的 flushedWhere = 00064 + 00032 = 00096 .
2. 核心方法
1.load
/**
* broker啟動階段, 載入本地磁碟資料使用的。
* 該方法會讀取 "storePath" 目錄下的檔案, 為對應的檔案建立mappedFile物件,並加入到List中
*/
public boolean load() {
// 建立目錄物件
File dir = new File(this.storePath);
// 獲取目錄下 所有的檔案
File[] files = dir.listFiles();
if (files != null) {
// ascending order
// 按照檔名排序
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
// 為當前File建立 對應的mappedFile物件
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
// 設定 wrotePosition 和 flushedPosition (這裡給的值都是 mappedFileSize, 並不是準確值。 準確值需要recover階段設定)
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
// 加入到 list中
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
上述程式碼簡單易懂, 先總結下該方法主要做的事情,如下:
- 根據 指定檔案目錄( 如:../store/commit/log) , 構建 File 物件(注意:是個資料夾)。
- 遍歷 該資料夾下 所有的 檔案 並排序 得到
File[] files
陣列 (注意:這是檔案的集合)。 - 遍歷 排序後的檔案集合,為每個檔案建立 MappedFile物件 並賦上初始值,然後存入 MappedFiles集合中
其中第3條,給MappedFile 賦初始值, 注意:該值僅僅是初始值沒有任何作用 。
正常Broker 在啟動後, 會先呼叫 load() 方法 加載出目錄下所有的MappedFile, 然後再通過 recover的相關方法來重新賦上準確的值。
2. getLastMappedFile
該方法有 3個過載方法, 直接來看其中引數最多的那個。
/**
* 獲取當前正在順序寫的MappedFile物件
* (儲存訊息 或者 儲存ConsumeQueue資料時, 都需要獲取當前正在順序寫的MappedFile物件)
* 注意: 如果MappedFile寫滿了 或者 不存在查詢的MappedFile, 則建立新的MappedFile
*
* @param startOffset 檔案起始偏移量
* @param needCreate 當list為空時,是否建立 mappedFile
* @return
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
// 該值 控制是否需要建立MappedFile ,當需要建立MappedFile時,它充當檔名
// 兩種情況 會建立:
// 1. list 內沒有mappedFIle
// 2. list最後一個mappedFile (當前順序寫的mappedFile)它寫滿了
long createOffset = -1;
// 獲取 list 中的最後一個 MappedFile
MappedFile mappedFileLast = getLastMappedFile();
// 情況1 list 內沒有mappedFile
if (mappedFileLast == null) {
// createOffset 取值必須是 mappedFileSize 的倍數 或者 0
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 情況2 list最後一個mappedFile (當前順序寫的mappedFile)它寫滿了
if (mappedFileLast != null && mappedFileLast.isFull()) {
// 上一個檔名 轉Long + mappedFileSize
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
// 這裡是建立 新的 mappedFile 邏輯
if (createOffset != -1 && needCreate) {
// 獲取 下次待建立檔案的 絕對路徑
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// 獲取 下下次待建立檔案的 絕對路徑
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// 使用 allocateMappedFileService 來建立 MappedFile
if (this.allocateMappedFileService != null) {
// 當mappedFileSize >= 1g 的話, 這裡建立的mappedFile 會執行它的 預熱方法
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
}
// 直接建立 MappedFile (這裡沒有預熱)
else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
// 將建立的 mappedFile 新增到 list中 並返回
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
// 走到這裡... 是無需建立 MappedFile時 返回。
return mappedFileLast;
}
上述程式碼很長,理解起來可能稍微有些困難。
首先要理解的是 該方法的目的是什麼? 獲取當前正在順序寫的MappedFile.
前面在 屬性小結中 著重講解了 flushedWhere
欄位,與它的例子類似, 當前正在順序寫的MappedFile 必定是 MappedFile集合中的末尾檔案。 因此程式碼中直接呼叫 getLastMappedFile()
方法獲取了末尾的MappedFile, 而此時 會存在 3中情況:
- 該 MappedFile 存在 且 MappedFile 內 還有剩餘可寫空間。(這也是最好的情況,正常返回就行了)
- 該 MappedFile 存在,但是該MappedFile 已經被寫滿了。 (需要建立 新的MappedFile)
- 該 MappedFile 不存在 ,也就說明 目錄下並沒有任何檔案。(需要建立 新的MappedFile)
其中 2 ,3 情況 需要 建立新的 MappedFile ,而建立 MappedFile 的方式分為了兩種:
- 通過 allocateMappedFileService 使用其它執行緒來建立。( MappedFile >= 1g 時 有預熱操作)
- 普通
new MappedFile()
方式建立。(無預熱操作)
預熱操作 會在 後面的文章中講解, 這裡就只要理解字面意思就行了。
下面再簡單總結梳理下該方法的步驟:
- 獲取目錄下 最後一個 mappedFileLast
- 根據 mappedFileLast 判斷是否需要建立新的 MappedFile
- 不需要建立新的MappedFile, 則直接返回 mappedFileLast
- 需要建立新的MappedFile, 此時會根據是否存在 allocateMappedFileService 來決定採用哪種建立方式:
- allocateMappedFileService 有預熱操作的
- 普通建立
3.deleteExpiredFileByTime
/**
* commitLog 目錄刪除過期檔案呼叫
* @param expiredTime 過期時間
* @param deleteFilesInterval 刪除兩個檔案之間的時間間隔
* @param intervalForcibly 強制關閉資源的時間間隔 mf.destory傳遞的引數
* @param cleanImmediately true 強制刪除,不考慮過期時間這個條件
* @return
*/
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
// 獲取mfs陣列 (實際上就是將MappedFile集合 轉成 陣列)
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
// 這裡 減-1 是保證 當前正在順序寫的MappedFile不被刪除
int mfsLength = mfs.length - 1;
// 記錄刪除的檔案數
int deleteCount = 0;
// 被刪除的檔案集合
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
// 計算出當前檔案的存活時間截止點
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 條件成立:
// 條件一: 檔案存活時間 達到上限
// 條件二: disk佔用率達到上限 會強制刪除
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
// 刪除檔案
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++; // 增加刪除檔案計數
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
// 在刪除完檔案後 需要sleep,然後再去刪除下一個檔案
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
// 將滿足刪除條件的mf檔案 從 list內刪除
deleteExpiredFile(files);
return deleteCount;
}
上述程式碼 雖然長,但是很容易理解, 就是 遍歷 目錄下的 MappedFile 集合, 尋找出 滿足刪除條件的 MappedFile ,再呼叫 mf.destory()
方法進行刪除。
只需要注意的是: 該方法是供 刪除 CommitLog 檔案使用的。