1. 程式人生 > 程式設計 >RocketMQ 生產者和訊息儲存

RocketMQ 生產者和訊息儲存

RocketMQ 生產者和訊息儲存

1. 前言

本篇主要從原始碼分析訊息的傳送及儲存。rocketmq傳送訊息分為三種實現方式:可靠同步傳送、可靠非同步傳送、單向傳送。目前的MQ中介軟體從儲存模型來看,分為需要持久化和不需要持久化兩種。本篇文章會分析rocketmq的訊息儲存機制。

2. RocketMQ 訊息

先看看rocketmq 訊息封裝類org.apache.rocketmq.common.message.Message

RocketMQ

基本屬性:主題topic、訊息flag、訊息體、擴充套件屬性

隱藏屬性:

  • tag:訊息TAG,用於訊息過濾
  • keys:訊息索引鍵
  • waitStoreMsgOK:訊息傳送時是否等訊息儲存完成後再返回
  • delayTimeLevel:訊息延遲級別,用於定時訊息或訊息重試

擴充套件屬性都存在Message的properties中。

3. 生產者啟動流程

我們從DefaultMQProducerImpl 的start 方法追蹤。

第一步:檢查productGroup 是否符合要求,並改變生產者的instanceName為程式ID

//DefaultMQProducerImpl::start
public void start() throws MQClientException {
    this.start(true);//預設為true
}

public void start(final boolean
startFactory)
throws MQClientException
{ switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this
.defaultMQProducer.changeInstanceNameToPID(); } //第一步 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,rpcHook); boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(),this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before,specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(),new TopicPublishInfo()); if (startFactory) { mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}",this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK,maybe started once," + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null); default: break; } this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); } 複製程式碼

第二步:建立MQClientInstance例項。

第三步:向MQClientInstance註冊,將當前生產者加入MQClientInstance管理中,方便後續呼叫網路請求、進行心跳檢測等。

第四步:啟動MQClientInstance,如果MQClientInstance已經啟動,如果已經啟動則本次不啟動。

4. 訊息傳送基本流程

訊息傳送流程主要是:驗證訊息、查詢路由、訊息傳送(包含異常處理機制)。

訊息驗證,主要是進行訊息的長度驗證,我們主要講解一下查詢路由和訊息傳送。

4.1 查詢路由

訊息傳送之前,首先需要獲取主題的路由資訊

//DefaultMQProducerImpl::tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic,new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,true,this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
複製程式碼

如果生產者快取了該 topic 路由資訊,包含了訊息佇列,則直接返回該路由資訊,如果沒有快取或沒有包含訊息佇列,則向NameServer查詢該topic的路由資訊。如果是第一次傳送訊息,未找到會嘗試用預設topic去查詢。沒找到則報錯。

4.2 選擇訊息

根據路由資訊選擇訊息佇列,返回的訊息佇列按照broker、序號排序。首先訊息傳送採取重試機制,迴圈執行,選擇訊息佇列、傳送訊息,傳送成功則返回,傳送失敗則重試。訊息選擇有兩種方式。

  • sendLatencyFaultEnable=false,預設機制
  • sendLatencyFaultEnable=true,啟用Broker故障延遲機制
//MQFaultStrategy::selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue",e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
複製程式碼

4.3 訊息傳送

訊息傳送API核心入口:DefaultMQProducerImpl::sendKernelImpl

private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException,RemotingException,MQBrokerException,InterruptedException {
//省略
}
複製程式碼

引數詳解:

  • Message msg:待傳送訊息
  • MessageQueue mq:訊息將傳送到訊息佇列上
  • CommunicationMode communicationMode:訊息傳送模式,SYNC、ASYNC、ONEWAY
  • SendCallback sendCallback:非同步訊息回撥函式
  • TopicPublishInfo topicPublishInfo:主題路由訊息
  • long timeout:訊息傳送超時時間

傳送步驟:

  1. 根據MessageQueue獲取Broker的網路地址
  2. 為訊息分配全域性唯一ID
  3. 如果註冊了訊息傳送鉤子函式,則執行訊息傳送之前的增強邏輯
  4. 構建訊息傳送請求包
  5. 根據訊息傳送方式,同步、非同步、單向方式進行網路傳輸
  6. 如果註冊了訊息傳送鉤子函式,執行after邏輯

4.3.1 同步傳送

MQ客戶端傳送訊息的入口是MQClientAPIImpl::sendMessage

同步傳送步驟

  1. 檢查訊息傳送是否合理
//AbstractSendMessageProcessor::msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader,final RemotingCommand response) {
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending message is forbidden");
        return response;
    }
    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
        log.warn(errorMsg);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorMsg);
        return response;
    }

    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false,true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true,false);
            }
        }

        log.warn("the topic {} not exist,producer: {}",requestHeader.getTopic(),ctx.channel().remoteAddress());
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
            requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(),topicSysFlag);

        if (null == topicConfig) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicConfig =
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        requestHeader.getTopic(),1,PermName.PERM_WRITE | PermName.PERM_READ,topicSysFlag);
            }
        }

        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist,apply first please!"
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }

    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(),topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) {
        String errorInfo = String.format("request queueId[%d] is illegal,%s Producer: %s",queueIdInt,topicConfig.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);

        return response;
    }
    return response;
}
複製程式碼
  1. 如果訊息重試次數超過允許的最大重試次數,訊息將進入DLD延遲佇列
  2. 呼叫DefaultMessageStore::putMessage進行訊息儲存

