1. 程式人生 > 實用技巧 >RocketMQ(五)訊息持久化儲存原始碼解析

RocketMQ(五)訊息持久化儲存原始碼解析

一、原理

1、訊息存在哪了?

訊息持久化的地方其實是磁碟上,在如下目錄裡的commitlog資料夾裡。

/root/store/commitlog

原始碼如下:

// {@link org.apache.rocketmq.store.config.MessageStoreConfig}

// 資料儲存根目錄
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
// commitlog目錄
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog";
// 每個commitlog檔案大小為1GB,超過1GB則建立新的commitlog檔案 private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;

比如驗證下

[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# pwd
/root/store/commitlog
[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# ll -h
total 400K
-rw-r--r-- 1 root root 1.0G Jun 30 18:21 00000000000000000000
[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]#

可以清晰的看到檔案大小是1.0G,超過1.0G再寫入訊息的話會自動建立新的commitlog檔案。

2、關鍵類解釋

2.1、MappedFile

對應的是commitlog檔案,比如上面的00000000000000000000檔案。

2.2、MappedFileQueue

MappedFile所在的資料夾,對MappedFile進行封裝成檔案佇列。

2.3、CommitLog

針對MappedFileQueue的封裝使用。

二、Broker接收訊息

1、呼叫鏈

BrokerStartup.start() -》 BrokerController.start() -》 NettyRemotingServer.start() -》 NettyRemotingServer.prepareSharableHandlers() -》 new
NettyServerHandler() -》 NettyRemotingAbstract.processMessageReceived() -》 NettyRemotingAbstract.processRequestCommand() -》 SendMessageProcessor.processRequest()

2、processRequest

SendMessageProcessor.processRequest()

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = null;
    try {
        // 呼叫asyncProcessRequest
        response = asyncProcessRequest(ctx, request).get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("process SendMessage error, request : " + request.toString(), e);
    }
    return response;
}


3、asyncProcessRequest

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        // 表示消費者傳送的訊息,傳送者消費失敗會重新發回佇列進行訊息重試
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            // 解析header,也就是我們Producer傳送過來的訊息都在request裡,給他解析到SendMessageRequestHeader物件裡去。
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // 將解析好的引數放到SendMessageContext物件裡
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            if (requestHeader.isBatch()) {
                // 批處理訊息用
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // 非批處理,我們這裡介紹的核心。
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}


4、asyncSendMessage

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    // 拼湊message物件
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    
    CompletableFuture<PutMessageResult> putMessageResult = null;
    Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    // 真正接收訊息的方法
    putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

至此我們的訊息接收完成了,都封裝到了MessageExtBrokerInner物件裡。

三、Broker訊息儲存(持久化)

1、asyncPutMessage

接著上步驟的asyncSendMessage繼續看

@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
    putResultFuture.thenAccept((result) -> {
        ......
    });
    return putResultFuture;
}

2、commitLog.asyncPutMessage

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // 獲取最後一個檔案,MappedFile就是commitlog目錄下的那個0000000000檔案
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    try {
        // 追加資料到commitlog
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            ......
        }
        // 將記憶體的資料持久化到磁碟
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
    }
}

3、appendMessagesInner

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    // 將訊息寫到記憶體
    return cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
}

4、doAppend

@Override
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
                                    final MessageExtBrokerInner msgInner) {
    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(bornHostHolder, bornHostLength);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(storeHostHolder, storeHostLength);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
    return result;
}

這一步其實就已經把訊息儲存到緩衝區裡了,也就是msgStoreItemMemory,這裡採取的NIO。

private final ByteBuffer msgStoreItemMemory;

5、submitFlushRequest

再次回到【2、commitLog.asyncPutMessage】的submitFlushRequest方法,因為之前的方法是將資料已經寫到ByteBuffer緩衝區裡了,下一步也就是我們現在這一步就要刷盤了。

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
                                                              MessageExt messageExt) {
    // 同步刷盤
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                                                                this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // 非同步刷盤
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            
            flushCommitLogService.wakeup();
        } else  {
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

6、非同步刷盤

class FlushRealTimeService extends FlushCommitLogService {
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
    // 每隔500ms刷一次盤
                if (flushCommitLogTimed) {
                    Thread.sleep(500);
                } else {
                    this.waitForRunning(500);
                }
                // 呼叫mappedFileQueue的flush方法
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            } catch (Throwable e) {
            }
        }
    }
}

可看出預設是每隔500毫秒刷一次盤

7、mappedFileQueue.flush

public boolean flush(final int flushLeastPages) {
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        // 真正的刷盤操作
        int offset = mappedFile.flush(flushLeastPages);
    }
}

8、mappedFile.flush

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        try {
            if (writeBuffer != null || this.fileChannel.position() != 0) {
                // 刷盤   NIO
                this.fileChannel.force(false);
            } else {
    // 刷盤  NIO
                this.mappedByteBuffer.force();
            }
        } catch (Throwable e) {
            log.error("Error occurred when force data to disk.", e);
        }
    }
    return this.getFlushedPosition();
}

