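RocketMQ Source Code Analysis: How Disk Flushing Is Implemented
Note: RocketMQ's flushing code is easier to follow if you first get familiar with memory-mapped files.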
When the broker starts, it calls CommitLog's start method, which in turn starts the flushCommitLogService thread.
The CommitLog constructor decides which flush type to use.
Synchronous flushing uses GroupCommitService and asynchronous flushing uses FlushRealTimeService. Asynchronous flushing is the default:
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mapedFileQueue = new MapedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMapedFileService());
    this.defaultMessageStore = defaultMessageStore;
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        this.flushCommitLogService = new FlushRealTimeService();
    }
    this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
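The two modes differ in how the broker handles a Producer's send request. With synchronous flushing, the broker reports success to the Producer only after the message has been flushed to disk. With asynchronous flushing, it just wakes up the flush thread and returns.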
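The constructor's choice is a plain strategy selection. The sketch below mirrors that if/else using hypothetical stand-in types: FlushType, FlushService and the two *Sketch classes are not RocketMQ classes. It only shows which service each flush type gets, not how the services behave.

```java
// Hypothetical stand-ins for the CommitLog constructor's if/else; not RocketMQ's classes.
final class FlushServiceFactory {
    static FlushService create(FlushType type) {
        // SYNC_FLUSH gets the group-commit service; anything else, including the
        // default ASYNC_FLUSH, gets the real-time (asynchronous) service.
        return type == FlushType.SYNC_FLUSH ? new GroupCommitSketch() : new FlushRealTimeSketch();
    }

    public static void main(String[] args) {
        System.out.println(create(FlushType.SYNC_FLUSH).name());  // GroupCommitService
        System.out.println(create(FlushType.ASYNC_FLUSH).name()); // FlushRealTimeService
    }
}

enum FlushType { SYNC_FLUSH, ASYNC_FLUSH }

interface FlushService {
    String name();
}

final class GroupCommitSketch implements FlushService {
    @Override
    public String name() { return "GroupCommitService"; }
}

final class FlushRealTimeSketch implements FlushService {
    @Override
    public String name() { return "FlushRealTimeService"; }
}
```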
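Asynchronous flushing
Asynchronous flushing is handled by FlushRealTimeService. Its run method contains a while loop that keeps going for as long as the broker has not been shut down: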
public void run() {
    while (!this.isStoped()) {
        // Whether to flush on a fixed timer; the default is real-time flushing
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
        // Flush interval, 1 second by default
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum number of pages that must be pending before the CommitLog is flushed
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        // Interval between thorough flushes of the CommitLog
        int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
        boolean printFlushProgress = false; // Print flush progress

        long currentTimeMillis = System.currentTimeMillis();
        // Do a thorough flush once every flushPhysicQueueThoroughInterval milliseconds
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0; // what 0 means is explained below
            printFlushProgress = ((printTimes++ % 10) == 0);
        }

        try {
            if (flushCommitLogTimed) { // timed flushing
                Thread.sleep(interval);
            } else { // real-time flushing
                this.waitForRunning(interval);
            }
            if (printFlushProgress) {
                this.printFlushProgress(); // empty method
            }
            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RetryTimesOver && !result; i++) {
        result = CommitLog.this.mapedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
}
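The loop first reads the configuration: the flush interval, the minimum number of pages per flush, the interval between thorough flushes, and so on. The actual flushing happens in CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages):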
public boolean commit(final int flushLeastPages) {
    boolean result = true;
    MapedFile mapedFile = this.findMapedFileByOffset(this.committedWhere, true);
    if (mapedFile != null) {
        long tmpTimeStamp = mapedFile.getStoreTimestamp();
        int offset = mapedFile.commit(flushLeastPages);
        long where = mapedFile.getFileFromOffset() + offset;
        result = (where == this.committedWhere);
        this.committedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }
    return result;
}
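this.findMapedFileByOffset(this.committedWhere, true) looks up the mapped file that contains the given offset. Here that offset is committedWhere, the position up to which data has already been flushed.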
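CommitLog files all have the same size, and each file is named after the offset it starts at. Mapping an offset to a file therefore comes down to integer division. The sketch below is a hypothetical helper written under that assumption (fixed file size, contiguous files). It is not the body of findMapedFileByOffset, and the 1 GB size is only an example value.

```java
// Hypothetical helper, assuming fixed-size, contiguous files named after their start offset.
final class OffsetToFile {
    // Index of the file (counting from the oldest file still present) that holds `offset`,
    // or -1 if the offset lies before the first remaining file.
    static int fileIndex(long offset, long firstFileFromOffset, long fileSize) {
        if (offset < firstFileFromOffset) {
            return -1;
        }
        return (int) ((offset - firstFileFromOffset) / fileSize);
    }

    // File name for a given start offset, zero-padded to 20 digits.
    static String fileName(long fileFromOffset) {
        return String.format("%020d", fileFromOffset);
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024; // 1 GB per file (example value)
        long offset = 1_500_000_000L;
        int idx = fileIndex(offset, 0L, fileSize);
        System.out.println(idx + " -> " + fileName(idx * fileSize)); // 1 -> 00000000001073741824
    }
}
```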
MapedFileQueue.commit then calls that file's commit method to do the flush:
public int commit(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = this.wrotePostion.get();
            this.mappedByteBuffer.force();
            this.committedPosition.set(value);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            this.committedPosition.set(this.wrotePostion.get());
        }
    }
    return this.getCommittedPosition();
}
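The commit method above captures wrotePostion before calling force() and only then publishes it as committedPosition. As a result, committedPosition never claims more than force() actually covered.

The sketch below reproduces that ordering with plain JDK APIs (FileChannel.map and MappedByteBuffer.force). MiniMappedFile and its methods are hypothetical. The sketch is single-threaded and leaves out the hold()/release() reference counting.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-threaded sketch of a mapped file with separate write and flush positions.
final class MiniMappedFile implements AutoCloseable {
    private final FileChannel channel;
    private final MappedByteBuffer buffer;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);
    private final AtomicInteger committedPosition = new AtomicInteger(0);

    MiniMappedFile(Path path, int size) throws IOException {
        channel = FileChannel.open(path, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        // A READ_WRITE mapping grows the file to `size` bytes if it is shorter
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
    }

    // Copies data into the mapped region (page cache); it is not yet guaranteed to be on disk
    boolean append(byte[] data) {
        int pos = wrotePosition.get();
        if (pos + data.length > buffer.capacity()) {
            return false; // file full
        }
        ByteBuffer view = buffer.duplicate(); // independent position, same memory
        view.position(pos);
        view.put(data);
        wrotePosition.addAndGet(data.length);
        return true;
    }

    // Same ordering as MapedFile.commit: capture the write position, force(), then publish it
    int flush() {
        if (wrotePosition.get() > committedPosition.get()) {
            int value = wrotePosition.get();
            buffer.force();
            committedPosition.set(value);
        }
        return committedPosition.get();
    }

    int wrotePosition() { return wrotePosition.get(); }

    int committedPosition() { return committedPosition.get(); }

    @Override
    public void close() throws IOException {
        channel.close(); // the mapping itself stays valid until it is garbage-collected
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("commitlog-sketch", ".dat");
        path.toFile().deleteOnExit();
        try (MiniMappedFile file = new MiniMappedFile(path, 4096)) {
            file.append("hello".getBytes(StandardCharsets.UTF_8));
            System.out.println(file.wrotePosition() + " / " + file.committedPosition()); // 5 / 0
            file.flush();
            System.out.println(file.wrotePosition() + " / " + file.committedPosition()); // 5 / 5
        }
    }
}
```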
MapedFile.commit first calls isAbleToFlush to decide whether a flush is due. The flush itself is done by MappedByteBuffer.force():
private boolean isAbleToFlush(final int flushLeastPages) {
    int flush = this.committedPosition.get(); // position flushed so far
    int write = this.wrotePostion.get(); // position written so far in this file
    // If the current file is full, flush immediately
    if (this.isFull()) {
        return true;
    }
    // Flush only when the unflushed data spans at least the required number of pages
    if (flushLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
    }
    return write > flush;
}
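The second if allows a flush only when the page containing the write position is at least flushLeastPages pages past the page containing the last flushed position. So even in real-time mode, data is flushed only once the required number of pages has accumulated (or the file is full).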
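A small worked example makes the page arithmetic concrete. The sketch restates isAbleToFlush as a pure function over the two positions. The 4096-byte page size, the file size, and treating "full" as write == fileSize are assumptions for illustration.

```java
// Hypothetical pure-function restatement of isAbleToFlush, assuming 4 KB OS pages.
final class FlushThreshold {
    static final int OS_PAGE_SIZE = 4096;

    static boolean isAbleToFlush(int flush, int write, int fileSize, int flushLeastPages) {
        if (write == fileSize) {
            return true; // file is full: flush right away
        }
        if (flushLeastPages > 0) {
            // Compares page indices, not the byte distance between the positions
            return (write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return write > flush; // threshold 0: any unflushed byte is enough
    }

    public static void main(String[] args) {
        int fileSize = 1024 * 1024;
        System.out.println(isAbleToFlush(0, 10_000, fileSize, 4));    // false: page 2 - page 0 = 2 < 4
        System.out.println(isAbleToFlush(0, 20_000, fileSize, 4));    // true:  page 4 - page 0 = 4
        System.out.println(isAbleToFlush(4_097, 8_191, fileSize, 1)); // false: same page, 4094 bytes pending
        System.out.println(isAbleToFlush(4_095, 4_097, fileSize, 1)); // true:  crosses a page boundary
        System.out.println(isAbleToFlush(0, 10_000, fileSize, 0));    // true:  threshold 0
    }
}
```

The third and fourth cases show the effect of comparing whole page indices. With 4094 bytes pending, the flush still waits. With only 2 bytes pending, a flush is triggered because those bytes cross a page boundary.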
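When flushLeastPages is 0, it is enough for the write position to be greater than the last flushed position. The value passed in as flushLeastPages is set in the run method of FlushRealTimeService: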
// Do a thorough flush once every flushPhysicQueueThoroughInterval milliseconds
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
    this.lastFlushTimestamp = currentTimeMillis;
    flushPhysicQueueLeastPages = 0;
    printFlushProgress = ((printTimes++ % 10) == 0);
}
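Once the thorough-flush interval has elapsed, flushPhysicQueueLeastPages is set to 0, and that value is what commit receives as flushLeastPages.
With the parameter at 0, any unflushed data triggers a flush.
Whenever isAbleToFlush returns true, the data in the MappedByteBuffer is forced to the file.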
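The effect of the thorough-flush interval can be traced with a fake clock. The sketch below isolates the branch of run that resets flushPhysicQueueLeastPages. The class name and the example values (4 pages, 10 000 ms) are assumptions. The clock is passed in rather than read from System.currentTimeMillis() so that the sequence is reproducible.

```java
// Hypothetical isolation of the "thorough flush" branch of FlushRealTimeService.run.
final class ThoroughFlushTimer {
    private final int leastPages;
    private final long thoroughIntervalMs;
    private long lastFlushTimestamp = 0;

    ThoroughFlushTimer(int leastPages, long thoroughIntervalMs) {
        this.leastPages = leastPages;
        this.thoroughIntervalMs = thoroughIntervalMs;
    }

    // Page threshold to pass to commit() for a loop iteration starting at nowMs
    int pagesForRound(long nowMs) {
        if (nowMs >= lastFlushTimestamp + thoroughIntervalMs) {
            lastFlushTimestamp = nowMs;
            return 0; // thorough flush: flush everything that is pending
        }
        return leastPages;
    }

    public static void main(String[] args) {
        ThoroughFlushTimer timer = new ThoroughFlushTimer(4, 10_000);
        for (long now : new long[] {10_000, 10_500, 15_000, 20_000, 20_500}) {
            System.out.println(now + " ms -> flushLeastPages = " + timer.pagesForRound(now));
        }
        // 10000 -> 0, 10500 -> 4, 15000 -> 4, 20000 -> 0, 20500 -> 4
    }
}
```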
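In short, the broker starts a thread at startup that keeps flushing in a loop.
When the broker handles a send request (in CommitLog's putMessage), it makes the following check: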
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    if (msg.isWaitStoreMsgOK()) { // whether to wait until the message has been stored before replying
        request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
        service.putRequest(request);
        boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        if (!flushOK) {
            log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                + " client address: " + msg.getBornHostString());
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
    } else {
        service.wakeup();
    }
}
// Asynchronous flush
else {
    this.flushCommitLogService.wakeup();
}
The wakeup method, together with waitForRunning, looks like this:
public void wakeup() {
    synchronized (this) {
        if (!this.hasNotified) {
            this.hasNotified = true;
            this.notify();
        }
    }
}

protected void waitForRunning(long interval) {
    synchronized (this) {
        if (this.hasNotified) {
            this.hasNotified = false;
            this.onWaitEnd();
            return;
        }
        try {
            this.wait(interval);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            this.hasNotified = false;
            this.onWaitEnd();
        }
    }
}
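wakeup sets the hasNotified flag to true and calls notify(). In the real-time branch of the run loop, this.waitForRunning(interval) waits up to interval milliseconds before flushing. If wakeup has been called, either before or during that wait, waitForRunning returns right away and the flush proceeds without waiting. In timed mode the loop calls Thread.sleep(interval) instead, so wakeup has no effect there.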
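The hasNotified flag ensures that a wakeup arriving before the flush thread starts waiting is not lost: waitForRunning checks the flag first and skips the wait. The sketch below keeps that handshake but makes two changes for illustration. waitForRunning returns whether it was notified, and onWaitEnd is dropped. WaitNotifySketch is a hypothetical class.

```java
// Hypothetical sketch of the wakeup / waitForRunning handshake.
final class WaitNotifySketch {
    private boolean hasNotified = false; // guarded by `this`

    void wakeup() {
        synchronized (this) {
            if (!hasNotified) {
                hasNotified = true;
                notify();
            }
        }
    }

    // Returns true if a wakeup arrived (before or during the wait), false if the wait timed out.
    // A spurious wakeup also returns false.
    boolean waitForRunning(long intervalMs) throws InterruptedException {
        synchronized (this) {
            if (hasNotified) {
                hasNotified = false;
                return true; // wakeup already happened: do not wait at all
            }
            try {
                wait(intervalMs);
                return hasNotified;
            } finally {
                hasNotified = false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotifySketch s = new WaitNotifySketch();

        s.wakeup(); // e.g. putMessage signals before the flush thread is waiting
        System.out.println("pre-notified: " + s.waitForRunning(5_000)); // returns at once

        System.out.println("no wakeup: " + s.waitForRunning(50)); // times out after about 50 ms

        Thread sender = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            s.wakeup();
        });
        sender.start();
        System.out.println("woken by another thread: " + s.waitForRunning(5_000));
        sender.join();
    }
}
```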
In addition, run has the following code after the loop:
// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
for (int i = 0; i < RetryTimesOver && !result; i++) {
    result = CommitLog.this.mapedFileQueue.commit(0);
    CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
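Leaving the loop means the service has been stopped (the broker is shutting down), so all remaining data must be flushed. commit is called with 0, so isAbleToFlush only checks whether the write position is ahead of the last flushed position. MapedFileQueue.commit returns true only when a call no longer advances committedWhere. The loop therefore keeps committing, at most RetryTimesOver times, until a call makes no further progress.
Summary: with asynchronous flushing, the broker runs a thread that waits for a while, checks whether the flush conditions are met, and flushes if they are, repeating this in a loop. Sending a message only triggers a wakeup, after which the waiting thread returns immediately and checks the flush conditions.
When the broker stops, all remaining data is flushed as well.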
Synchronous flushing
Asynchronous flushing uses FlushRealTimeService, whereas synchronous flushing uses GroupCommitService.
When a message is sent, the following check is made:
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    if (msg.isWaitStoreMsgOK()) { // whether to wait until the message has been stored before replying
        request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
        service.putRequest(request);
        boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        if (!flushOK) {
            log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                + " client address: " + msg.getBornHostString());
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
    } else {
        service.wakeup();
    }
}
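First a GroupCommitRequest is created. Its argument is the offset the flush must reach: the physical offset at which this message was written plus the number of bytes written, i.e. the end of this message.
Next, putRequest is called: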
public void putRequest(final GroupCommitRequest request) {
    synchronized (this) {
        this.requestsWrite.add(request);
        if (!this.hasNotified) {
            this.hasNotified = true;
            this.notify();
        }
    }
}
putRequest adds the request to the list and then wakes up the flush thread, just as in asynchronous flushing.
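putRequest appends to requestsWrite, while the flush thread later works through requestsRead. Keeping two lists lets sending threads keep adding requests under the lock while the flush thread processes the previous batch without holding it. The two lists are exchanged by swapRequests. The following is a minimal generic sketch of that double buffer. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical double buffer in the style of requestsWrite / requestsRead.
final class RequestDoubleBuffer<T> {
    private List<T> requestsWrite = new ArrayList<>(); // filled by sending threads, guarded by `this`
    private List<T> requestsRead = new ArrayList<>();  // touched only by the flush thread

    // Sending side
    synchronized void put(T request) {
        requestsWrite.add(request);
    }

    // Flush side: swap under the lock, then process the batch without holding it
    int drainTo(List<T> sink) {
        synchronized (this) {
            List<T> tmp = requestsWrite;
            requestsWrite = requestsRead;
            requestsRead = tmp;
        }
        sink.addAll(requestsRead);
        int n = requestsRead.size();
        requestsRead.clear(); // the emptied list becomes the next write buffer
        return n;
    }

    public static void main(String[] args) {
        RequestDoubleBuffer<String> buf = new RequestDoubleBuffer<>();
        buf.put("r1");
        buf.put("r2");
        List<String> batch = new ArrayList<>();
        System.out.println(buf.drainTo(batch) + " " + batch); // 2 [r1, r2]
        buf.put("r3");
        batch.clear();
        System.out.println(buf.drainTo(batch) + " " + batch); // 1 [r3]
    }
}
```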
request.waitForFlush uses a CountDownLatch to block the current thread until either the timeout expires or the flush completes (GroupCommitService calls countDown() after flushing).
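The request/latch handshake can be sketched with java.util.concurrent.CountDownLatch. FlushRequestSketch is hypothetical and only modelled on GroupCommitRequest. It stores nextOffset (wroteOffset + wroteBytes), lets the flush thread record the outcome and count down, and lets the sending thread wait with a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical request object modelled on GroupCommitRequest; names are assumptions.
final class FlushRequestSketch {
    private final long nextOffset; // wroteOffset + wroteBytes: where the flush must reach
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    FlushRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    // Flush thread: record the outcome, then release the waiting sender
    void wakeupCustomer(boolean flushOK) {
        this.flushOK = flushOK;
        latch.countDown();
    }

    // Sending thread: true only if the flush thread answered in time and reported success
    boolean waitForFlush(long timeoutMs) throws InterruptedException {
        boolean answered = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        return answered && flushOK;
    }

    public static void main(String[] args) throws InterruptedException {
        long wroteOffset = 1_000;
        long wroteBytes = 200;
        FlushRequestSketch req = new FlushRequestSketch(wroteOffset + wroteBytes);
        // Stand-in for GroupCommitService: pretend data up to nextOffset was just forced to disk
        Thread flusher = new Thread(() -> req.wakeupCustomer(true));
        flusher.start();
        System.out.println("nextOffset=" + req.getNextOffset() + ", flushOK=" + req.waitForFlush(5_000));
        flusher.join();

        FlushRequestSketch unanswered = new FlushRequestSketch(2_000);
        System.out.println("flushOK=" + unanswered.waitForFlush(50)); // nobody counts down: times out
    }
}
```

A false result here plays the role of the FLUSH_DISK_TIMEOUT status set in putMessage. The quoted code does not show whether the real waitForFlush combines the latch result and flushOK exactly this way.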
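As with asynchronous flushing, the work runs in a while loop. Here waitForRunning(0) waits with no timeout (Object.wait(0) blocks until notified), so the thread runs only when putRequest or wakeup notifies it. On a normal broker shutdown, any unflushed data is flushed before the thread exits: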
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStoped()) {
        try {
            this.waitForRunning(0);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }
    synchronized (this) {
        this.swapRequests();
    }
    this.doCommit();
    CommitLog.log.info(this.getServiceName() + " service end");
}
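The actual flush logic lives in the doCommit method of GroupCommitService, an inner class of CommitLog. doCommit processes requestsRead, while putRequest adds to requestsWrite. GroupCommitService overrides onWaitEnd(), which waitForRunning calls, to run swapRequests(). Each pass therefore handles the requests collected while the thread was waiting: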
private void doCommit() {
    if (!this.requestsRead.isEmpty()) {
        for (GroupCommitRequest req : this.requestsRead) {
            // There may be a message in the next file, so a maximum of
            // two times the flush
            boolean flushOK = false;
            for (int i = 0; (i < 2) && !flushOK; i++) {
                // Has the last flush already reached this request's end offset?
                flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());
                if (!flushOK) {
                    CommitLog.this.mapedFileQueue.commit(0); // flush
                }
            }
            // Count down the CountDownLatch so that request.waitForFlush in putMessage can continue
            req.wakeupCustomer(flushOK);
        }
        long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        this.requestsRead.clear();
    } else {
        // Because of individual messages is set to not sync flush, it
        // will come to this process
        CommitLog.this.mapedFileQueue.commit(0);
    }
}
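The logic is fairly simple. For each request, doCommit checks whether the last flushed position (committedWhere) has already reached the request's end offset (nextOffset). If not, it calls commit just as asynchronous flushing does. As the asynchronous section showed, an argument of 0 means only the positions are compared and the "minimum pages per flush" setting is ignored: whatever has been written gets flushed. At most two attempts are made because the message may sit in the next file, and a single commit only flushes the file that contains committedWhere.
After flushing, req.wakeupCustomer(flushOK) wakes up the sending thread that was blocked earlier.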
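Why two attempts? Suppose a single commit(0) only flushes the file that contains committedWhere, and the message was written into the next file. Then one commit is needed to finish the current file and a second to reach the message. TwoAttemptFlush below is a hypothetical toy model of that situation. It represents files only by their write positions, does no real I/O, and copies the loop shape of doCommit.

```java
// Hypothetical toy model of MapedFileQueue.commit(0) over fixed-size files (no real I/O).
final class TwoAttemptFlush {
    private final int fileSize;
    private final int[] wrotePos; // write position inside each file
    long committedWhere;          // global offset flushed so far

    TwoAttemptFlush(int fileSize, int[] wrotePos, long committedWhere) {
        this.fileSize = fileSize;
        this.wrotePos = wrotePos;
        this.committedWhere = committedWhere;
    }

    // Flushes only the file containing committedWhere; returns true if nothing moved
    boolean commit() {
        int idx = (int) (committedWhere / fileSize);
        if (idx >= wrotePos.length) {
            return true; // no such file
        }
        long where = (long) idx * fileSize + wrotePos[idx];
        boolean noProgress = where == committedWhere;
        committedWhere = where;
        return noProgress;
    }

    // Same loop shape as doCommit: check, and commit if not yet flushed, at most twice
    boolean flushFor(long nextOffset) {
        boolean flushOK = false;
        for (int i = 0; i < 2 && !flushOK; i++) {
            flushOK = committedWhere >= nextOffset;
            if (!flushOK) {
                commit();
            }
        }
        return flushOK;
    }

    public static void main(String[] args) {
        // File 0 (offsets 0..999) is full but flushed only to 500; the message sits at 1000..1299 in file 1
        TwoAttemptFlush q = new TwoAttemptFlush(1000, new int[] {1000, 300}, 500);
        System.out.println("flushOK=" + q.flushFor(1300) + ", committedWhere=" + q.committedWhere);
        // flushOK=false, committedWhere=1300

        // Message within the current file: one commit suffices, the second pass sees it flushed
        TwoAttemptFlush single = new TwoAttemptFlush(1000, new int[] {400}, 100);
        System.out.println("flushOK=" + single.flushFor(400) + ", committedWhere=" + single.committedWhere);
        // flushOK=true, committedWhere=400
    }
}
```

In this model, the first scenario ends with flushOK == false even though committedWhere has reached 1300. The check runs before each commit, so the outcome of the second commit is never re-checked. The doCommit loop quoted above has the same shape.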
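Summary:
When a message is sent, a GroupCommitRequest object holding the target flush offset is built, and the synchronous flush thread, GroupCommitService, is woken up. The sending thread then blocks on a CountDownLatch until the flush completes or the timeout expires. The actual flushing happens in GroupCommitService's doCommit method (GroupCommitService is an inner class of CommitLog).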