1. 程式人生 > 其它 >RocketMQ訊息儲存(三) - MappedFileQueue

RocketMQ訊息儲存(三) - MappedFileQueue

RocketMQ訊息儲存(三) - MappedFileQueue

上一篇 講解了 MappedFile 類, 其底層實際上是通過 MappedByteBuffer採用零拷貝的方式 來管理 檔案的讀寫 。

既然 MappedFile 是管理單個檔案的類, 那麼就會存在用來管理 這些 MappedFile的類:MappedFileQueue
我們可以把他們之間的關係形象的理解成 : 檔案(MappedFile)目錄(MappedFileQueue)

想要分析 MappedFileQueue ,剛開始會感覺很抽象。 這裡我考慮了一下,還是先把圖放在文章的開頭, 在腦子裡有個大概的 印象, 後面分析原始碼的時候,可以參考該圖來幫助理解:

1.屬性

那麼直接來看程式碼吧,老規矩,先分析屬性。 下面我貼了幾個核心的屬性:

    // 該MappedFileQueue 所管理的目錄路徑 
	//      1. CommitLog檔案目錄路徑為: ../store/commit/log
	//      2. ConsumeQueue檔案目錄路徑為: ../store/xxx_topic/x
    private final String storePath;

    // 目錄下每個檔案大小 
	//     1. commitLog檔案 預設1g 
	//     2. consumeQueue檔案 預設600w位元組)
    private final int mappedFileSize;

    //  目錄下所管理的所有 MappedFile 集合 
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    // 建立 MappedFile 的服務, 內部有自己的執行緒。 (通過該類能夠非同步建立 MappedFile)
    private final AllocateMappedFileService allocateMappedFileService;

    // 目錄的刷盤位點  
	//    (最後一個MappedFile.fileName +  最後一個MappedFile.flushPosition)
    private long flushedWhere = 0;

    // 當前目錄下最後一條msg儲存時間
    private volatile long storeTimestamp = 0;

上述屬性基本上都很簡單, 這裡需要強調其中一個 屬性 flushedWhere , 請結合上面的圖片來理解,

MappedFileQueue目錄中的 MappedFile檔案 是順序寫的, 當檔案寫滿了之後,才回去建立新的MappedFile , 其中MappedFile的檔名為 物理偏移量。

簡單舉個例子(僅作說明使用 ):假設 每個檔案大小為 64bytes 第一個檔名為 00000 , 當該檔案寫滿了 則需要建立第二個檔案,那麼這第二個檔案的檔名為 00064 , 此時寫也只能向第二個檔案中寫,那麼當寫了 32bytes後 的 flushedWhere = 00064 + 00032 = 00096 .

2. 核心方法

