RocketMQ Storage Mechanism: Disk Flushing
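Before looking at how RocketMQ implements flushing, first consider the two flush paths shown in the figure above:
1) Write through the memory-mapped file, then flush it to disk.
2) When asynchronous flushing is used and the off-heap memory pool (transient store pool) is enabled, data is first written to writeBuffer, then committed to the FileChannel, and finally flushed to disk.
In addition, there are three flush modes: asynchronous flush, synchronous flush, and asynchronous flush with commit (transfer through the off-heap pool).
Initialization looks like this: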
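A minimal sketch of the two paths using plain java.nio, assuming a temporary file and a hypothetical 4 KB file size; FlushPaths and its methods are illustrative names, not RocketMQ classes. Path 1 writes into a MappedByteBuffer and calls force(); path 2 fills a direct ByteBuffer, writes (commits) it to the FileChannel, and calls force(false).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class FlushPaths {
    static final int FILE_SIZE = 4096; // hypothetical file size for the demo

    // Path 1: write through a memory-mapped file, then force the mapped pages to disk.
    static void mmapThenFlush(Path file, byte[] data) {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, FILE_SIZE);
            mapped.put(data); // lands in the page cache through the mapping
            mapped.force();   // flush
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Path 2: write into off-heap memory, commit to the FileChannel, then flush.
    static void writeCommitFlush(Path file, byte[] data) {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(FILE_SIZE);
        writeBuffer.put(data); // write: only direct memory is touched
        writeBuffer.flip();
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE)) {
            while (writeBuffer.hasRemaining()) {
                ch.write(writeBuffer); // commit: copy into the page cache
            }
            ch.force(false);           // flush: file content, metadata not required
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Path tempFile() {
        try {
            return Files.createTempFile("flush-demo", ".bin");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] readAll(Path file) {
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that mapping READ_WRITE with FILE_SIZE extends the file to that size, which is why memory-mapped commit log files are preallocated at a fixed length.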
```java
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    this.flushCommitLogService = new GroupCommitService(); // synchronous flush
} else {
    this.flushCommitLogService = new FlushRealTimeService(); // asynchronous flush
}
this.commitLogService = new CommitRealTimeService(); // asynchronous commit (off-heap transfer)
```
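The initialization above, combined with the start condition stated at the end of this section, can be summarized as a small selection function. This is a hypothetical sketch: FlushServiceSelection and the local FlushDiskType enum only mirror the names in the snippet, and the function returns service names as strings.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for RocketMQ's FlushDiskType.
enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

class FlushServiceSelection {
    // Names of the flush-related services that would be running for a given configuration.
    static List<String> runningServices(FlushDiskType type, boolean transientStorePoolEnable) {
        List<String> services = new ArrayList<>();
        services.add(type == FlushDiskType.SYNC_FLUSH ? "GroupCommitService" : "FlushRealTimeService");
        // CommitRealTimeService is always constructed, but only started when the
        // off-heap pool is enabled and flushing is asynchronous.
        if (transientStorePoolEnable && type == FlushDiskType.ASYNC_FLUSH) {
            services.add("CommitRealTimeService");
        }
        return services;
    }
}
```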
At the end of CommitLog's putMessage method, handleDiskFlush performs the flush step.
```java
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            // Submit a synchronous flush request
            service.putRequest(request);
            // Block until the flush completes or times out
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: "
                    + messageExt.getTags() + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // Only wake up the flush thread
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup(); // asynchronous flush
        } else {
            commitLogService.wakeup(); // asynchronous commit
        }
    }
}
```
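The branches of handleDiskFlush reduce to a decision on three inputs. The sketch below is a hypothetical restatement (FlushAction and DiskFlushDispatch are invented names, FlushDiskType is a local stand-in) that makes the four outcomes explicit.

```java
// Local stand-in for RocketMQ's FlushDiskType.
enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

enum FlushAction {
    SUBMIT_AND_WAIT,     // sync flush: submit a request and block on its result
    WAKEUP_GROUP_COMMIT, // sync flush, but this message does not wait for the disk
    WAKEUP_FLUSH,        // async flush through the memory-mapped file
    WAKEUP_COMMIT        // async flush through writeBuffer + commit
}

class DiskFlushDispatch {
    // Mirrors the if/else structure of handleDiskFlush.
    static FlushAction decide(FlushDiskType type, boolean waitStoreMsgOK, boolean transientStorePoolEnable) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            return waitStoreMsgOK ? FlushAction.SUBMIT_AND_WAIT : FlushAction.WAKEUP_GROUP_COMMIT;
        }
        return transientStorePoolEnable ? FlushAction.WAKEUP_COMMIT : FlushAction.WAKEUP_FLUSH;
    }
}
```

Only SUBMIT_AND_WAIT puts the producer on the critical path of the disk; every other outcome just nudges a background thread.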
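Synchronous flush: GroupCommitService
Calling putRequest adds the request to a write list (requestsWrite); if the flush thread has not already been notified, it is woken up to process the request immediately.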
```java
public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}
```
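The request object itself behaves like a one-shot future: the producer blocks in waitForFlush with a timeout, and the flush thread completes it with wakeupCustomer. A minimal sketch assuming a CountDownLatch; FlushRequest is a hypothetical stand-in for GroupCommitRequest, not its actual source.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for GroupCommitRequest.
class FlushRequest {
    private final long nextOffset;               // offset the flush must reach
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    FlushRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    // Flush thread: publish the result, then release the waiting producer.
    void wakeupCustomer(boolean flushOK) {
        this.flushOK = flushOK;
        latch.countDown();
    }

    // Producer: returns false if nobody completed the request within the timeout.
    boolean waitForFlush(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return flushOK;
    }
}
```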
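On the other side, if the synchronous flush thread finds that a request is already pending, it runs immediately; if it is waiting, it is woken up. Either way, once the wait ends, the notification flag is set back to false.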
```java
protected void waitForRunning(long interval) {
    // A flush request is already pending: return immediately
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }

    // entry to wait
    waitPoint.reset();

    try {
        // No pending request: await until woken up or the interval elapses.
        // Once woken, doCommit runs next; hasNotified has already been set to true.
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        // Always reset the notification flag to false
        hasNotified.set(false);
        this.onWaitEnd();
    }
}
```
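The handshake between putRequest and waitForRunning can be modeled with an AtomicBoolean plus a monitor. This sketch replaces RocketMQ's resettable latch with Object.wait/notifyAll, an assumption made to stay within the JDK; WakeupPoint is a hypothetical name. A notification that arrives before the wait begins is not lost, because the flag is rechecked while holding the monitor.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the hasNotified + waitPoint handshake.
class WakeupPoint {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private final Object monitor = new Object();

    // Producer side: only the false -> true transition sends a notification.
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            synchronized (monitor) {
                monitor.notifyAll();
            }
        }
    }

    // Flush-thread side: true if a notification was consumed, false on timeout.
    // intervalMillis must be > 0 (Object.wait(0) would wait forever).
    boolean waitForRunning(long intervalMillis) {
        // Fast path: a request arrived while the thread was busy.
        if (hasNotified.compareAndSet(true, false)) {
            return true;
        }
        synchronized (monitor) {
            if (!hasNotified.get()) {
                try {
                    monitor.wait(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        // Like the finally block: the flag always ends up false.
        return hasNotified.getAndSet(false);
    }
}
```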
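The actual flush logic lives in doCommit. Because the service separates reads from writes with two request lists, the read and write lists are swapped before each round of flushing.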
```java
private void swapRequests() {
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}
```
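A minimal sketch of the read/write separation, assuming a single flush thread: producers only touch the write list, while the flush thread swaps the lists and drains the read side, so producers never wait on a flush in progress. DoubleBufferedRequests is a hypothetical name, and in this sketch put and swap share one lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical double-buffered request queue.
class DoubleBufferedRequests<T> {
    private final Object lock = new Object();
    private List<T> requestsWrite = new ArrayList<>();
    private List<T> requestsRead = new ArrayList<>();

    // Producers: append to the write side only.
    void put(T request) {
        synchronized (lock) {
            requestsWrite.add(request);
        }
    }

    // Flush thread: exchange the lists under the same lock as put().
    void swap() {
        synchronized (lock) {
            List<T> tmp = requestsWrite;
            requestsWrite = requestsRead;
            requestsRead = tmp;
        }
    }

    // Flush thread only: return the current batch and clear the read side.
    List<T> drainRead() {
        List<T> batch = new ArrayList<>(requestsRead);
        requestsRead.clear();
        return batch;
    }
}
```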
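Success is judged as follows: every flush records the position flushed so far (flushedWhere). If the flushed position has reached or passed the position the request asked for (its nextOffset), the flush succeeded. Each request gets at most two flush attempts, because the message may sit in the next file. After flushing, the waiting client is notified.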
```java
private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    // Has the flushed position reached this request's offset?
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
                // Wake up the waiting client with the flush result
                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                // Record the checkpoint
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
```
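The success check can be exercised with an in-memory model. In this hypothetical sketch, FakeMappedFileQueue.flush() advances flushedWhere to the end of the file that contains it (or to the write position), so a request whose offset lies in the next file needs two flushes. One difference from the quoted loop: there, flushOK is evaluated before each flush, so the outcome of the second flush is not re-checked inside the loop; the sketch re-checks after every flush.

```java
// Hypothetical model: fixed-size files laid out back to back.
class FakeMappedFileQueue {
    final long fileSize;
    long wrotePosition;
    long flushedWhere;
    int flushCalls = 0;

    FakeMappedFileQueue(long fileSize, long wrotePosition, long flushedWhere) {
        this.fileSize = fileSize;
        this.wrotePosition = wrotePosition;
        this.flushedWhere = flushedWhere;
    }

    // Flush the file containing flushedWhere, up to its end or the write position.
    void flush() {
        flushCalls++;
        long fileStart = (flushedWhere / fileSize) * fileSize;
        flushedWhere = Math.min(fileStart + fileSize, wrotePosition);
    }
}

class GroupCommit {
    // Check, then up to two rounds of flush + re-check.
    static boolean commit(FakeMappedFileQueue q, long nextOffset) {
        boolean flushOK = q.flushedWhere >= nextOffset;
        for (int i = 0; i < 2 && !flushOK; i++) {
            q.flush();
            flushOK = q.flushedWhere >= nextOffset;
        }
        return flushOK;
    }
}
```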
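Looking more closely at mappedFile.flush(flushLeastPages), which CommitLog.this.mappedFileQueue.flush(0) calls: the queue uses the current flushedWhere to locate the corresponding MappedFile, and that file then forces its data to disk, via fileChannel.force(false) when data went through writeBuffer and was committed to the FileChannel, or via mappedByteBuffer.force() otherwise.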
```java
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
```
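Which position a flush can reach depends on the write path. In RocketMQ's MappedFile, getReadPosition() returns the committed position when a writeBuffer is in use and the wrote position otherwise. The hypothetical FilePositions class below models only those counters, to show that on the writeBuffer path nothing becomes flushable until it has been committed.

```java
// Hypothetical position model of a single MappedFile.
class FilePositions {
    final boolean usesWriteBuffer; // true when the transient store pool is enabled
    int wrotePosition = 0;
    int committedPosition = 0;
    int flushedPosition = 0;

    FilePositions(boolean usesWriteBuffer) {
        this.usesWriteBuffer = usesWriteBuffer;
    }

    void append(int bytes) {
        wrotePosition += bytes;
    }

    // Commit copies writeBuffer data into the FileChannel (no-op on the mmap path).
    void commit() {
        if (usesWriteBuffer) {
            committedPosition = wrotePosition;
        }
    }

    // Only data already visible in the file (mapped or committed) can be flushed.
    int readPosition() {
        return usesWriteBuffer ? committedPosition : wrotePosition;
    }

    void flush() {
        flushedPosition = readPosition();
    }
}
```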
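Asynchronous flush: FlushRealTimeService
Asynchronous flushing has two variants: timed flushing, where the thread sleeps for a fixed interval, and real-time flushing, where the thread waits for the interval but can be woken up early.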
```java
class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            // true: timed flush (sleep); false: real-time flush (can be woken early)
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
            // Async flush interval, 500 ms by default
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // Minimum number of dirty page-cache pages required per flush
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // Thorough-flush interval, 10 s by default
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            boolean printFlushProgress = false;

            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }

            try {
                if (flushCommitLogTimed) {
                    // Timed flush: always sleep for the full interval
                    Thread.sleep(interval);
                } else {
                    // Real-time flush: wait up to the interval, or until woken up
                    this.waitForRunning(interval);
                }

                if (printFlushProgress) {
                    this.printFlushProgress();
                }

                long begin = System.currentTimeMillis();
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }

        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }

        this.printFlushProgress();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushRealTimeService.class.getSimpleName();
    }

    private void printFlushProgress() {
        // CommitLog.log.info("how much disk fall behind memory, "
        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}
```
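The flush call first checks whether the unflushed data spans at least flushLeastPages pages, computed as (writePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= flushLeastPages. A full file is always flushed, and with flushLeastPages = 0 (the periodic thorough flush above) any unflushed data is flushed.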
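The two decisions above (the per-iteration threshold in run() and the page check) can be isolated into small pure functions. This is a sketch with an injected clock; FlushSchedule and FlushThreshold are hypothetical names, OS_PAGE_SIZE is assumed to be 4 KB as in RocketMQ's MappedFile, and the fileFull flag stands in for MappedFile's own full-file test. Note that the page check counts page boundaries crossed, not bytes: 200 bytes that cross a boundary count as one page, while 3800 bytes inside one page count as none.

```java
// Hypothetical helpers mirroring the flush decisions described above.
class FlushSchedule {
    private final int leastPages;
    private final long thoroughIntervalMillis;
    private long lastFlushTimestamp = 0;

    FlushSchedule(int leastPages, long thoroughIntervalMillis) {
        this.leastPages = leastPages;
        this.thoroughIntervalMillis = thoroughIntervalMillis;
    }

    // Threshold for this loop iteration: 0 (flush everything) once per thorough interval.
    int leastPagesFor(long nowMillis) {
        if (nowMillis >= lastFlushTimestamp + thoroughIntervalMillis) {
            lastFlushTimestamp = nowMillis;
            return 0;
        }
        return leastPages;
    }
}

class FlushThreshold {
    static final int OS_PAGE_SIZE = 1024 * 4; // assumed 4 KB page size

    static boolean isAbleToFlush(int flushedPosition, int writePosition, boolean fileFull, int flushLeastPages) {
        if (fileFull) {
            return true;
        }
        if (flushLeastPages > 0) {
            // Whole pages between the flushed and written positions.
            return (writePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return writePosition > flushedPosition;
    }
}
```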
Asynchronous commit service: CommitRealTimeService
The commit service writes data held in off-heap memory into the FileChannel. In CommitRealTimeService's run method:
```java
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
```
```java
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    // Find the MappedFile that contains committedWhere
    MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
    if (mappedFile != null) {
        // commit
        int offset = mappedFile.commit(commitLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.committedWhere;
        this.committedWhere = where;
    }
    return result;
}
```
```java
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            // Commit copies data from direct memory into the file channel;
            // the asynchronous flush service later forces it to disk
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
```
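A runnable miniature of commit0 using real java.nio calls, assuming a temporary file; BufferedFile is a hypothetical class, not RocketMQ's MappedFile. Appends only touch the direct writeBuffer, and commit() writes the range [committedPosition, wrotePosition) into the FileChannel. Forcing that data to disk would be the flush service's job and is left out here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical miniature: off-heap writeBuffer + FileChannel commit.
class BufferedFile implements AutoCloseable {
    private final ByteBuffer writeBuffer;
    private final FileChannel fileChannel;
    private int wrotePosition = 0;
    private int committedPosition = 0;

    BufferedFile(Path path, int capacity) {
        this.writeBuffer = ByteBuffer.allocateDirect(capacity);
        try {
            this.fileChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Append: copy into direct memory only; the file is not touched yet.
    void append(byte[] data) {
        ByteBuffer view = writeBuffer.slice();
        view.position(wrotePosition);
        view.put(data);
        wrotePosition += data.length;
    }

    // Commit: write the not-yet-committed slice of writeBuffer into the channel.
    void commit() {
        int writePos = wrotePosition;
        int lastCommitted = committedPosition;
        if (writePos - lastCommitted > 0) {
            ByteBuffer slice = writeBuffer.slice();
            slice.position(lastCommitted);
            slice.limit(writePos);
            try {
                fileChannel.position(lastCommitted);
                while (slice.hasRemaining()) {
                    fileChannel.write(slice);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            committedPosition = writePos;
        }
    }

    int getCommittedPosition() {
        return committedPosition;
    }

    @Override
    public void close() {
        try {
            fileChannel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Path tempFile() {
        try {
            return Files.createTempFile("commit-demo", ".bin");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String read(Path p) {
        try {
            return new String(Files.readAllBytes(p), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```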
The asynchronous commit service is started only when the off-heap memory pool is enabled and the flush mode is asynchronous.