1. 程式人生 > 其它 >RocketMQ刷盤機制

RocketMQ刷盤機制

概覽

RocketMQ的儲存讀寫是基於JDK NIO的記憶體對映機制的,訊息儲存時首先將訊息追加到記憶體中。在根據不同的刷盤策略在不同的時間進行刷盤

。如果是同步刷盤,訊息追加到記憶體後,將同步呼叫MappedByteBuffer的force()方法,同步等待刷盤結果,進行刷盤結果返回。如果是非同步刷盤,

在訊息追加到記憶體後立刻,不等待刷盤結果立刻返回儲存成功結果給訊息傳送端。RocketMQ使用一個單獨的執行緒按照一個設定的頻率執行刷盤操作。

通過在broker配置檔案中配置flushDiskType來設定刷盤方式,ASYNC_FLUSH(非同步刷盤)、SYNC_FLUSH(同步刷盤)。預設為非同步刷盤。

本次以Commitlog檔案刷盤機制為例來講解刷盤機制。Consumequeue、IndexFile刷盤原理和Commitlog一直。索引檔案的刷盤機制並不是採取定時刷盤機制,

而是每更新一次索引檔案就會將上一次的改動刷寫到磁碟。

刷盤服務是將commitlog、consumequeue兩者中的MappedFile檔案中的MappedByteBuffer或者FileChannel中的記憶體中的資料,刷寫到磁碟。

還有將IndexFile中的MappedByteBuffer(this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer())中記憶體的資料刷寫到磁碟。

刷盤服務的入口

刷盤服務的入口是CommitLog類物件,FlushCommitLogService是刷盤服務物件,如果是同步刷盤它被賦值為GroupCommitService,

如果是非同步刷盤它被賦值為FlushRealTimeService;還有一個FlushCommitLogService的commitLogService物件,這個是將 TransientStorePoll 中的直接記憶體ByteBuffer,

寫到FileChannel對映的磁碟檔案中的服務。

// 非同步、同步刷盤服務初始化
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    // 同步刷盤服務為 GroupCommitService
    this.flushCommitLogService = new GroupCommitService();
} else {
    // 非同步刷盤服務為 FlushRealTimeService
    this.flushCommitLogService = new FlushRealTimeService();
}

// 定時將 transientStorePoll 中的直接記憶體 ByteBuffer,提交到記憶體對映 MappedByteBuffer 中
this.commitLogService = new CommitRealTimeService();
刷盤方法呼叫入口

putMessage()方法,將訊息寫入記憶體的方式不同,呼叫的刷盤方式也不同。如果是asyncPutMessage()非同步將訊息寫入記憶體,submitFlushRequest()方法是刷盤入口。

如果是putMessage()同步將訊息寫入記憶體,handleDiskFlush()方法是刷盤入口。handleDiskFlush()和submitFlushRequest()都包含有同步刷盤和非同步刷盤的方法。

// 非同步的方式存放訊息
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {

    // 非同步儲存訊息,提交刷盤請求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
    // 根據刷盤結果副本結果,返回存放訊息的結果
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}
// 同步方式存放訊息
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

    // handle 硬碟重新整理
    handleDiskFlush(result, putMessageResult, msg);
    // handle 高可用
    handleHA(result, putMessageResult, msg);
    // 返回儲存訊息的結果
    return putMessageResult;
}

同步刷盤

一條訊息呼叫一次刷盤服務,等待刷盤結果返回,然後再將結果返回;才能處理下一條刷盤訊息。以handleDiskFlush()方法來介紹同步刷盤和非同步刷盤,

這裡是區分刷盤方式的分水嶺。

