RocketMQ是如何判斷flushOK,及4.2版本所出現的bug
RocketMQ
版本:rocketmq-4.2.0
bug所表現形式:在同步刷盤時,生產訊息,返回SendResult的SendStatus為FLUSH_DISK_TIMEOUT,而且是在傳送訊息總量大概mapedFileSizeCommitLog(預設配置1G)的時候出現,每次達到mapedFileSizeCommitLog大小左右的時候都會出現FLUSH_DISK_TIMEOUT。而其餘時間並沒有出現狀態,總是如此這顯然是有問題的。
RocketMQ是如何判斷flushOK?
刷盤:CommitLog.this.mappedFileQueue.flush(0)
原理:根據刷盤起始點【CommitLog.this.mappedFileQueue.getFlushedWhere()】和下次刷盤點【req.getNextOffset()】的比較來判斷是否成功刷入磁碟。由於一條訊息可能被兩個分片所儲存,故迴圈次數為2。
程式碼:
private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } } req.wakeupCustomer(flushOK); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } } |
先講正常流程:進入for迴圈,第一次:由於刷盤起始點【CommitLog.this.mappedFileQueue.getFlushedWhere()】小於下次刷盤點【req.getNextOffset()】,故flushOK為false,執行刷盤操作。第二次:由於刷盤成功,刷盤起始點【CommitLog.this.mappedFileQueue.getFlushedWhere()】等於下次刷盤點【req.getNextOffset()】,flushOK為true。結束迴圈。
bug出現了:若一條訊息儲存在兩個分片時,第一次flushOK為false,刷盤之後刷盤起始點【CommitLog.this.mappedFileQueue.getFlushedWhere()】還是小於下次刷盤點【req.getNextOffset()】,故開始第二次刷盤。而第二次刷盤成功,而這時迴圈卻結束了。可flushOK還是為false。
修改程式碼:在for迴圈外再加flushOK的判斷。