RocketMQ Storage Mechanism: Flushing to Disk

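Before looking at how RocketMQ implements flushing, it helps to understand the two flush paths shown in the figure above:

1) Write directly through the memory-mapped file, then flush it to disk.

2) When asynchronous flushing is used and the off-heap memory pool is enabled, first write to writeBuffer, then commit to the FileChannel, and finally flush to disk.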
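The two paths can be approximated with plain JDK NIO calls. This is a minimal sketch, not RocketMQ code: the class FlushPathsSketch and both method names are hypothetical, and the off-heap pool is stood in for by a single direct ByteBuffer.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; it only illustrates the two write paths.
class FlushPathsSketch {

    // Path 1: write into the memory-mapped region, then force() it to disk.
    static void writeViaMmap(Path file, byte[] data, int fileSize) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Mapping READ_WRITE extends the file to fileSize if it is shorter.
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            mapped.put(data);   // the bytes land in the page cache
            mapped.force();     // flush the mapped region to the storage device
        }
    }

    // Path 2: write to an off-heap buffer, commit it to the FileChannel, then flush.
    static void writeViaDirectBuffer(Path file, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE)) {
            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
            writeBuffer.put(data);          // step 1: write to the off-heap buffer
            writeBuffer.flip();
            while (writeBuffer.hasRemaining()) {
                ch.write(writeBuffer);      // step 2: "commit" to the FileChannel
            }
            ch.force(false);                // step 3: flush file content to disk
        }
    }
}
```

In path 2 the data is invisible to the file until the commit step runs, which is why that path needs a separate commit service in addition to the flush service.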

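Flushing itself comes in three modes: asynchronous flush, synchronous flush, and asynchronous commit-then-flush (where data is first transferred out of the off-heap buffer).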

The services are initialized as follows:

if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    this.flushCommitLogService = new GroupCommitService();   // synchronous flush
} else {
    this.flushCommitLogService = new FlushRealTimeService(); // asynchronous flush
}
this.commitLogService = new CommitRealTimeService();         // asynchronous commit (off-heap transfer)

At the end of CommitLog's putMessage method, handleDiskFlush takes care of the flush:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            // Submit the synchronous flush request
            service.putRequest(request);
            // Block until the flush completes or the timeout expires
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic()
                    + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // The message does not wait for the store result: just wake up the flush thread
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup(); // asynchronous flush
        } else {
            commitLogService.wakeup();      // asynchronous commit
        }
    }
}

Synchronous flush: GroupCommitService

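When putRequest is called, the request is placed into a write container. If the flush thread has not already been notified, it is woken up so that it handles the request immediately.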

public synchronized void putRequest(final GroupCommitRequest request) {
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

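On the other side, if the synchronous flush thread finds that a request has already arrived, it runs the task right away; if it is waiting, it gets woken up. Once the wait ends, the notification flag is reset to false.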

protected void waitForRunning(long interval) {
        // If a flush request is already pending, return immediately
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait
        waitPoint.reset();

        try {
            // No pending flush request: await until woken up or until the interval elapses.
            // After wake-up the caller runs doCommit; hasNotified has already been set to true.
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            // Finally, reset the notification flag to false
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }
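The handshake between putRequest/wakeup and waitForRunning can be sketched with JDK primitives only. The listing above relies on a resettable latch (waitPoint); the sketch below swaps in a plain monitor with wait/notifyAll instead, so it approximates the same behavior rather than reproducing it. The class name WakeupSketch and the boolean return value are hypothetical additions for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the notify/wait handshake; uses a monitor
// instead of the resettable latch in the original listing.
class WakeupSketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private final Object monitor = new Object();

    // Producer side: only the first caller after a reset actually signals.
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            synchronized (monitor) {
                monitor.notifyAll();
            }
        }
    }

    // Consumer side: returns true if a notification was consumed,
    // false if the interval elapsed without one.
    boolean waitForRunning(long intervalMillis) throws InterruptedException {
        // Fast path: a request arrived before we started waiting.
        if (hasNotified.compareAndSet(true, false)) {
            return true;
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
        synchronized (monitor) {
            while (!hasNotified.get()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // timed out
                }
                TimeUnit.NANOSECONDS.timedWait(monitor, remaining);
            }
        }
        // Reset the flag so the next wakeup() signals again.
        return hasNotified.getAndSet(false);
    }
}
```

The compare-and-set on hasNotified keeps repeated wakeup() calls cheap: while one notification is outstanding, later callers skip the signalling step entirely.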

The actual flush logic lives in doCommit. Because the service keeps separate read and write containers, it swaps the two before each round of flushing.

private void swapRequests() {
            List<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }
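The benefit of the swap is that producers only ever touch the write list, while the flush thread works through the read list without holding the producers' lock. A minimal sketch of that pattern follows, assuming a single consumer thread; DoubleBufferSketch, put, and drain are hypothetical names, and plain offsets stand in for GroupCommitRequest objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the read/write container swap.
class DoubleBufferSketch {
    private List<Long> requestsWrite = new ArrayList<>();
    private List<Long> requestsRead = new ArrayList<>();
    private final Object lock = new Object();

    // Producers append to the write list under a short lock.
    void put(long offset) {
        synchronized (lock) {
            requestsWrite.add(offset);
        }
    }

    // Single consumer: swap the lists, then process the read list
    // without blocking producers.
    List<Long> drain() {
        synchronized (lock) {
            List<Long> tmp = requestsWrite;
            requestsWrite = requestsRead;
            requestsRead = tmp;
        }
        List<Long> batch = new ArrayList<>(requestsRead);
        requestsRead.clear(); // the emptied list becomes the next write list
        return batch;
    }
}
```

Because the read list is cleared before the next swap, the two lists are reused indefinitely and no new list needs to be allocated per batch (the copy returned here exists only to make the sketch easy to inspect).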

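Whether a flush succeeded is decided as follows: every flush records the position it has flushed up to, and if that flushed position has already reached or passed the offset the request asked for, the request counts as flushed. The check is attempted at most twice per request, because the message may extend into the next file. When the check finishes, the waiting caller is notified of the result.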

        private void doCommit() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = false;
                        // Check whether the requested offset has been flushed
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                            if (!flushOK) {
                                CommitLog.this.mappedFileQueue.flush(0);
                            }
                        }
                        // Notify the waiting caller and hand back the flush result
                        req.wakeupCustomer(flushOK);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        // Record the store checkpoint
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }
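The request object used above has to let one thread block in waitForFlush while the flush thread later calls wakeupCustomer. The original listing does not show its internals, so the following is only a sketch under the assumption that a one-shot CountDownLatch is used; FlushRequestSketch and isFlushed are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical request object: one waiter, one completion signal.
class FlushRequestSketch {
    private final long nextOffset;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    FlushRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    // Success criterion from the text: flushed position >= requested offset.
    static boolean isFlushed(long flushedWhere, long nextOffset) {
        return flushedWhere >= nextOffset;
    }

    // Called by the flush thread once the check is done.
    void wakeupCustomer(boolean ok) {
        this.flushOK = ok;
        latch.countDown();
    }

    // Called by the sender; returns false on timeout or failed flush.
    boolean waitForFlush(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return flushOK;
    }
}
```

A timeout and a failed flush look the same to the caller here, which matches how handleDiskFlush reports both as FLUSH_DISK_TIMEOUT.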

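Looking one level deeper, CommitLog.this.mappedFileQueue.flush(0) ends up calling mappedFile.flush(flushLeastPages). It locates the MappedFile that contains the current flushedWhere offset, then forces the data of that MappedFile to disk, either through the fileChannel (when writeBuffer is in use) or through the mappedByteBuffer.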

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

Asynchronous flush: FlushRealTimeService

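Asynchronous flushing has two variants: timed flushing at a fixed interval, and real-time flushing that is driven by wake-ups.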

 class FlushRealTimeService extends FlushCommitLogService {
        private long lastFlushTimestamp = 0;
        private long printTimes = 0;

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                // Timed flush or real-time flush
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

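                // Interval between asynchronous flushes: 500 ms
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                // Minimum number of page-cache pages required to trigger a flush
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
                // Interval after which a flush is forced regardless of page count: 10 s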
                int flushPhysicQueueThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

                boolean printFlushProgress = false;

                // Print flush progress
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                    this.lastFlushTimestamp = currentTimeMillis;
                    flushPhysicQueueLeastPages = 0;
                    printFlushProgress = (printTimes++ % 10) == 0;
                }

                try {
                    if (flushCommitLogTimed) {
                        Thread.sleep(interval);  // timed flush: sleep the full interval; the else branch can be woken early
                    } else {
                        this.waitForRunning(interval);
                    }

                    if (printFlushProgress) {
                        this.printFlushProgress();
                    }

                    long begin = System.currentTimeMillis();
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }

            // Normal shutdown, to ensure that all the flush before exit
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.flush(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }

            this.printFlushProgress();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return FlushRealTimeService.class.getSimpleName();
        }

        private void printFlushProgress() {
            // CommitLog.log.info("how much disk fall behind memory, "
            // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
        }

        @Override
        public long getJointime() {
            return 1000 * 60 * 5;
        }
    }

Inside this flush, the code checks whether the amount of unflushed data has reached flushLeastPages. A flush is needed when roughly (writeOffset - flushOffset) / os_page_size >= flushLeastPages.
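A worked version of that threshold makes the arithmetic concrete. The sketch below is an assumption-laden approximation of isAbleToFlush, whose body is not shown in this article: it assumes a 4 KiB page size, compares whole-page indexes, and treats flushLeastPages == 0 as "flush whenever anything is unflushed". FlushThresholdSketch is a hypothetical class name.

```java
// Hypothetical re-creation of the page-count threshold check.
class FlushThresholdSketch {
    // Assumption: 4 KiB pages.
    static final int OS_PAGE_SIZE = 4 * 1024;

    static boolean isAbleToFlush(int flushedPos, int writePos, int flushLeastPages) {
        if (flushLeastPages > 0) {
            // Compare page indexes: how many pages lie between the two positions.
            return (writePos / OS_PAGE_SIZE) - (flushedPos / OS_PAGE_SIZE) >= flushLeastPages;
        }
        // Threshold of 0 (as in flush(0)): any unflushed byte qualifies.
        return writePos > flushedPos;
    }

    public static void main(String[] args) {
        // Exactly 4 pages written, nothing flushed yet, threshold of 4 pages.
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE, 4));     // true
        // One byte short of 4 pages.
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE - 1, 4)); // false
    }
}
```

This also explains why FlushRealTimeService sets flushPhysicQueueLeastPages to 0 once the thorough interval has elapsed: with a threshold of 0, even a partially filled page gets flushed.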

Asynchronous commit service: CommitRealTimeService

The commit service exists to write the data held in off-heap memory into the FileChannel. In the run method of CommitRealTimeService:

boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);

    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        // Find the mappedFile that contains committedWhere
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
        if (mappedFile != null) {
            // commit
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
    }
protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();

        if (writePos - this.committedPosition.get() > 0) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                // commit reads the data from direct memory and writes it to the file channel; the asynchronous flush service then flushes it
                this.fileChannel.write(byteBuffer);
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }

The asynchronous commit service only comes into play when the off-heap memory pool is enabled and the flush mode is asynchronous.
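Putting the three modes together, the dispatch rule in handleDiskFlush can be condensed into a small decision function. This is a sketch for illustration only: FlushDispatchSketch, its nested FlushMode enum, and serviceToWake are hypothetical names, and the returned strings are just the service class names from the listings above.

```java
// Hypothetical summary of which service handleDiskFlush hands work to.
class FlushDispatchSketch {
    enum FlushMode { SYNC_FLUSH, ASYNC_FLUSH }

    static String serviceToWake(FlushMode mode, boolean transientStorePoolEnable) {
        if (mode == FlushMode.SYNC_FLUSH) {
            // Synchronous flush ignores the off-heap pool setting.
            return "GroupCommitService";
        }
        // Asynchronous: commit first only when the off-heap pool is enabled.
        return transientStorePoolEnable ? "CommitRealTimeService" : "FlushRealTimeService";
    }

    public static void main(String[] args) {
        System.out.println(serviceToWake(FlushMode.ASYNC_FLUSH, true));  // CommitRealTimeService
        System.out.println(serviceToWake(FlushMode.ASYNC_FLUSH, false)); // FlushRealTimeService
        System.out.println(serviceToWake(FlushMode.SYNC_FLUSH, true));   // GroupCommitService
    }
}
```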