/**
 * 一條訊息進行刷盤
 * @param result 擴充套件到記憶體ByteBuffer的結果
 * @param putMessageResult 放入ByteBuffer這個過程的結果
 * @param messageExt 存放的訊息
 */
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush 同步
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // 是否等待伺服器將這一條訊息儲存完畢再返回(等待刷盤完成),還是直接處理其他寫佇列requestsWrite裡面的請求
        if (messageExt.isWaitStoreMsgOK()) {
            //刷盤請求
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            //放入寫請求佇列
            service.putRequest(request);
            // 同步等待獲取刷盤結果
            CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
            PutMessageStatus flushStatus = null;
            try {
                // 5秒超市等待刷盤結果
                flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                        TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                //flushOK=false;
            }
            // 刷盤失敗,更新存放訊息結果超時
            if (flushStatus != PutMessageStatus.PUT_OK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // 喚醒處理刷盤請求寫磁碟執行緒,處理刷盤請求執行緒和提交刷盤請求之前的協調,通過CountDownLatch(1)操作,通過控制hasNotified狀態來實現寫佇列和讀佇列的交換
            service.wakeup();
        }
    }
    // 非同步
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

同步刷盤會創造一個刷盤請求,然後將請求放入處理寫刷盤請求的requestsWrite佇列,請求裡面封裝了CompletableFuture物件用來記錄刷盤結果,

利用CompletableFuturee的get方法同步等待獲取結果。flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),TimeUnit.MILLISECONDS);

flushStatus為刷盤結果,預設等待5秒超時。

GroupCommitService為一個執行緒,用來定時處理requestsWrite佇列裡面的寫刷盤請求,進行刷盤;它的requestsWrite和requestsRead佇列進行了讀寫分離,

寫GroupCommitRequest請求到requestsWrite佇列,讀GroupCommitRequest請求從requestsRead讀取,讀取請求今夕寫盤操作。這兩個佇列,形成了化零為整,

將一個個請求,劃分為一批,處理一批的GroupCommitRequest請求,然後requestsWrite和requestsRead佇列進行交換,requestsRead作為寫佇列,

requestsWrite作為讀佇列,實現讀寫分離。從中使用CountDownLatch2來實現處理刷盤請求執行緒和提交刷盤請求之前的協調,通過控制hasNotified狀態來實現寫佇列和讀佇列的交換。