4.3.2 非同步傳送

非同步傳送,無須阻塞等待訊息伺服器返回訊息傳送結果,只需要提供一個回撥函式供訊息傳送客戶端在收到響應結果回撥。非同步方式相比同步傳送,傳送端的傳送效能提高了不少。

4.3.3 單向傳送

單向傳送,無須等待結果,也無須提供回撥函式,訊息傳送端壓根不關心訊息是否傳送成功,原理和非同步傳送相同,只是訊息傳送端收到結果後什麼也不做。

4.3.4 批量傳送

批量訊息傳送是將同一主題的多條資訊一起打包傳送給訊息服務端,減少網路呼叫次數,提高網路傳輸速率。

單條訊息傳送時,訊息體的內容將儲存在body中,批量訊息傳送,需要將多條訊息的內容儲存在body中,RocketMQ 對多條訊息內容進行固定格式進行儲存。

訊息格式

批量傳送:

//DefaultMQProducer::send
public SendResult send(
    Collection<Message> msgs) throws MQClientException,InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}
複製程式碼

**傳送流程:**首先在訊息傳送端,呼叫batch方法,將一批訊息封裝成MessageBatch物件,MessageBatch內部持有Listmessages,這樣批量傳送就和單條傳送流程完全一樣了。

MessageUML

循跡一下:

//DefaultMQProducer::send
public SendResult send(
    Collection<Message> msgs) throws MQClientException,InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}


//DefaultMQProducer::batch
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
    MessageBatch msgBatch;
    try {
        msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            Validators.checkMessage(message,this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch",e);
    }
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}


//DefaultMQProducerImpl::send
public SendResult send(
    Message msg) throws MQClientException,InterruptedException {
    return send(msg,this.defaultMQProducer.getSendMsgTimeout());
}
複製程式碼

5. 訊息儲存

業務系統大多需要MQ有持久儲存的能力,能大大增加系統的高可用性。

我們先看看rocketmq 資料流向:

資料流向

  • CommitLog:訊息儲存檔案,所有訊息主題的訊息都儲存在CommitLog檔案中
  • ConsumeQueue:訊息消費佇列,訊息到達CommitLog檔案後,將非同步轉發到訊息消費佇列,供訊息消費者消費
  • IndexFile:訊息索引檔案,主要儲存訊息Key與Offset的對應關係
  • 事務狀態服務:儲存每條訊息的事務狀態
  • 定時訊息服務:每一個延遲級別對應一個訊息消費佇列,儲存延遲佇列的訊息拉去進度

RocketMQ的儲存架構:

RocketMQ的儲存架構

訊息儲存實現類: org.apache.rocketmq.store.DefaultMessageStore

DefaultMessageStore

介紹核心屬性:

  • MessageStoreConfig messageStoreConfig:訊息儲存配置屬性
  • CommitLog commitLog:CommitLog 檔案儲存的實現類
  • ConcurrentMap<String/* topic /,ConcurrentMap<Integer/ queueId */,ConsumeQueue>> consumeQueueTable :訊息佇列儲存快取表,按訊息主題分組
  • FlushConsumeQueueService flushConsumeQueueService:訊息佇列檔案ConsumeQueue刷盤執行緒
  • CleanCommitLogService cleanCommitLogService:清除CommitLog問價服務
  • CleanConsumeQueueService cleanConsumeQueueService:清除ConsumeQueue檔案服務
  • IndexService indexService:索引檔案實現類
  • AllocateMappedFileService allocateMappedFileService:MappedFile分配服務
  • ReputMessageService reputMessageService:CommitLog訊息分發,根據CommitLog檔案構建ConsumeQueue、IndexFile檔案
  • HAService haService:儲存HA機制
  • TransientStorePool transientStorePool:訊息堆記憶體快取
  • MessageArrivingListener messageArrivingListener:訊息拉取長輪詢模式訊息達到監聽器
  • BrokerConfig brokerConfig:Broker配置屬性
  • StoreCheckpoint storeCheckpoint:檔案刷盤檢測點
  • LinkedList dispatcherList:CommitLog檔案轉發請求

5.1 訊息傳送儲存流程