1.load

    /**
     *  broker啟動階段, 載入本地磁碟資料使用的。
     *  該方法會讀取 "storePath" 目錄下的檔案, 為對應的檔案建立mappedFile物件,並加入到List中
     */
    public boolean load() {
        // 建立目錄物件
        File dir = new File(this.storePath);

        // 獲取目錄下 所有的檔案
        File[] files = dir.listFiles();

        if (files != null) {
            // ascending order
            // 按照檔名排序
            Arrays.sort(files);
            for (File file : files) {

                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                    // 為當前File建立 對應的mappedFile物件
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                    // 設定 wrotePosition 和 flushedPosition  (這裡給的值都是 mappedFileSize, 並不是準確值。 準確值需要recover階段設定)
                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);

                    // 加入到 list中
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }

上述程式碼簡單易懂, 先總結下該方法主要做的事情,如下:

  1. 根據 指定檔案目錄( 如:../store/commit/log) , 構建 File 物件(注意:是個資料夾)。
  2. 遍歷 該資料夾下 所有的 檔案 並排序 得到 File[] files 陣列 (注意:這是檔案的集合)。
  3. 遍歷 排序後的檔案集合,為每個檔案建立 MappedFile物件 並賦上初始值,然後存入 MappedFiles集合

其中第3條,給MappedFile 賦初始值, 注意:該值僅僅是初始值沒有任何作用

正常Broker 在啟動後, 會先呼叫 load() 方法 加載出目錄下所有的MappedFile, 然後再通過 recover的相關方法來重新賦上準確的值。

2. getLastMappedFile

該方法有 3個過載方法, 直接來看其中引數最多的那個。

    /**
     * 獲取當前正在順序寫的MappedFile物件  
     *   (儲存訊息 或者 儲存ConsumeQueue資料時, 都需要獲取當前正在順序寫的MappedFile物件)
     *   注意: 如果MappedFile寫滿了 或者 不存在查詢的MappedFile, 則建立新的MappedFile
     *
     * @param startOffset   檔案起始偏移量  
     * @param needCreate    當list為空時,是否建立 mappedFile
     * @return
     */
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {

        // 該值 控制是否需要建立MappedFile ,當需要建立MappedFile時,它充當檔名
        // 兩種情況 會建立:
        //  1. list 內沒有mappedFIle
        //  2. list最後一個mappedFile (當前順序寫的mappedFile)它寫滿了
        long createOffset = -1;

		
        //  獲取 list 中的最後一個 MappedFile
        MappedFile mappedFileLast = getLastMappedFile();

        
         // 情況1 list 內沒有mappedFile
        if (mappedFileLast == null) { 
            
            // createOffset 取值必須是 mappedFileSize 的倍數 或者 0
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        
        // 情況2 list最後一個mappedFile (當前順序寫的mappedFile)它寫滿了
        if (mappedFileLast != null && mappedFileLast.isFull()) {  
            
            // 上一個檔名 轉Long + mappedFileSize
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }


         // 這裡是建立 新的 mappedFile 邏輯
        if (createOffset != -1 && needCreate) {

            // 獲取 下次待建立檔案的 絕對路徑
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);

            // 獲取 下下次待建立檔案的 絕對路徑
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

			
            // 使用 allocateMappedFileService 來建立 MappedFile
            if (this.allocateMappedFileService != null) { 
                // 當mappedFileSize >= 1g 的話, 這裡建立的mappedFile 會執行它的 預熱方法
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } 
            
            // 直接建立 MappedFile (這裡沒有預熱)
            else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            
            // 將建立的 mappedFile 新增到 list中 並返回
            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }
        
		// 走到這裡... 是無需建立 MappedFile時 返回。
        return mappedFileLast;
    }

上述程式碼很長,理解起來可能稍微有些困難。

首先要理解的是 該方法的目的是什麼? 獲取當前正在順序寫的MappedFile.

前面在 屬性小結中 著重講解了 flushedWhere 欄位,與它的例子類似, 當前正在順序寫的MappedFile 必定是 MappedFile集合中的末尾檔案。 因此程式碼中直接呼叫 getLastMappedFile() 方法獲取了末尾的MappedFile, 而此時 會存在 3中情況:

  1. 該 MappedFile 存在 且 MappedFile 內 還有剩餘可寫空間。(這也是最好的情況,正常返回就行了)
  2. 該 MappedFile 存在,但是該MappedFile 已經被寫滿了。 (需要建立 新的MappedFile)
  3. 該 MappedFile 不存在 ,也就說明 目錄下並沒有任何檔案。(需要建立 新的MappedFile)

其中 2 ,3 情況 需要 建立新的 MappedFile ,而建立 MappedFile 的方式分為了兩種:

  1. 通過 allocateMappedFileService 使用其它執行緒來建立。( MappedFile >= 1g 時 有預熱操作)
  2. 普通 new MappedFile() 方式建立。(無預熱操作)

預熱操作 會在 後面的文章中講解, 這裡就只要理解字面意思就行了。

下面再簡單總結梳理下該方法的步驟:

  1. 獲取目錄下 最後一個 mappedFileLast
  2. 根據 mappedFileLast 判斷是否需要建立新的 MappedFile
    1. 不需要建立新的MappedFile, 則直接返回 mappedFileLast
    2. 需要建立新的MappedFile, 此時會根據是否存在 allocateMappedFileService 來決定採用哪種建立方式:
      1. allocateMappedFileService 有預熱操作的
      2. 普通建立

3.deleteExpiredFileByTime

    /**
     * commitLog 目錄刪除過期檔案呼叫   
     * @param expiredTime  過期時間
     * @param deleteFilesInterval  刪除兩個檔案之間的時間間隔
     * @param intervalForcibly  強制關閉資源的時間間隔  mf.destory傳遞的引數
     * @param cleanImmediately  true 強制刪除,不考慮過期時間這個條件
     * @return
     */
    public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {

        // 獲取mfs陣列 (實際上就是將MappedFile集合 轉成 陣列) 
        Object[] mfs = this.copyMappedFiles(0);

        if (null == mfs)
            return 0;
		
        // 這裡 減-1 是保證 當前正在順序寫的MappedFile不被刪除
        int mfsLength = mfs.length - 1;

        // 記錄刪除的檔案數
        int deleteCount = 0;

        // 被刪除的檔案集合
        List<MappedFile> files = new ArrayList<MappedFile>();
        
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];

                // 計算出當前檔案的存活時間截止點
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;

                // 條件成立:
                //     條件一: 檔案存活時間 達到上限
                //     條件二: disk佔用率達到上限  會強制刪除
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {

                    // 刪除檔案
                    if (mappedFile.destroy(intervalForcibly)) {
                        files.add(mappedFile);
                        deleteCount++; // 增加刪除檔案計數

                        if (files.size() >= DELETE_FILES_BATCH_MAX) {
                            break;
                        }
						
                        // 在刪除完檔案後 需要sleep,然後再去刪除下一個檔案
                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //avoid deleting files in the middle
                    break;
                }
            }
        }

        // 將滿足刪除條件的mf檔案 從 list內刪除
        deleteExpiredFile(files);

        return deleteCount;
    }

上述程式碼 雖然長,但是很容易理解, 就是 遍歷 目錄下的 MappedFile 集合, 尋找出 滿足刪除條件的 MappedFile ,再呼叫 mf.destory() 方法進行刪除。

只需要注意的是: 該方法是供 刪除 CommitLog 檔案使用的。