// 同步刷盤服務
class GroupCommitService extends FlushCommitLogService {
    // 兩個佇列,讀寫請求分離
    // 刷盤服務寫入請求佇列
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    // 刷盤服務讀取請求佇列
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
    // 將請求同步寫入requestsWrite
    public synchronized void putRequest(final GroupCommitRequest request) {
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);
        }
        // 喚醒刷盤執行緒處理請求
        this.wakeup();
    }
    // 寫佇列和讀佇列交換
    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }

    private void doCommit() {
        // 上鎖讀請求佇列
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                // 每一個請求進行刷盤
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    // 一個落盤請求,處理兩次,第一次為false,進行刷盤,一次刷盤的資料是多個offset,並不是只有當前這個offset的值,這個offset的值進行了刷盤,
這個請求的第二次刷盤,這個offset已經已經落盤了, // flushWhere這個值在flush方法已經更新變大,所以flushOK=true,跳出for迴圈,通知flushOKFuture已經完成。 boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { // 是否已經刷過,false未刷,true已刷 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); // false 刷盤 if (!flushOK) { //0程式碼立刻刷盤,不管快取中訊息有多少 CommitLog.this.mappedFileQueue.flush(0); } } // flushOK:true,返回ok,已經刷過盤了,不用再刷盤;false:刷盤中,返回超時 // 喚醒等待刷盤結果的執行緒 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } // 更新checkpoint的刷盤commitlog的最後刷盤時間,但是隻寫寫到了checkpoint的記憶體ByteBuffer,並沒有刷盤 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } // 清空佇列 this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process // 因為個別的訊息不是同步刷盤的,所以它回到這裡進行處理 CommitLog.this.mappedFileQueue.flush(0); } } } public void run() { CommitLog.log.info(this.getServiceName() + " service started"); // 執行緒是否停止 while (!this.isStopped()) { try { // 設定hasNotified為false,未被通知,然後交換寫對佇列和讀佇列,重置waitPoint為(1),休息200ms,出事化為10ms,finally設定hasNotified為未被通知,
交換寫對佇列和讀佇列 this.waitForRunning(10); // 進行刷盤服務處理,一次處理一批請求,單個請求返回給等待刷盤服務結果的執行緒 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // 處理非正常停機,sleep10ms,交換寫請求佇列和讀請求佇列,等待資料處理 // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } // 進行請求處理 this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); } @Override protected void onWaitEnd() { // 寫佇列和讀佇列交換 this.swapRequests(); } @Override public String getServiceName() { return GroupCommitService.class.getSimpleName(); } // 5 分鐘 @Override public long getJointime() { return 1000 * 60 * 5; } }
處理刷盤請求執行緒和提交刷盤請求之前的協調
# org.apache.rocketmq.common.ServiceThread
// 喚醒處理刷盤請求寫磁碟執行緒,處理刷盤請求執行緒和提交刷盤請求之前的協調,通過控制hasNotified狀態來實現寫佇列和讀佇列的交換
public void wakeup() {
    // hasNotified預設值是false,未被喚醒,這個操作之後喚醒了,處理刷盤請求
    if (hasNotified.compareAndSet(false, true)) {
        // waitPoint預設是1,然後其他執行緒處理
        waitPoint.countDown(); // notify
    }
}

/**
 * 設定hasNotified為false,未被通知,然後交換寫對佇列和讀佇列,重置waitPoint為(1),休息200ms,finally設定hasNotified為未被通知,交換寫對佇列和讀佇列
 * @param interval 200ms
 */
protected void waitForRunning(long interval) {
    // compareAndSet(except,update);如果真實值value==except,設定value值為update,返回true;如果真實值value !=except,真實值不變,返回false;
    // 如果hasNotified真實值為true,那麼設定真實值為false,返回true;hasNotified真實值為false,那就返回false,真實值不變
    // 如果已經通知了,那就狀態變為未通知,如果是同步刷盤任務,交換寫請求佇列和讀請求佇列
    if (hasNotified.compareAndSet(true, false)) {
        // 同步刷盤:寫佇列和讀佇列交換
        this.onWaitEnd();
        return;
    }
    // 重置countDownLatch物件,等待接受刷盤請求的執行緒寫入請求到requestsRead,寫完後,waitPoint.countDown,喚醒處理刷盤請求的執行緒,開始刷盤
    //entry to wait
    waitPoint.reset();

    try {
        // 等待interval毫秒
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        // 設定是否通知為false
        hasNotified.set(false);
        this.onWaitEnd();
    }
}
// 等待這個方法的步驟完成。比如:同步刷盤:寫佇列和讀佇列交換
protected void onWaitEnd() {
}

非同步刷盤

非同步刷盤根據是否開啟TransientStorePool暫存池,來區分是否有commit操作。開啟TransientStorePool會將writerBuffer中的資料commit到FileChannel中(fileChannel.write(writerBuffer))

然後再將FileChannel中的資料通過flush操作(fileChannel.force())到磁碟中;
如果為開啟TransientStorePool,就不會有commit操作,直接flush(MappedByteBuffer.force())到磁碟中。

// 非同步刷盤
// Asynchronous flush
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    //執行flush操作
    flushCommitLogService.wakeup();
} else {
    //執行commit操作,然後喚醒執行flush操作
    commitLogService.wakeup();
}
CommitRealTimeService

定時將 transientStorePool 中的直接記憶體 ByteBuffer,提交到FileChannel中,然後喚醒刷盤操作。

// 定時將 transientStorePoll 中的直接記憶體 ByteBuffer,提交到FileChannel中
class CommitRealTimeService extends FlushCommitLogService {

