RocketMQ刷盤機制
概覽
RocketMQ的儲存讀寫是基於JDK NIO的記憶體對映機制的,訊息儲存時首先將訊息追加到記憶體中。在根據不同的刷盤策略在不同的時間進行刷盤
。如果是同步刷盤,訊息追加到記憶體後,將同步呼叫MappedByteBuffer的force()方法,同步等待刷盤結果,進行刷盤結果返回。如果是非同步刷盤,
在訊息追加到記憶體後立刻,不等待刷盤結果立刻返回儲存成功結果給訊息傳送端。RocketMQ使用一個單獨的執行緒按照一個設定的頻率執行刷盤操作。
通過在broker配置檔案中配置flushDiskType來設定刷盤方式,ASYNC_FLUSH(非同步刷盤)、SYNC_FLUSH(同步刷盤)。預設為非同步刷盤。
本次以Commitlog檔案刷盤機制為例來講解刷盤機制。Consumequeue、IndexFile刷盤原理和Commitlog一直。索引檔案的刷盤機制並不是採取定時刷盤機制,
而是每更新一次索引檔案就會將上一次的改動刷寫到磁碟。
刷盤服務是將commitlog、consumequeue兩者中的MappedFile檔案中的MappedByteBuffer或者FileChannel中的記憶體中的資料,刷寫到磁碟。
還有將IndexFile中的MappedByteBuffer(this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer())中記憶體的資料刷寫到磁碟。
刷盤服務的入口
刷盤服務的入口是CommitLog類物件,FlushCommitLogService是刷盤服務物件,如果是同步刷盤它被賦值為GroupCommitService,
如果是非同步刷盤它被賦值為FlushRealTimeService;還有一個FlushCommitLogService的commitLogService物件,這個是將 TransientStorePoll 中的直接記憶體ByteBuffer,
寫到FileChannel對映的磁碟檔案中的服務。
// 非同步、同步刷盤服務初始化
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷盤服務為 GroupCommitService
this.flushCommitLogService = new GroupCommitService();
} else {
// 非同步刷盤服務為 FlushRealTimeService
this.flushCommitLogService = new FlushRealTimeService();
}
// 定時將 transientStorePoll 中的直接記憶體 ByteBuffer,提交到記憶體對映 MappedByteBuffer 中
this.commitLogService = new CommitRealTimeService();
刷盤方法呼叫入口
putMessage()方法,將訊息寫入記憶體的方式不同,呼叫的刷盤方式也不同。如果是asyncPutMessage()非同步將訊息寫入記憶體,submitFlushRequest()方法是刷盤入口。
如果是putMessage()同步將訊息寫入記憶體,handleDiskFlush()方法是刷盤入口。handleDiskFlush()和submitFlushRequest()都包含有同步刷盤和非同步刷盤的方法。
// 非同步的方式存放訊息
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 非同步儲存訊息,提交刷盤請求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
// 根據刷盤結果副本結果,返回存放訊息的結果
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
// 同步方式存放訊息
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// handle 硬碟重新整理
handleDiskFlush(result, putMessageResult, msg);
// handle 高可用
handleHA(result, putMessageResult, msg);
// 返回儲存訊息的結果
return putMessageResult;
}
同步刷盤
一條訊息呼叫一次刷盤服務,等待刷盤結果返回,然後再將結果返回;才能處理下一條刷盤訊息。以handleDiskFlush()方法來介紹同步刷盤和非同步刷盤,
這裡是區分刷盤方式的分水嶺。
/**
* 一條訊息進行刷盤
* @param result 擴充套件到記憶體ByteBuffer的結果
* @param putMessageResult 放入ByteBuffer這個過程的結果
* @param messageExt 存放的訊息
*/
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush 同步
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
// 是否等待伺服器將這一條訊息儲存完畢再返回(等待刷盤完成),還是直接處理其他寫佇列requestsWrite裡面的請求
if (messageExt.isWaitStoreMsgOK()) {
//刷盤請求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
//放入寫請求佇列
service.putRequest(request);
// 同步等待獲取刷盤結果
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
// 5秒超市等待刷盤結果
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
// 刷盤失敗,更新存放訊息結果超時
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
// 喚醒處理刷盤請求寫磁碟執行緒,處理刷盤請求執行緒和提交刷盤請求之前的協調,通過CountDownLatch(1)操作,通過控制hasNotified狀態來實現寫佇列和讀佇列的交換
service.wakeup();
}
}
// 非同步
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
同步刷盤會創造一個刷盤請求,然後將請求放入處理寫刷盤請求的requestsWrite佇列,請求裡面封裝了CompletableFuture物件用來記錄刷盤結果,
利用CompletableFuturee的get方法同步等待獲取結果。flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),TimeUnit.MILLISECONDS);
flushStatus為刷盤結果,預設等待5秒超時。
GroupCommitService為一個執行緒,用來定時處理requestsWrite佇列裡面的寫刷盤請求,進行刷盤;它的requestsWrite和requestsRead佇列進行了讀寫分離,
寫GroupCommitRequest請求到requestsWrite佇列,讀GroupCommitRequest請求從requestsRead讀取,讀取請求今夕寫盤操作。這兩個佇列,形成了化零為整,
將一個個請求,劃分為一批,處理一批的GroupCommitRequest請求,然後requestsWrite和requestsRead佇列進行交換,requestsRead作為寫佇列,
requestsWrite作為讀佇列,實現讀寫分離。從中使用CountDownLatch2來實現處理刷盤請求執行緒和提交刷盤請求之前的協調,通過控制hasNotified狀態來實現寫佇列和讀佇列的交換。
// 同步刷盤服務
class GroupCommitService extends FlushCommitLogService {
// 兩個佇列,讀寫請求分離
// 刷盤服務寫入請求佇列
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
// 刷盤服務讀取請求佇列
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
// 將請求同步寫入requestsWrite
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
// 喚醒刷盤執行緒處理請求
this.wakeup();
}
// 寫佇列和讀佇列交換
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doCommit() {
// 上鎖讀請求佇列
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
// 每一個請求進行刷盤
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
// 一個落盤請求,處理兩次,第一次為false,進行刷盤,一次刷盤的資料是多個offset,並不是只有當前這個offset的值,這個offset的值進行了刷盤,
這個請求的第二次刷盤,這個offset已經已經落盤了,
// flushWhere這個值在flush方法已經更新變大,所以flushOK=true,跳出for迴圈,通知flushOKFuture已經完成。
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
// 是否已經刷過,false未刷,true已刷
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
// false 刷盤
if (!flushOK) {
//0程式碼立刻刷盤,不管快取中訊息有多少
CommitLog.this.mappedFileQueue.flush(0);
}
}
// flushOK:true,返回ok,已經刷過盤了,不用再刷盤;false:刷盤中,返回超時
// 喚醒等待刷盤結果的執行緒
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
// 更新checkpoint的刷盤commitlog的最後刷盤時間,但是隻寫寫到了checkpoint的記憶體ByteBuffer,並沒有刷盤
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
// 清空佇列
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
// 因為個別的訊息不是同步刷盤的,所以它回到這裡進行處理
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
// 執行緒是否停止
while (!this