至此已經全部結束。

四、總結

面試被問:Broker收到訊息後怎麼持久化的?

回答者:有兩種方式:同步和非同步。一般選擇非同步,同步效率低,但是更可靠。訊息儲存大致原理是:

核心類MappedFile對應的是每個commitlog檔案,MappedFileQueue相當於資料夾,管理所有的檔案,還有一個管理者CommitLog物件,他負責提供一些操作。具體的是Broker端拿到訊息後先將訊息、topic、queue等內容存到ByteBuffer裡,然後去持久化到commitlog檔案中。commitlog檔案大小為1G,超出大小會新建立commitlog檔案來儲存,採取的nio方式。

五、補充:同步/非同步刷盤

1、關鍵類

類名描述刷盤效能
CommitRealTimeService 非同步刷盤 &&開啟位元組緩衝區 最高
FlushRealTimeService 非同步刷盤&&關閉記憶體位元組緩衝區 較高
GroupCommitService 同步刷盤,刷完盤才會返回訊息寫入成功 最低

2、圖解

3、同步刷盤

3.1、原始碼

// {@link org.apache.rocketmq.store.CommitLog#submitFlushRequest()}
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    // 同步刷盤service -> GroupCommitService
    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    if (messageExt.isWaitStoreMsgOK()) {
        // 資料準備
        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                                                 this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        // 將資料物件放到requestsWrite裡
        service.putRequest(request);
        return request.future();
    } else {
        service.wakeup();
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

putRequest

public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    // 這裡很關鍵!!!,給他設定成true。然後計數器-1。下面run方法的時候才會進行交換資料且return
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}

run

public void run() {
    while (!this.isStopped()) {
        try {
            // 是同步還是非同步的關鍵方法,也就是說組不阻塞全看這裡。
            this.waitForRunning(10);
            // 真正的刷盤邏輯
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
}

waitForRunning

protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
// 其實就是CountDownLatch
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);

protected void waitForRunning(long interval) {
    // 如果是true,且給他改成false成功的話,則onWaitEnd()且return,但是預設是false,也就是預設情況下這個if不會進。
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        // 等待,預設值是1,也就是waitPoint.countDown()一次後就會啟用這裡。
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        // 給狀態值設定成false
        hasNotified.set(false);
        this.onWaitEnd();
    }
}

3.2、總結

總結下同步刷盤的主要流程:

核心類是GroupCommitService,核心方法 是waitForRunning。

  • 先呼叫putRequest方法將hasNotified變為true,且進行notify,也就是waitPoint.countDown()
  • 其次是run方法裡的waitForRunning()waitForRunning()判斷hasNotified是不是true,是true則交換資料然後return掉,也就是不進行await阻塞,直接return。
  • 最後上一步return了,沒有阻塞,那麼順理成章的呼叫doCommit進行真正意義的刷盤。

4、非同步刷盤

4.1、原始碼

核心類是:FlushRealTimeService

// {@link org.apache.rocketmq.store.CommitLog#submitFlushRequest()}
// Asynchronous flush
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    flushCommitLogService.wakeup();
} else  {
    commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);

run

// {@link org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run()}

class FlushRealTimeService extends FlushCommitLogService {
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
    // 每隔500ms刷一次盤
                if (flushCommitLogTimed) {
                    Thread.sleep(500);
                } else {
                    // 根上面同步刷盤呼叫的是同一個方法,區別在於這裡沒有將hasNotified變為true,也就是還是預設的false,那麼waitForRunning方法內部的第一個判斷就不會走,就不會return掉,就會進行下面的await方法阻塞,預設阻塞時間是500毫秒。也就是預設500ms刷一次盤。
                    this.waitForRunning(500);
                }
                // 呼叫mappedFileQueue的flush方法
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            } catch (Throwable e) {
            }
        }
    }
}

4.2、總結

核心類#方法:FlushRealTimeService#run()

  • 判斷flushCommitLogTimed是不是true,預設false,是true則直接sleep(500ms)然後進行mappedFileQueue.flush()刷盤。
  • 若是false,則進入waitForRunning(500),這裡是和同步刷盤的區別關鍵所在,同步刷盤之前將hasNotified變為true了,所以直接一套小連招:return+doCommit了 ,非同步這裡直接呼叫的waitForRunning(500),在這之前沒任何對hasNotified的操作,所以不會return,而是會繼續走下面的waitPoint.await(500, TimeUnit.MILLISECONDS);進行阻塞500毫秒,500毫秒後自動喚醒然後進行flush刷盤。也就是非同步刷盤的話預設500ms刷盤一次。

@Override
publicRemotingCommandprocessRequest(ChannelHandlerContextctx,
RemotingCommandrequest)
throwsRemotingCommandException
{
RemotingCommandresponse=null;
try{
//呼叫asyncProcessRequest
response=asyncProcessRequest(ctx,request).get();
}catch(InterruptedException|ExecutionExceptione){
log.error("processSendMessageerror,request:"+request.toString(),e);
}
returnresponse;
}