    private long lastCommitTimestamp = 0;

    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        // 刷盤執行緒是否停止
        while (!this.isStopped()) {
            // writerBuffer寫資料到FileChannel時間間隔200ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // writerBuffer寫資料到FileChannel頁數大小4
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

            // writerBuffer寫資料到FileChannel跨度時間間隔200ms
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
            // 開始時間
            long begin = System.currentTimeMillis();
            // 觸發commit機制有兩種方式:1.commit時間超過了兩次commit時間間隔,然後只要有資料就進行提交 2.commit資料頁數大於預設設定的4頁
            // 本次commit時間>上次commit時間+兩次commit時間間隔,則進行commit,不用關心commit頁數的大小,設定commitDataLeastPages=0
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }

            try {
                // result=false,表示提交了資料,多與上次提交的位置;表示此次有資料提交;result=true,表示沒有新的資料被提交
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                // result = false means some data committed.表示此次有資料提交,然後進行刷盤
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    // 喚起刷盤執行緒,進行刷盤
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                // 暫停200ms,再執行
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        boolean result = false;
        // 正常關機,迴圈10次,進行10次的有資料就提交的操作
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}
FlushRealTimeService

非同步刷盤服務

class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    // 刷盤次數
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            // 預設值為false,表示await方法等待,如果為true,表示使用Thread.sleep方法等待
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
            // 刷盤任務時間間隔,多久刷一次盤500ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // 一次刷寫任務至少包含頁數,如果待刷寫資料不足,小於該引數配置的值,將忽略本次刷寫任務,預設4頁
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // 兩次真實刷寫任務最大跨度,預設10s
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            // 列印記錄日誌標誌
            boolean printFlushProgress = false;

            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            // 觸發刷盤機制有兩種方式:1.刷盤時間超過了兩次刷盤時間間隔,然後只要有資料就進行提交 2.commit資料頁數大於預設設定的4頁
            // 本次刷盤時間>上次刷盤時間+兩次刷盤時間間隔,則進行刷盤,不用關心刷盤頁數的大小,設定commitDataLeastPages=0
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                // 每間隔10次記錄一次刷盤日誌
                printFlushProgress = (printTimes++ % 10) == 0;
            }

            try {
                // 刷盤之前,進行執行緒sleep
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }
                // 列印記錄日誌
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
                // 刷盤開始時間
                long begin = System.currentTimeMillis();
                // 刷盤
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                // 更新checkpoint最後刷盤時間
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
        // while迴圈結束,正常關機,保證所有的資料刷寫到磁碟
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        // 列印日誌
        this.printFlushProgress();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushRealTimeService.class.getSimpleName();
    }

    private void printFlushProgress() {
        // CommitLog.log.info("how much disk fall behind memory, "
        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}   

刷盤是否開啟TransientStorePool的區別

這裡講一下刷盤是否開啟TransientStorePool的區別。

image.png
不開啟TransientStorePool:

MappedByteBuffer是直接記憶體,它暫時儲存了message訊息,MappedFile.mapp()方法做好MappedByteBuffer物件直接記憶體和落盤檔案的對映關係,

然後flush()方法執行MappedByteBuffer.force():強制將ByteBuffer中的任何內容的改變寫入到磁碟檔案。

開啟TransientStorePool:

MappedFile的writerBuffer為直接開闢的記憶體,然後MappedFile的初始化操作,做好FileChannel和磁碟檔案的對映,commit()方法實質是執行fileChannel.write(writerBuffer),

將writerBuffer的資料寫入到FileChannel對映的磁碟檔案,flush操作執行FileChannel.force():將對映檔案中的資料強制重新整理到磁碟。

TransientStorePool的作用

TransientStorePool 相當於在記憶體層面做了讀寫分離,寫走記憶體磁碟,讀走pagecache,同時最大程度消除了page cache的鎖競爭,降低了毛刺。它還使用了鎖機制,

避免直接記憶體被交換到swap分割槽。



作者:93張先生
連結:https://www.jianshu.com/p/6ef2f03c0ff6
來源:簡書
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。 我是個雙魚座的小王子,沉浸在自己的程式碼世界裡,去探索這未知的世界,希望遇到更多的小夥伴一起前行!