訊息儲存入口:org.apache.rocketmq.store.DefaultMessageStore::putMessage

  1. 如果當前Broker停止工作或Broker為SLAVE 角色或當前Rocket不支援寫入則拒絕訊息寫入,如果訊息長度超過256字元、訊息屬性長度超過65536個字元將拒絕該訊息寫入
  2. 驗證訊息延遲級別
  3. 獲取當前可以寫入的CommitLog檔案
  4. 寫入CommitLog之前,先申請putMessageLock,也就是將訊息儲存到CommitLog檔案中是序列
  5. 設計訊息的儲存時間
  6. 將訊息追加到MappedFile中
  7. 建立全域性唯一訊息ID
  8. 獲取該訊息在訊息佇列的偏移量
  9. 根據訊息體的長度、主題的長度、屬性的長度結合訊息儲存格式計算訊息的總長度
  10. 如果訊息長度 +END_FILE_MIN_BLANK_LENGTH 大於CommitLog檔案
  11. 將訊息記憶體儲存到ByteBuffer中,然後建立AppendMessageResult。
  12. 更新訊息佇列邏輯偏移量
  13. 處理完訊息追加邏輯後將釋放putMessageLock鎖
  14. DefaultAppendMessageCallback::doAppend 只是將訊息追加到記憶體中,需要根據同步刷盤還是非同步刷盤方式,將記憶體中的資料持久化到磁碟

簡化成如下時序圖

5.2 記憶體對映流程

RocketMQ通過使用記憶體對映檔案來提高IO訪問效能,無論是CommitLog、ConsumeQueue還是IndexFile,單個檔案都被設計為固定長度,如果一個檔案寫滿後再建立一個新檔案,檔名就為第一條訊息對應的全域性物力偏移量。

記憶體對映流程

步驟:

  1. 記憶體對映檔案MappedFile通過AllocateMappedFileService建立
  2. MappedFile的建立是典型的生產者-消費者模型
  3. MappedFileQueue呼叫getLastMappedFile獲取MappedFile時,將請求放入佇列中
  4. AllocateMappedFileService執行緒持續監聽佇列,佇列有請求時,創建出MappedFile物件
  5. 最後將MappedFile物件預熱,底層呼叫force方法和mlock方法

5.3 刷盤流程

訊息在呼叫MapedFile的appendMessage後,也只是將訊息裝載到了ByteBuffer中,也就是記憶體中,還沒有落盤。落盤需要將記憶體flush到磁碟上,針對commitLog,rocketMQ提供了兩種落盤方式。

刷盤流程

  • producer傳送給broker的訊息儲存在MappedFile中,然後通過刷盤機制同步到磁碟中
  • 刷盤分為同步刷盤和非同步刷盤
  • 非同步刷盤後臺執行緒按一定時間間隔執行
  • 同步刷盤也是生產者-消費者模型。broker儲存訊息到MappedFile後,建立GroupCommitRequest請求放入列表,並阻塞等待。後臺執行緒從列表中獲取請求並重新整理磁碟,成功刷盤後通知等待執行緒。

同步刷盤(CommitLog.java)

//封裝的一次刷盤請求
public static class GroupCommitRequest {
    //這次請求要刷到的offSet位置,比如已經刷到2,
    private final long nextOffset;
    //控制flush的拴
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile boolean flushOK = false;


    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }


    public long getNextOffset() {
        return nextOffset;
    }

    //刷完了喚醒
    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }


    public boolean waitForFlush(long timeout) {
        try {
            this.countDownLatch.await(timeout,TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }
}

/**
    * GroupCommit Service
    * 批量刷盤服務
    */
class GroupCommitService extends FlushCommitLogService {
    //用來接收訊息的佇列,提供寫訊息
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    //用來讀訊息的佇列,將訊息從記憶體讀到硬碟
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

    //新增一個刷盤的request
    public void putRequest(final GroupCommitRequest request) {
        synchronized (this) {
            //新增到寫訊息的list中
            this.requestsWrite.add(request);
            //喚醒其他執行緒
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }
    }

    //交換讀寫佇列,避免上鎖
    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }


    private void doCommit() {
        //讀佇列不為空
        if (!this.requestsRead.isEmpty()) {
            //遍歷
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file,so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; (i < 2) && !flushOK; i++) {
                    //
                    flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());
                    //如果沒刷完 即flushOK為false則繼續刷
                    if (!flushOK) {
                        CommitLog.this.mapedFileQueue.commit(0);
                    }
                }
                //刷完了喚醒
                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            //清空讀list
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush,it
            // will come to this process
            CommitLog.this.mapedFileQueue.commit(0);
        }
    }


    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            try {
                this.waitForRunning(0);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ",e);
            }
        }

        // Under normal circumstances shutdown,wait for the arrival of the
        // request,and then flush
        //正常關閉時要把沒刷完的刷完
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception,",e);
        }

        synchronized (this) {
            this.swapRequests();
        }

        this.doCommit();

        CommitLog.log.info(this.getServiceName() + " service end");
    }
    }
複製程式碼

非同步刷盤(CommitLog.java)

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    //不停輪詢
    while (!this.isStoped()) {
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        //拿到要刷盤的頁數
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

        int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        //控制刷盤間隔,如果當前的時間還沒到刷盤的間隔時間則不刷
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = ((printTimes++ % 10) == 0);
        }

        try {
            //是否需要刷盤休眠
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }
            //commit開始刷盤
            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ",e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown,to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RetryTimesOver && !result; i++) {
        result = CommitLog.this.mapedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown,retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}
複製程式碼

6. 小結&參考資料

小結

訊息傳送流程圖:

訊息傳送流程圖

訊息儲存流程圖:

RocketMQ/16b3c679bf8c166c%201.jpg

參考資料