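RocketMQ (5): Message Persistence, a Source Code Walkthrough
I. Principles
1. Where are messages stored?
Messages are persisted on disk, in the commitlog folder under the store directory of the user's home. For a broker running as root that is: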
/root/store/commitlog
The relevant source code:

    // {@link org.apache.rocketmq.store.config.MessageStoreConfig}
    // Root directory for stored data
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
    // commitlog directory
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";
    // Each commitlog file is 1 GB; once it is full, a new commitlog file is created
    private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;

To verify:

    [root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# pwd
    /root/store/commitlog
    [root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# ll -h
    total 400K
    -rw-r--r-- 1 root root 1.0G Jun 30 18:21 00000000000000000000
    [root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]#

The file size is clearly 1.0G. Once this file is full, further writes automatically create a new commitlog file.
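The file name in the listing is not arbitrary: it is the physical offset at which the file starts, zero-padded to 20 digits. The sketch below shows that naming rule under the 1 GB file size from MessageStoreConfig. The class and method names are made up for illustration and are not RocketMQ APIs.

```java
final class CommitLogFileNames {
    // 1 GB, the value of mappedFileSizeCommitLog shown above
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    /** Name of the commitlog file that would contain the given physical offset. */
    static String fileNameFor(long physicalOffset) {
        long fileStartOffset = physicalOffset - (physicalOffset % FILE_SIZE);
        return String.format("%020d", fileStartOffset);
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(0L));             // 00000000000000000000
        System.out.println(fileNameFor(FILE_SIZE + 42)); // 00000000001073741824
    }
}
```

Under this rule the second file created after the first 1 GB fills up would be named 00000000001073741824.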
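2. Key classes
2.1 MappedFile
Corresponds to a single commitlog file, for example the 00000000000000000000 file above.
2.2 MappedFileQueue
Corresponds to the folder that holds the MappedFile instances; it wraps them as a queue of files.
2.3 CommitLog
A wrapper around MappedFileQueue that exposes the operations used by the rest of the store.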
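How the three classes relate can be illustrated with a toy model: fixed-size segments (standing in for MappedFile) kept in a list (standing in for MappedFileQueue), with an append method that rolls over to a new segment when the last one is full (roughly the role of CommitLog). This is only a sketch with hypothetical names; it uses heap buffers and ignores memory mapping and the other details of the real classes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class RollingLogSketch {
    /** Stand-in for MappedFile: one fixed-size segment starting at fileFromOffset. */
    static final class Segment {
        final long fileFromOffset;
        final ByteBuffer buffer;

        Segment(long fileFromOffset, int size) {
            this.fileFromOffset = fileFromOffset;
            this.buffer = ByteBuffer.allocate(size);
        }
    }

    private final int segmentSize;
    // Stand-in for MappedFileQueue: the ordered list of segments
    private final List<Segment> segments = new ArrayList<>();

    RollingLogSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Appends a record and returns the physical offset at which it was written. */
    long append(byte[] data) {
        if (data.length > segmentSize) {
            throw new IllegalArgumentException("record larger than a segment");
        }
        Segment last = segments.isEmpty() ? null : segments.get(segments.size() - 1);
        if (last == null || last.buffer.remaining() < data.length) {
            // Roll over: the new segment starts where the previous one ends
            long start = (last == null) ? 0 : last.fileFromOffset + segmentSize;
            last = new Segment(start, segmentSize);
            segments.add(last);
        }
        long offset = last.fileFromOffset + last.buffer.position();
        last.buffer.put(data);
        return offset;
    }

    int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        RollingLogSketch log = new RollingLogSketch(10);
        System.out.println(log.append(new byte[6])); // first segment
        System.out.println(log.append(new byte[6])); // does not fit, rolls over
        System.out.println(log.segmentCount());
    }
}
```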
II. The Broker receives a message
1. Call chain

    BrokerStartup.start()
      -> BrokerController.start()
      -> NettyRemotingServer.start()
      -> NettyRemotingServer.prepareSharableHandlers()
      -> new NettyServerHandler()
      -> NettyRemotingAbstract.processMessageReceived()
      -> NettyRemotingAbstract.processRequestCommand()
      -> SendMessageProcessor.processRequest()

2. processRequest
SendMessageProcessor.processRequest()

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = null;
        try {
            // Call asyncProcessRequest and block until its future completes
            response = asyncProcessRequest(ctx, request).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("process SendMessage error, request : " + request.toString(), e);
        }
        return response;
    }

3. asyncProcessRequest

    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            // A message sent back by a consumer: when consumption fails, the consumer
            // returns the message to the broker so that it can be retried
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                // Parse the header: everything the Producer sent is in the request,
                // and it is decoded into a SendMessageRequestHeader object
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                // Put the parsed parameters into a SendMessageContext object
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                if (requestHeader.isBatch()) {
                    // Batch messages
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    // Single message: the path this article follows
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }

4. asyncSendMessage

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        // (creation of response and responseHeader is elided in this excerpt)
        final byte[] body = request.getBody();
        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        // Assemble the message object
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

        CompletableFuture<PutMessageResult> putMessageResult = null;
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        // The call that actually hands the message to the store
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner,
            responseHeader, mqtraceContext, ctx, queueIdInt);
    }

At this point the message has been received and wrapped in a MessageExtBrokerInner object.
III. The Broker stores the message (persistence)
1. asyncPutMessage
Continuing from the asyncSendMessage step above:

    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
        putResultFuture.thenAccept((result) -> {
            ......
        });
        return putResultFuture;
    }

2. commitLog.asyncPutMessage

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // (declarations of result and putMessageResult, locking, and the return statement
        //  are elided in this excerpt)
        // Get the last file; MappedFile is the 0000000000... file in the commitlog directory
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        try {
            // Append the data to the commitlog
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                ......
            }
            // Persist the in-memory data to disk
            CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
        }
    }

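3. appendMessagesInner
mappedFile.appendMessage() delegates to appendMessagesInner():

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        // (computation of currentPos and byteBuffer, the buffer positioned at the
        //  current write position of this file, is elided in this excerpt)
        // Write the message into memory
        return cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
            (MessageExtBrokerInner) messageExt);
    }
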
4. doAppend

    @Override
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
                                        final int maxBlank, final MessageExtBrokerInner msgInner) {
        // (computation of msgLen, queueOffset, bodyLength, topicData, propertiesData, the
        //  host lengths, and the construction of result are elided in this excerpt)
        // Initialization of storage space
        this.resetByteBuffer(msgStoreItemMemory, msgLen);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(msgLen);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
        // 3 BODYCRC
        this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
        // 4 QUEUEID
        this.msgStoreItemMemory.putInt(msgInner.getQueueId());
        // 5 FLAG
        this.msgStoreItemMemory.putInt(msgInner.getFlag());
        // 6 QUEUEOFFSET
        this.msgStoreItemMemory.putLong(queueOffset);
        // 7 PHYSICALOFFSET
        this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
        // 8 SYSFLAG
        this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
        // 9 BORNTIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
        // 10 BORNHOST
        this.resetByteBuffer(bornHostHolder, bornHostLength);
        this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
        // 11 STORETIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
        // 12 STOREHOSTADDRESS
        this.resetByteBuffer(storeHostHolder, storeHostLength);
        this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
        // 13 RECONSUMETIMES
        this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
        // 14 Prepared Transaction Offset
        this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
        // 15 BODY
        this.msgStoreItemMemory.putInt(bodyLength);
        if (bodyLength > 0)
            this.msgStoreItemMemory.put(msgInner.getBody());
        // 16 TOPIC
        this.msgStoreItemMemory.put((byte) topicLength);
        this.msgStoreItemMemory.put(topicData);
        // 17 PROPERTIES
        this.msgStoreItemMemory.putShort((short) propertiesLength);
        if (propertiesLength > 0)
            this.msgStoreItemMemory.put(propertiesData);

        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write messages to the queue buffer
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
        return result;
    }

After this step the message is in a buffer, but not necessarily on disk yet. It is first assembled field by field in msgStoreItemMemory and then copied into byteBuffer, the buffer that the MappedFile passed in. Both are NIO ByteBuffers.

    private final ByteBuffer msgStoreItemMemory;

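The layout written by doAppend is a length-prefixed binary record: a total size first, then fixed-width fields, then variable-length fields each preceded by their own length. The sketch below reproduces that idea with only four of the seventeen fields. The class name, the reduced field set, and the magic value are assumptions for illustration; this is not the real commitlog format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class MessageLayoutSketch {
    // Hypothetical magic value, not CommitLog.MESSAGE_MAGIC_CODE
    static final int MAGIC = 0x1234ABCD;

    /** Encodes TOTALSIZE, MAGICCODE, QUEUEID, BODY and TOPIC in that order. */
    static ByteBuffer encode(int queueId, String topic, byte[] body) {
        byte[] topicData = topic.getBytes(StandardCharsets.UTF_8);
        int msgLen = 4 + 4 + 4          // TOTALSIZE, MAGICCODE, QUEUEID
            + 4 + body.length           // body length prefix + body
            + 1 + topicData.length;     // 1-byte topic length prefix + topic
        ByteBuffer buf = ByteBuffer.allocate(msgLen);
        buf.putInt(msgLen);
        buf.putInt(MAGIC);
        buf.putInt(queueId);
        buf.putInt(body.length);
        buf.put(body);
        buf.put((byte) topicData.length);
        buf.put(topicData);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    /** Walks the record using the length prefixes and returns the topic. */
    static String decodeTopic(ByteBuffer record) {
        ByteBuffer b = record.duplicate(); // leave the caller's position untouched
        b.getInt();                        // TOTALSIZE
        b.getInt();                        // MAGICCODE
        b.getInt();                        // QUEUEID
        int bodyLength = b.getInt();
        b.position(b.position() + bodyLength); // skip the body
        int topicLength = b.get() & 0xFF;
        byte[] topicData = new byte[topicLength];
        b.get(topicData);
        return new String(topicData, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer record = encode(3, "TopicTest", "hi".getBytes(StandardCharsets.UTF_8));
        System.out.println(record.remaining() + " bytes, topic=" + decodeTopic(record));
    }
}
```

Because the total size comes first, a reader can skip from one record to the next without parsing the fields in between.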
5. submitFlushRequest
Back to the submitFlushRequest call in "2. commitLog.asyncPutMessage". The previous steps wrote the data into a ByteBuffer; the next step, covered here, is flushing it to disk.

    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result,
                                                                  PutMessageResult putMessageResult,
                                                                  MessageExt messageExt) {
        // Synchronous flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                service.putRequest(request);
                return request.future();
            } else {
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // Asynchronous flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }

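The two branches above differ mainly in the future they return: the synchronous branch returns request.future(), which is still pending, while the asynchronous branch returns an already completed future. A minimal sketch of that difference follows. The class, the enums, and flushOnce() are hypothetical stand-ins, with flushOnce() playing the role of the flush thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

final class FlushModeSketch {
    enum Mode { SYNC_FLUSH, ASYNC_FLUSH }
    enum Status { PUT_OK }

    // Futures of synchronous requests that are still waiting for a flush
    private final ConcurrentLinkedQueue<CompletableFuture<Status>> pending = new ConcurrentLinkedQueue<>();

    CompletableFuture<Status> submitFlushRequest(Mode mode) {
        if (mode == Mode.SYNC_FLUSH) {
            // Pending until the flush thread has written the data to disk
            CompletableFuture<Status> future = new CompletableFuture<>();
            pending.add(future);
            return future;
        }
        // Asynchronous: report success right away, the flush happens later
        return CompletableFuture.completedFuture(Status.PUT_OK);
    }

    /** Stand-in for one round of the flush thread: "flush", then release the waiters. */
    void flushOnce() {
        CompletableFuture<Status> future;
        while ((future = pending.poll()) != null) {
            future.complete(Status.PUT_OK);
        }
    }

    public static void main(String[] args) {
        FlushModeSketch store = new FlushModeSketch();
        CompletableFuture<Status> async = store.submitFlushRequest(Mode.ASYNC_FLUSH);
        CompletableFuture<Status> sync = store.submitFlushRequest(Mode.SYNC_FLUSH);
        System.out.println("async done: " + async.isDone() + ", sync done: " + sync.isDone());
        store.flushOnce();
        System.out.println("sync done after flush: " + sync.isDone());
    }
}
```

Since processRequest blocks on get(), a producer using synchronous flush gets its response only after the flush round has completed the future.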
6. Asynchronous flush

    class FlushRealTimeService extends FlushCommitLogService {
        @Override
        public void run() {
            while (!this.isStopped()) {
                try {
                    // Flush once every 500 ms
                    if (flushCommitLogTimed) {
                        Thread.sleep(500);
                    } else {
                        this.waitForRunning(500);
                    }
                    // Call flush on mappedFileQueue
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                } catch (Throwable e) {
                }
            }
        }
    }

As shown, by default the flush thread runs a flush about every 500 milliseconds; a wakeup() call can end the wait in waitForRunning earlier.
7. mappedFileQueue.flush

    public boolean flush(final int flushLeastPages) {
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            // The actual flush
            int offset = mappedFile.flush(flushLeastPages);
        }
        // (bookkeeping of flushedWhere and the return value are elided in this excerpt)
    }

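The flushLeastPages argument passed down here is checked by isAbleToFlush in the next step, which the article does not show. One plausible reading, sketched below, is a dirty-page threshold: flush only when at least that many pages have been written since the last flush, and flush any unflushed data when the threshold is 0. The 4 KB page size, the parameter names, and the exact rule are assumptions; the real method may apply further conditions.

```java
final class FlushThresholdSketch {
    // Assumed page size of 4 KB
    static final int OS_PAGE_SIZE = 4 * 1024;

    /**
     * @param flushedPosition byte position already flushed to disk
     * @param writePosition   byte position up to which data has been written
     * @param flushLeastPages minimum number of dirty pages; 0 means "flush anything"
     */
    static boolean isAbleToFlush(int flushedPosition, int writePosition, int flushLeastPages) {
        if (flushLeastPages > 0) {
            int dirtyPages = (writePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE);
            return dirtyPages >= flushLeastPages;
        }
        return writePosition > flushedPosition;
    }

    public static void main(String[] args) {
        System.out.println(isAbleToFlush(0, 3 * OS_PAGE_SIZE, 4)); // false: only 3 dirty pages
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE, 4)); // true: threshold reached
        System.out.println(isAbleToFlush(0, 1, 0));                // true: any unflushed byte counts
    }
}
```

A threshold like this trades a little durability lag for fewer, larger disk writes.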
8. mappedFile.flush

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            try {
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    // Flush through the NIO FileChannel
                    this.fileChannel.force(false);
                } else {
                    // Flush through the NIO MappedByteBuffer
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
        }
        return this.getFlushedPosition();
    }

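Both branches of flush rely on standard JDK NIO calls: FileChannel.force(boolean) for data written through the channel, and MappedByteBuffer.force() for data written through a memory mapping. The standalone sketch below exercises both against a temporary file. The class and method names and the temporary file are illustrative only and have nothing to do with RocketMQ's own file handling.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ForceSketch {
    /** Writes data to a temp file, forces it to disk, and returns the file content. */
    static byte[] writeAndForce(byte[] data, boolean useChannel) {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                if (useChannel) {
                    ByteBuffer src = ByteBuffer.wrap(data);
                    while (src.hasRemaining()) {
                        channel.write(src);
                    }
                    // false: force file content only, not file metadata
                    channel.force(false);
                } else {
                    MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, data.length);
                    mapped.put(data);
                    // Force the changes made through the mapping to the storage device
                    mapped.force();
                }
            }
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = "hello commitlog".getBytes();
        System.out.println(new String(writeAndForce(data, true)));
        System.out.println(new String(writeAndForce(data, false)));
    }
}
```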
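That completes the whole flow.
IV. Summary
Interview question: how does the Broker persist a message after receiving it?
Answer: there are two modes, synchronous and asynchronous flushing. Asynchronous is the usual choice; synchronous is slower but more reliable. The storage mechanism is roughly as follows:
The core class MappedFile corresponds to one commitlog file. MappedFileQueue acts like the folder and manages all the files. On top of them sits the CommitLog object, which provides the operations. Concretely, once the Broker has the message it first writes the message body, topic, queue and the other fields into a ByteBuffer, and then persists that data to a commitlog file. Each commitlog file is 1 GB; when it is full, a new commitlog file is created. All of this is done with NIO.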
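V. Addendum: synchronous and asynchronous flushing
1. Key classes
Class | Description | Flush performance |
---|---|---|
CommitRealTimeService | Asynchronous flush with the byte buffer pool enabled (isTransientStorePoolEnable() is true) | Highest |
FlushRealTimeService | Asynchronous flush with the byte buffer pool disabled | High |
GroupCommitService | Synchronous flush; a successful write is reported only after the flush has finished | Lowest |
2. Diagram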
3. Synchronous flush
3.1 Source code

    // {@link org.apache.rocketmq.store.CommitLog#submitFlushRequest()}
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // The synchronous flush service is GroupCommitService
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // Prepare the request
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            // Put the request object into requestsWrite
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }

putRequest

    public synchronized void putRequest(final GroupCommitRequest request) {
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);
        }
        // This part is key: set hasNotified to true and count the latch down by one, so that
        // waitForRunning in run() swaps the request lists and returns without blocking
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

run

    public void run() {
        while (!this.isStopped()) {
            try {
                // The key call: whether the thread blocks here or proceeds at once
                // depends on hasNotified
                this.waitForRunning(10);
                // The actual flush logic
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    }

waitForRunning

    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
    // Essentially a CountDownLatch, one that can be reset
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);

    protected void waitForRunning(long interval) {
        // If hasNotified is true and is successfully changed back to false, call onWaitEnd()
        // and return. It is false by default, so without a notification this branch is skipped.
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        // entry to wait
        waitPoint.reset();

        try {
            // Wait. The latch count is 1, so a single waitPoint.countDown() releases this await.
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            // Reset the flag to false
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

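The wait/notify pattern above can be tried in isolation. The sketch below keeps the same two pieces of state, a notified flag and a latch, but uses the JDK CountDownLatch, which cannot be reset, so it swaps in a fresh latch instead of calling reset(). The class name and the boolean return value are additions for illustration; like the original, the sketch can miss a wakeup that arrives between the flag check and the latch replacement, in which case the wait simply runs to its timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class WaitNotifySketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    // Replaced on every wait, because java.util.concurrent.CountDownLatch has no reset()
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);

    /** Marks a notification as pending and releases a waiting thread, if any. */
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown();
        }
    }

    /** Returns true if a notification ended the wait, false if it timed out. */
    boolean waitForRunning(long intervalMillis) {
        if (hasNotified.compareAndSet(true, false)) {
            return true; // a wakeup is already pending: do not block
        }
        waitPoint = new CountDownLatch(1); // plays the role of waitPoint.reset()
        try {
            return waitPoint.await(intervalMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            hasNotified.set(false);
        }
    }

    public static void main(String[] args) {
        WaitNotifySketch service = new WaitNotifySketch();
        service.wakeup();
        System.out.println(service.waitForRunning(500)); // pending wakeup, returns at once
        System.out.println(service.waitForRunning(50));  // nothing pending, waits for the timeout
    }
}
```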
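3.2 Summary
The main flow of synchronous flushing is as follows. The core class is GroupCommitService and the core method is waitForRunning.
- First, putRequest adds the request to requestsWrite, sets hasNotified to true and notifies, that is, it calls waitPoint.countDown().
- Next, run() calls waitForRunning(), which checks whether hasNotified is true. If it is, the method swaps the request lists (in onWaitEnd()) and returns immediately instead of blocking in await.
- Finally, because the previous step returned without blocking, doCommit is called right away to perform the actual flush.
The caller sees a successful write only after that flush, because submitFlushRequest returned request.future() rather than an already completed future.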
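The steps above can be sketched as a small double-buffered request queue: producers add requests to a write list, the flush thread swaps the write and read lists, flushes up to the highest requested offset, and completes each request's future. All names below are hypothetical, and the "flush" is just a counter update standing in for the real disk write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class GroupCommitSketch {
    static final class Request {
        final long nextOffset; // the request is satisfied once data up to here is flushed
        final CompletableFuture<Boolean> future = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private List<Request> requestsWrite = new ArrayList<>();
    private List<Request> requestsRead = new ArrayList<>();
    private long flushedWhere = 0;

    /** Called by producer threads. */
    synchronized Request putRequest(long nextOffset) {
        Request request = new Request(nextOffset);
        requestsWrite.add(request);
        return request;
    }

    /** Swap the lists so producers can keep writing while the flush thread reads. */
    private synchronized void swapRequests() {
        List<Request> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    /** Called by the single flush thread: one flush can satisfy a whole group of requests. */
    void doCommit() {
        swapRequests();
        for (Request request : requestsRead) {
            if (flushedWhere < request.nextOffset) {
                flushedWhere = request.nextOffset; // stand-in for the real flush to disk
            }
            request.future.complete(flushedWhere >= request.nextOffset);
        }
        requestsRead.clear();
    }

    long flushedWhere() {
        return flushedWhere;
    }

    public static void main(String[] args) {
        GroupCommitSketch service = new GroupCommitSketch();
        Request first = service.putRequest(100);
        Request second = service.putRequest(250);
        System.out.println("before commit: " + first.future.isDone());
        service.doCommit();
        System.out.println("after commit: " + second.future.join() + ", flushed to " + service.flushedWhere());
    }
}
```

Swapping two lists keeps the time spent under the lock short: producers never wait for the disk, only for the brief swap.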
4. Asynchronous flush
4.1 Source code
The core class is FlushRealTimeService.

    // {@link org.apache.rocketmq.store.CommitLog#submitFlushRequest()}
    // Asynchronous flush
    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        flushCommitLogService.wakeup();
    } else {
        commitLogService.wakeup();
    }
    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);

run

    // {@link org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run()}
    class FlushRealTimeService extends FlushCommitLogService {
        @Override
        public void run() {
            while (!this.isStopped()) {
                try {
                    // Flush once every 500 ms
                    if (flushCommitLogTimed) {
                        Thread.sleep(500);
                    } else {
                        // The same method as in the synchronous path. If no wakeup() has arrived
                        // since the last round, hasNotified is still false, the first check in
                        // waitForRunning does not return, and the thread blocks in await for up
                        // to 500 ms. Without wakeups it therefore flushes every 500 ms.
                        this.waitForRunning(500);
                    }
                    // Call flush on mappedFileQueue
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                } catch (Throwable e) {
                }
            }
        }
    }

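4.2 Summary
Core class#method: FlushRealTimeService#run()
- Check whether flushCommitLogTimed is true (the default is false). If it is true, sleep for 500 ms and then call mappedFileQueue.flush().
- If it is false, enter waitForRunning(500). This is the same wait/notify mechanism as in synchronous flushing: the wakeup() call in the asynchronous branch of submitFlushRequest sets hasNotified and counts the latch down in the same way putRequest does (in the RocketMQ 4.x source, ServiceThread.wakeup() contains the same compareAndSet and countDown), so a pending wakeup makes waitForRunning return at once. If no wakeup is pending, the thread blocks in waitPoint.await(500, TimeUnit.MILLISECONDS), wakes up after 500 ms, and flushes. With no wakeups, asynchronous flushing therefore runs once every 500 ms by default.
- The decisive difference from synchronous flushing is on the caller's side. The asynchronous branch returns CompletableFuture.completedFuture(PutMessageStatus.PUT_OK) immediately, without waiting for the flush, while the synchronous branch returns request.future(), which completes only after doCommit has flushed the data.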