1. 程式人生 > 實用技巧 >RocketMQ原始碼分析 consumer消費,併發、順序、延時、事務訊息總結

RocketMQ原始碼分析 consumer消費,併發、順序、延時、事務訊息總結

目錄

1.消費客戶端啟動流程

先貼下consume client啟動的流程圖

消費端啟動和producer啟動很類似,可以和producer啟動進行對比。

不同之處是消費端的PullMessageService、RebalanceService才有真正作用,而producer該兩個服務執行緒是無用的,這兩個服務執行緒也是消費端的核心。

2.消費佇列負載均衡RebalanceService

先貼總體流程圖

消費端訊息佇列負載的核心功能方法是org.apache.rocketmq.client.impl.consumer.RebalanceImpl.updateProcessQueueTableInRebalance(String, Set, boolean),只解釋該方法,其餘方法看流程圖看程式碼就很容易明白。

傳入引數Set是經過負載後分配給當前消費端的mq集合,boolean表示是順序消費true,併發消費false。

看程式碼註釋參考

 /*
 * 消費端重新負載的核心方法
 * 傳入引數:mqSet即分配給該消費者的佇列, isOrder為false表示非順序訊息
 * 功能就是更新處理器佇列集合RebalanceImpl.processQueueTable
 */
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;
 
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();
 
        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {//比如減少or新增了消費端,分配給當前消費端的MessageQueue變化了了,那麼可能原來的MessageQueue就不在當前重新負載後的mqSet
                pq.setDropped(true);//丟棄該ProcessQueue,那麼在拉取消費的時候就不會該ProcessQueue進行處理
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {//把該mq的客戶端消費offset更新到broker儲存,移除客戶端該mq的消費offset記錄,如果是順序消費則到broker解鎖mq
                    it.remove();//移除
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {//拉取失效,
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY://push走這裡
                        pq.setDropped(true);//把ProcessQueue置為失效,這樣在PullService執行緒拉取的時候該物件是失效狀態,就不再拉取該物件
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }//end while
 
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {//說明該mq是本次負載新增的
            if (isOrder && !this.lock(mq)) {//順序消費到broker加鎖該MessageQueue
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }
 
            this.removeDirtyOffset(mq);//消費客戶端移除該mq的消費offset
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);//向broker傳送命令QUERY_CONSUMER_OFFSET獲取broker端記錄的該mq的消費offset
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {//不存在
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);//該messagequeue在broker端的消費位置
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }
 
    this.dispatchPullRequest(pullRequestList);//遍歷pullRequestList集合,吧pullRequest物件新增到PullMessageService服務執行緒的阻塞佇列內供PullMessageService拉取執行
 
    return changed;
}

3.消費拉取PullMessageService流程

圖片太大,如果顯示不了,備份地址為https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220141543.jpg

訊息拉取是非同步方式,總共涉及到三個回撥

第一個回撥:netty io通過網路把資料傳送出去,即傳送成功,執行netty io的監聽器NettyRemotingAbstract$4,傳送成功設定ResponseFuture.sendRequestOK=true,傳送失敗,則把ResponseFuture從NettyRemotingAbstract.responseTable集合移除。

第二個回撥:InvokeCallback的回撥,即MQClientAPIImpl$2,該操作是在pull到訊息or超時由掃描發起,入口是ResponseFuture.executeInvokeCallback(),繼而執行MQClientAPIImpl$2.operationComplete(ResponseFuture),目的就是為了執行PullCallback

第三個回撥:PullCallback回撥,執行拉取訊息成功後的回撥,DefaultMQPushConsumerImpl$1.onSuccess(PullResult pullResult),或者執行異常回調DefaultMQPushConsumerImpl$1.onException(Throwable e)

程式碼和註釋如下

PullCallback pullCallback = new PullCallback() {//DefaultMQPushConsumerImpl$1
    	/*
    	 * 功能就是把拉取到的訊息儲存到processqueue上,然後進行客戶端實際業務消費,最後把pullRequest重新新增到阻塞佇列供pullmessageservice服務執行緒重新拉取
    	 */
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);//解碼拉取到的訊息,填充pullResult物件,把解碼的訊息儲存到PullResult.msgFoundList
 
                switch (pullResult.getPullStatus()) {
                    case FOUND://訊息拉取結果,訊息拉取到了
                        long prevRequestOffset = pullRequest.getNextOffset();//拉取到的訊息的位置,相對於consumer queue
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());//下次待拉取的訊息在consumer queue的位置
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);//統計拉取訊息的responsetime
 
                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {//msgFoundList為空說明沒有拉取到訊息
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);//沒拉取到訊息的情況下,把pullRequest重新放入到pullservice的佇列再次拉取
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();//拉取到的訊息中的第一個訊息在commitlog的位置
 
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());//統計tps
 
														//把拉取到的32條訊息儲存到ProcessQueue.msgTreeMap
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);//客戶端消費併發執行 ConsumeRequest.run()
 
                            //把PullRequest重新儲存到PullMessageService.pullRequestQueue阻塞佇列,供消費執行緒繼續執行訊息拉取
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);//把PullRequest重新儲存到PullMessageService.pullRequestQueue阻塞佇列,供消費執行緒繼續執行訊息拉取
                            }
                        }
 
                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }
 
                        break;
                    case NO_NEW_MSG://未拉取到訊息
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());//拉取下一個新的offset
 
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);//本次拉取到的訊息總size==0,則更新消費端本地的offset
 
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);//把pullRequest重新儲存到pullmessageservice的阻塞佇列供拉取執行緒重新執行
                        break;
                    case NO_MATCHED_MSG://訊息拉取到了但是不匹配tag,broker進行tag過濾
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());//拉取下一個新的offset
 
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
 
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);//把pullRequest重新儲存到pullmessageservice的阻塞佇列供拉取執行緒重新執行
                        break;
                    case OFFSET_ILLEGAL://offset非法,那麼該pullRequest不會被重新進行拉取
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());//拉取下一個新的offset
 
                        pullRequest.getProcessQueue().setDropped(true);//拋棄processqueue
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
 
                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);//更新消費端本地的offset到RemoteBrokerOffsetStore.offsetTable
 
                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());//把當前mq的消費offset更新儲存到broker
 
                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());//從處理佇列集合移除該processqueue
 
                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }
 
        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
 
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    };//PullCallback end

PullCallback執行,即DefaultMQPushConsumerImpl$1執行把拉取的訊息儲存到MessageQueue對應的處理佇列ProcessQueue,然後由消費客戶端進行消費,分併發消費和順序消費

3.1.併發消費

併發消費入口是在pullcallback內,org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.submitConsumeRequest(List, ProcessQueue, MessageQueue, boolean),功能就是把拉取到的每個訊息包裝為task ConsumeRequest,然後丟入到消費端執行緒池進行併發消費

@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,//拉取到的訊息集合
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();//程式碼@1 批量消費的數量,預設1
    if (msgs.size() <= consumeBatchSize) {//訊息只拉取到一個
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {//訊息預設拉取到32條
        for (int total = 0; total < msgs.size(); ) {//程式碼@2
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {//程式碼@3
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
 
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);//程式碼@4
            try {
                this.consumeExecutor.submit(consumeRequest);//程式碼@5 每個consumeRequest丟入到執行緒池處理,那麼就併發消費這拉取到的32個訊息了
            } catch (RejectedExecutionException e) {//如果消費端的速度跟不上,導致消費執行緒池reject,則進行批量消費
                for (; total < msgs.size(); total++) {//程式碼@6
                    msgThis.add(msgs.get(total));
                }
 
                this.submitConsumeRequestLater(consumeRequest);//程式碼@6
            }
        }
    }
}

解釋說明:

程式碼@1:獲取消費客戶端的預設單次消費訊息的個數,預設是1,可以設定DefaultMQPushConsumer.setConsumeMessageBatchMaxSize(int)設定為批量消費。

程式碼@2:遍歷訊息的數量

程式碼@3:按照消費的批次個數設定一個消費執行緒要消費的訊息集合,預設是1個訊息

程式碼@4:待消費的訊息集合(預設一條訊息)、處理佇列、訊息佇列包裝為task物件ConsumeMessageConcurrentlyService.ConsumeRequest

程式碼@5:把task物件丟入到消費執行緒池處理,這是多個執行緒併發執行,因此叫併發消費。消費端的執行緒池是ConsumeMessageConcurrentlyService.consumeExecutor,預設是20~64個消費執行緒,如果業務程式碼消費訊息速度慢,可以在消費客戶端進行設定較大的消費執行緒池。

程式碼@6:消費速度跟不上拉取速度導致消費執行緒池報reject,則單個訊息消費變為批量消費,遇到這樣問題,就需要調整消費客戶端的消費執行緒池了,或者檢視客戶端消費速度慢的原因。因此在客戶端消費的業務程式碼內,不能只是msgs.get(0)處理,要進行遍歷處理。

下面看併發消費的具體邏輯org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()

@Override
public void run() {
    if (this.processQueue.isDropped()) {//pq被拋棄,則不執行實際業務邏輯消費,被拋棄的原因比如消費端發生變化,rebalance執行緒重新負載了
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }
 
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;//業務消費中的消費監聽器 ack機制
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    
    //忽略鉤子方法
    
    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);//如果msg的topic是%RETRY%,則說明是消費失敗的重發訊息,更新msg的topic為原始topic
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));//更新msg的CONSUME_START_TIME屬性為當前時間戳
                /*
                 * 為什麼要設定當前時間戳呢?是因為防止訊息超過60s還沒被消費,在消費客戶端啟動的時候啟動一個計劃執行緒,每15s執行一次ConsumeMessageConcurrentlyService.cleanExpireMsg(),
                 * 功能就是遍歷ProcessQueue中儲存的訊息集合,如果第一條訊息的CONSUME_START_TIME距離當前時間戳超過了60s,則從pq上移除,並回發到broker。
                 */
            }
        }
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);//業務程式碼執行訊息消費
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }
    
    //忽略不重要程式碼
    
    if (!processQueue.isDropped()) {//processQueue未拋棄
    	/*
    	 * 處理消費結果
    	 */
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

run()方法程式碼分三步

step1:訊息主題是%RETRY%,則恢復訊息topic

step2:執行業務程式碼消費

step3:pq未被廢棄,處理消費結果,這個是核心方法,下面看這個方法

public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();//預設是Integer.MAX_VALUE
 
        if (consumeRequest.getMsgs().isEmpty())//待消費的訊息是空,則不處理
            return;
 
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;//消費成功,ackIndex賦值為消費的訊息條數-1,即通常是消費單個訊息,那麼就是0
                }
                int ok = ackIndex + 1;//消費單個訊息情況是1,批量消費是本次run()執行消費的訊息條數
                int failed = consumeRequest.getMsgs().size() - ok;//失敗是0
                // 統計成功/失敗數量
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }
 
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                //在批量消費中如果設定了ConsumeConcurrentlyContext.ackIndex,那麼就會從失敗處開始重複消費,而非從該批量的開始重複消費
                /*
                 * 對於單條消費且消費成功,ackIndex=0,那麼i=1開始,則不進入for迴圈
                 * 對於批量消費且消費成功,ackIndex=消費條數,那麼i從消費的訊息條數開始,因此也不進入for迴圈
                 * 因此對於消費成功,無論單條消費or批量消費,都不進入for迴圈
                 * 
                 * 對於單條消費且消費失敗,ackIndex=-1,那麼i=0開始,則進入for迴圈,迴圈一次
                 * 對於批量消費且消費失敗,ackIndex=-1,那麼i=0開始,則進入for迴圈,迴圈次數為訊息的總條數。
                 * 
                 * 問題:那麼對於批量消費,比如32條,那麼消費到第32條的時候消費失敗了,那麼這次消費的訊息要全部回發到broker,
                 * 	然後消費端又重新消費了前面31條,這樣是不好的,可否有從消費失敗處回發呢?可以的,在業務程式碼內設定
                 * 	ConsumeConcurrentlyContext.setAckIndex(int)即可,設定為消費失敗的位置,這樣
                 * 	消費失敗就會從消費失敗的訊息位置進行回發到broker繼而被消費端消費,就避免了批量消費重複消費成功的訊息。
                 * 
                 */
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {//消費成功不走這裡,消費失敗走這裡
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);//訊息回發到broker,主題是%RETRY%+topicName,result為true表示回發成功
                    if (!result) {//回發broker失敗
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);//設定msg的重複消費次數+1
                        msgBackFailed.add(msg);//儲存該msg到msgBackFailed,供下面重新消費
                    }
                }
 
                if (!msgBackFailed.isEmpty()) {//重發訊息傳送到broker失敗的情況下
                    consumeRequest.getMsgs().removeAll(msgBackFailed);//把消費失敗的訊息從consumeRequest移除,這裡對應程式碼@1
 
                    //消費task重新執行消費失敗且回發到broker失敗的訊息,延時5s執行
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }
        
        //把消費成功的訊息從ProcessQueue移除,並返回該批訊息的最小offset
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());//程式碼@1
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {		//如果ProcessQueue失效了(在Reblance執行緒中pull動作超過120s置為失效),那麼就更新consumerqueue物件的offset更新為消費前offset,這樣做就是表示了失敗從頭開始
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);//這裡就會導致重複消費
        }
    }

程式碼加的很清晰了,而且對於批量消費失敗消費從消費失敗位置如何進行回發也說明了。removceMessage方法返回pq.msgTreeMap上儲存的訊息最小的offset,然後在updateOffset操作內把offset儲存到消費客戶端RemoteBrokerOffsetStore.offsetTable,消費客戶端記錄的消費offset線上程Thread [MQClientFactoryScheduledThread]每5s執行MQClientInstance.persistAllConsumerOffset()內儲存到broker,傳送命令是UPDATE_CONSUMER_OFFSET。

processConsumeResult方法的核心功能是for迴圈和程式碼@1處removeMessage,下面看程式碼@1

public long removeMessage(final List<MessageExt> msgs) {
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();//加寫鎖,因為要對紅黑樹進行寫操作
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {//msgTreeMap是每次pull到訊息後儲存的本次pull到的訊息(預設一次拉取32條訊息)
                    result = this.queueOffsetMax + 1;//程式碼@2	this.queueOffsetMax儲存的是拉取到的32條訊息中offset最大的,在ProcessQueue.putMessage(List<MessageExt>)設定,該方法是拉取到訊息後在PullCallback內呼叫
                    int removedCnt = 0;
                    for (MessageExt msg : msgs) {//遍歷本次消費的訊息集合,通常是一個訊息,因為預設一次消費消費一個訊息
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());//從紅黑樹移除被消費的訊息
                        if (prev != null) {//說明被消費的訊息在pq內
                            removedCnt--;//計數器
                            msgSize.addAndGet(0 - msg.getBody().length);//pq.msgSize減去本次消費的訊息size
                        }
                    }
                    msgCount.addAndGet(removedCnt);//pq.msgCount訊息數量減去被消費的訊息數量
 
                    if (!msgTreeMap.isEmpty()) {//pq上還有訊息,說明拉取到的32條訊息還沒被消費完,則返回拉取到的訊息集合第一個訊息offset,即最小offset最。這也說明了為什麼用紅黑樹儲存拉取到的訊息了,按照訊息的offset排序,這次消費一條訊息,返回最小的offset,這樣避免了訊息丟失(offset大的先被執行消費完畢)
                        result = msgTreeMap.firstKey();//程式碼@1	返回pq上第一個訊息的offset,即最小offset
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();//finally 釋放鎖
            }
        } catch (Throwable t) {
            log.error("removeMessage exception", t);
        }
 
        return result;
    }

該方法有些難理解,重點是程式碼@1、程式碼@2處,對於每次消費成功,從pq移除該訊息,如果pq還有訊息(多個消費執行緒消費同一個ProcessQueue.msgTreeMap集合上儲存的訊息),那麼返回最小的offset,如果pq上沒有待消費的訊息了,則返回ProcessQueue.queueOffsetMax(該屬性儲存的是一次拉取到的訊息中的max offset),這樣既避免了訊息遺漏的情況,最終又儲存到了消費最大offset的情況。因此完美解決了 對於併發消費,消費msg1,msg2,msg3,它們的offset依次是增加的,在消費成功後,msg3先被消費完,繼而儲存offset的時候還是儲存的msg1的offset,而非msg3.offset,這樣避免了消費時候訊息遺問題,但是會導致有重複消費的可能,當然rmq並不保證重複消費,由業務保證。

FIXME:唯一我不明白的是程式碼@2處result = this.queueOffsetMax + 1;,為什麼要+1呢?我認為是result = this.queueOffsetMax就行了,應該是我理解的這個哪裡有問題?後續明白了更新。解決:該offset不是具體的在consumequeue上的物理偏移量,而是表示在consumequeue上是第幾條訊息,因此需要+1,從下條訊息開始進行訊息拉取。

3.2.順序訊息消費

rocketmq的順序訊息並不是嚴格的順序,只是分割槽順序,把一個生產者產生的訊息按照訊息產生順序存放到同一個mq上,那麼這樣就涉及到傳送的時候對待存放的訊息佇列的選擇了,因此需要實現MessageQueueSelector來選擇要傳送的訊息佇列,其他傳送同普通訊息傳送。順序訊息的定義參考https://help.aliyun.com/document_detail/49319.html?spm=a2c4g.11186623.6.553.4ff06b450u63ex

順序消費的啟動和併發消費的啟動基本相同,在前面的圖已經畫出來了,順序消費ConsumeMessageOrderlyService,task是ConsumeMessageOrderlyService.ConsumeRequest,順序消費主要是要對消費的mq進行加鎖,重新負載後還要對mq解鎖。

在PullMessageService服務執行緒拉取到訊息後,執行PullCallback.onSuccess()時同併發消費一樣把拉取到的訊息儲存到ProcessQueue.msgTreeMap,而後在org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.submitConsumeRequest(List, ProcessQueue, MessageQueue, boolean)內把processQueue, messageQueue包裝建立為task ConsumeMessageOrderlyService.ConsumeRequest丟入到順序消費執行緒池(min20 max64)處理,順序消費一個messagequeue只會在一個work執行緒上執行,因此一個消費客戶端對於順序消費是序列執行,不存在併發。

接著看ConsumeMessageOrderlyService.ConsumeRequest的執行邏輯run()方法

//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest.run()
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }
 
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);//從快取獲取一個物件,在該物件上同步
    synchronized (objLock) {//加鎖同步
    	/*
    	 * this.processQueue.isLocked()被加鎖了且鎖時間未失效!this.processQueue.isLockExpired(),pq加鎖並設定時間戳是在負載服務執行緒內設定的,還有是在計劃任務ConsumeMessageOrderlyService.lockMQPeriodically()設定
    	 * 廣播消費模式進入執行,or 叢集消費模式且pq在有效加鎖時間內進入
    	 */
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {//pq失效,則退出for迴圈,不做消費
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                }
 
                //忽略不重要程式碼
                
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();//預設1,可以設定一次消費的訊息數量
 
								//從pq.msgTreeMap上移除offset最小的consumeBatchSize條訊息返回(預設返回一個訊息),同時把這些訊息儲存到pq.consumingMsgOrderlyTreeMap這個紅黑樹上
                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);//順序消費
                if (!msgs.isEmpty()) {
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
 
                    ConsumeOrderlyStatus status = null;
 
                    //忽略不重要程式碼
                    
                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        this.processQueue.getLockConsume().lock();//pq的重入鎖加鎖,保證只有一個執行緒可以消費該pq上的該訊息
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            break;
                        }
 
                        //業務程式碼執行消費訊息
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                            RemotingHelper.exceptionSimpleDesc(e),
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                        hasException = true;
                    } finally {
                        this.processQueue.getLockConsume().unlock();//pq的重入鎖解鎖
                    }
 
                    //忽略不重要程式碼
 
                    //更新offset成功則繼續從pq拉取訊息消費(繼續執行for迴圈),這個一個順序消費執行緒就消費完了pull到的所有訊息
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {//pq無待消費訊息,task退出執行
                    continueConsume = false;
                }
            }
        } else {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
          //pq未被加鎖or鎖時間失效,稍後再重新消費
            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}

在去除了一些鉤子方法和統計後,方法很簡明瞭,分為三步
step1:從pq上獲取待消費的訊息,預設是一條,可以設定多條。

step2:業務程式碼消費,消費結果是ConsumeOrderlyStatus.SUCCESS、ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT兩種,前者成功,後者消費失敗。

step3:processConsumeResult處理消費結果,處理成功接著繼續從pq拉取訊息進行消費。下面看該方法

/*
 * 傳入引數msgs是被消費的訊息,status是消費結果,context是MessageListenerOrderly.consumeMessage(List<MessageExt>, ConsumeOrderlyContext)中的第二個引數,業務程式碼實現該方法,consumeRequest是ConsumeMessageOrderlyService.ConsumeRequest
 * 功能:處理消費結果,消費成功更新消費客戶端本地的offset,消費失敗,則把訊息重新放到pq.msgTreeMap上,然後阻塞在該訊息,接著繼續消費該訊息。
 */
public boolean processConsumeResult(
    final List<MessageExt> msgs,
    final ConsumeOrderlyStatus status,
    final ConsumeOrderlyContext context,
    final ConsumeRequest consumeRequest
) {
    boolean continueConsume = true;
    long commitOffset = -1L;
    if (context.isAutoCommit()) {//預設true,用於非事務訊息,預設執行這裡
        switch (status) {
            case COMMIT:
            case ROLLBACK:
                log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                    consumeRequest.getMessageQueue());
            case SUCCESS:
                commitOffset = consumeRequest.getProcessQueue().commit();//獲取本次消費的訊息中的訊息最大offset
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                /*
						     * checkReconsumeTimes返回true,則說明消費失敗次數未達到最大  or 達到最大消費失敗次數但是回發broker失敗。
						     * 	返回false說明消費失敗達到最大次數且回發該訊息到broker成功
						     */
                if (checkReconsumeTimes(msgs)) {//檢測重複消費次數,返回true,則說明消費失敗次數未達到最大  or 達到最大消費失敗次數但是回發broker失敗
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);//程式碼@1 把訊息重新放到ProcessQueue.msgTreeMap,這樣task再次執行還是獲取到當前消費失敗的訊息,繼而就是阻塞了,因此需要設定最大消費失敗次數,不然消費失敗一直阻塞在該訊息上了。
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());//延時1s繼續執行task消費任務
                    continueConsume = false;
                } else {//消費失敗次數達到了最大且回發broker成功執行這裡,即暫時跳過該消費失敗的訊息消費後續訊息,因此返回offset
                    commitOffset = consumeRequest.getProcessQueue().commit();
                }
                break;
            default:
                break;
        }
    } else {//業務程式碼內設定了手動提交,用於事務訊息
        //省略事務訊息處理
    }
 
    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);//注意順序消費updateOffset操作是false,那麼就有可能消費達到最大失敗次數回發到broker後,又被重新拉取到而消費,那麼為false的情況,就會把消費端本地儲存的offset更新為舊的offset,導致重複消費。因此順序消費,消費失敗達到最大失敗次數情況下,直接返回消費成功,記錄db,不回發broker。
    }
 
    return continueConsume;
}

順序消費,消費失敗的時候是被阻塞的,消費失敗後,然後當前在執行的task就退出,訊息又重新被儲存到pq,新建立ConsumeRequest提交到執行緒池,預設延時1s後再次消費,這個延時時間可以業務程式碼內調整。

總結:順序消費在業務程式碼要設定最大失敗消費次數,達到這個次數,把訊息儲存到db,而後要返回消費成功,這樣避免了訊息回發到broker到死信佇列,這樣做比較方便。

以上是一個MessageQueue的消費情況,那麼一個消費客戶端對應消費多個mq呢?

解答:PullMessageService拉取訊息是按照PullRequest來拉取的,一個PullRequest表示一個訊息佇列mq,那麼在一個消費端被分配了多個mq的時候,每個mq拉取到的訊息都會丟入到執行緒池處理(無論併發消費or順序消費都預設是20~64個執行緒),併發消費是多個消費執行緒一起執行,這個容易理解,但是順序消費必須要序列執行,是如何做的呢?答案就在上面分析的org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest.run()程式碼的for迴圈上,順序消費和併發消費不同的是併發消費的task是ConsumeMessageConcurrentlyService.ConsumeRequest,它包裝了待消費的訊息,因此可以線上程池中併發執行。但是ConsumeMessageOrderlyService.ConsumeRequest是不包含待消費的訊息,而是在執行過程中從processqueue上拉取訊息然後進行消費,消費完畢後,接著再進行拉取訊息,因此雖然順序消費的執行緒池的work執行緒是多個,但是實際上一個mq的消費只會同時只有執行緒池中的一個work執行緒執行,因此做到了順序消費是序列的。

至此順序消費寫完。

3.3.延時消費

RocketMQ 支援定時訊息,但是不支援任意時間精度,僅支援特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。
如何配置:在broker的屬性配置檔案中加入以下行:
預設是messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
描述了各級別與延時時間的對應對映關係。
這個配置項配置了從1級開始各級延時的時間,如1表示延時1s,2表示延時5s,14表示延時10m,可以修改這個指定級別的延時時間;?
時間單位支援:s、m、h、d,分別表示秒、分、時、天;?
預設值就是上面宣告的,可手工調整

在rmq中每個延時級別對應一個mq,預設是18個延時級別,則是18個mq,主題是SCHEDULE_TOPIC_XXXX

先說下延時訊息的傳送,有producer傳送延時訊息、併發消費失敗回發訊息到broker,順序消費失敗次數超過最大回發broker,這些情況都會儲存到延時主題上。

併發消費失敗回發、順序消費失敗回發、延時訊息傳送broker端處理異同如圖

併發消費是消費失敗就回發到broker,順序消費是消費次數達到了最大失敗次數才回發到broker,兩者傳送命令不同,在broker端SendMessageProcessor處理器的方法不同,但是相同的是消費重試訊息都會被儲存到SCHEDULE_TOPIC_XXXX主題對應的延時mq內。producer傳送延時訊息和順序消費重發級別相同,不同的是不需要延時訊息傳送的是原topic,而順序消費重試回發發送的是%RETRY%consumegroup。 最終不論是延時訊息or retry訊息,都是被儲存到SCHEDULE_TOPIC_XXXX上,佇列就是各自的延時級別,因為消費端不訂閱SCHEDULE_TOPIC_XXXX,因此自然延時訊息就無法被消費了。

那麼延時訊息是如何被消費的?肯定需要把SCHEDULE_TOPIC_XXXX的訊息改為原topic才可以消費,在哪裡進行的呢?

在broker啟動的時候org.apache.rocketmq.store.DefaultMessageStore.start()內執行org.apache.rocketmq.store.schedule.ScheduleMessageService.start(),該方法內啟動一個timer執行緒Thread[ScheduleMessageTimerThread],該執行緒對每個佇列任務DeliverDelayedMessageTimerTask執行org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask.run(),該run()方法每執行一次後又會重新建立DeliverDelayedMessageTimerTask再執行執行,我們就把它當作一個計劃任務即可,邏輯在org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask.executeOnTimeup()方法內

//org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask.executeOnTimeup()
public void executeOnTimeup() {
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));//根據延時topic SCHEDULE_TOPIC_XXXX和延時佇列獲取consumequeue
 
    long failScheduleOffset = offset;//當前延時級別對應的mq的offset,該offset並不是在consumequeue上的物理位置,而是第幾條訊息的意思
 
    if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);//返回從該offset所歸屬的MappedFile物件上從offset開始到consumequeeu的寫位置之間的緩衝區
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();//忽略
                /*
                 * 從consumequeue的延時佇列讀取一條訊息,如果到了要發起的時間,則把訊息還原topic,並寫入到commitlog,
                 *	然後reputmessageservice執行緒會轉儲到consumequeue中,這樣消費端就可以消費了。
                 * 	這樣for迴圈下就可以把發起時間到了的訊息都發起儲存到commitlog供消費了。
                 */
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferCQ.getByteBuffer().getLong();//commitlog offset 8
                    int sizePy = bufferCQ.getByteBuffer().getInt();//msg size 4
                    long tagsCode = bufferCQ.getByteBuffer().getLong();//時間戳 8,對於延時訊息consumequeue存放的不是taghash而是具體發起時間
 
                    //忽略擴充套件的cq
                    
                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);//tagsCode>now+延時級別對應的延時時間,說明到了發起時間,則返回now值,否則返回tagsCode值
 
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 
                    long countdown = deliverTimestamp - now;
 
                    if (countdown <= 0) {//到時間了需要發起的訊息
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);//根據訊息offset和size從commitlog查詢到訊息返回,該訊息的主題是SCHEDULE_TOPIC_XXXX
 
                        if (msgExt != null) {
                            try {
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);//返回新的訊息msgInner,還原了原topic queueid,清除了訊息的屬性PROPERTY_DELAY_TIME_LEVEL,這樣就是個普通訊息了,不再具有延時
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.defaultMessageStore
                                        .putMessage(msgInner);//把原訊息追加到commitlog中,即該訊息的topic是待消費的topic
 
                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;//原訊息追加到commitlog成功,接著進行for迴圈
                                } else {
                                    // XXX: warn and notify me
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {//訊息未到發起時間,重新執行task
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);//更新該延時佇列的消費offset到ScheduleMessageService.offsetTable
                        return;
                    }
                } // end of for
 
               
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                //task 100ms後執行,這裡task是新的offset
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                //遍歷完當前延時佇列發起的訊息,更新offset位置到ScheduleMessageService.offsetTable
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {
 
                bufferCQ.release();//是否緩衝區
            }
        } // end of if (bufferCQ != null)
        else {//不存在consumequeueu
 
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)
	//cq==null,即根據offset未找到cq,延時100ms重新執行task
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}

程式碼具體含義看註釋,該方法的功能就是根據延時佇列的offset找到延時佇列,讀取訊息commitlog offset ,size,然後到commitlog讀取到具體的SCHEDULE_TOPIC_XXXX訊息,然後把訊息還原為原topic並追加到commitlog,這樣客戶端就可以消費訊息了。

延時訊息的幾種傳送情況說明白了,那麼對於消費而言跟普通訊息消費是完全相同的,也可以看出順序/併發消費失敗超過最大次數回發broker會被儲存到死信佇列,死信佇列預設還不能度(可以使用mqadmin命令修改為可讀),因此業務上對於順序/併發消費在失敗超過最大次數了要儲存到db,返回消費成功,避免傳送到死信情況。

至此延時訊息說完。

思考:為什麼rmq中有許多計劃任務是使用的Timer而非ScheduledThreadPoolExecutor這個計劃執行緒池呢?timer內部只是包含一個執行緒,可以使用ScheduledThreadPoolExecutor的時候也只是一個執行緒,這個為什麼不使用ScheduledThreadPoolExecutor呢?Timer已經不建議使用了,這個暫時不清楚?

3.4.事務訊息

比如一個下訂單扣庫存的動作,這兩個服務分別操作訂單庫和庫庫,屬於分散式事務範疇了,如果mq不支援事務,那麼可能做法是:

//step1:開啟本地事務

//step2:訂單庫新增一條記錄

//step3:向mq傳送訂單訊息,用於扣庫存

//step4:提交事務/回滾事務

該方案在正常情況下沒有問題,但是一些異常情況下就有了問題:

1.如果step3執行後,在step4執行前jvm程序or伺服器宕機,事務沒有成功提交,訂單庫沒變化和但是庫存庫減少,導致兩個庫資料不一致

2.由於訊息是在事務提交之前提交,傳送的訊息內容是訂單實體的內容,會造成在消費端進行消費時如果需要去驗證訂單是否存在時可能出現訂單不存在,該問題也會存在,因為消費端速度很快的話。

對於生成訂單(DB操作)和傳送訊息是一個事務內的動作,因此要保證要麼全部成功,要麼回滾,因此可以採用rocketmq的事務訊息來解決。

rocketmq事務訊息解決分散式事務,實現最終資料一致性,思想就是xa協議2pc,整體互動流程如下圖所示(圖片來源網上,該圖很清晰明瞭,如果前面的訊息傳送和消費看懂了,事務訊息也很容易明白)

所謂的訊息事務就是基於訊息中介軟體的兩階段提交,本質上是對訊息中介軟體的一種特殊利用,它是將本地事務和發訊息放在了一個分散式事務裡,保證要麼本地操作成功成功並且對外發訊息成功,要麼兩者都失敗。來源網上,覺得說的好,帖出來了。

事務訊息是使用TransactionMQProducer進行傳送的,和普通訊息的傳送者producer不同的是需要業務開發自定義執行緒池和org.apache.rocketmq.client.producer.TransactionListener的實現

下面開始看程式碼(事務訊息傳送客戶端參考rocketmq的example下的程式碼)

先貼圖,先看事務producer的啟動

和普通訊息producer啟動基本相同,只是增加了事務監聽器和事務檢查執行緒池,分別用於執行事務、檢查事務和接收broker發來的事務回查請求。

接著看事務訊息的傳送處理流程圖

該圖把一些處理細節給標註了,可以跟前面的producer傳送泳道圖比較,看看和事務訊息producer有什麼區別。

該過程中的核心點是EndTransactionProcessor.processRequest(ChannelHandlerContext, RemotingCommand),下面看該方法程式碼

//org.apache.rocketmq.broker.processor.EndTransactionProcessor.processRequest(ChannelHandlerContext, RemotingCommand)
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    /*
     *	 該requestHeader包含了
     *	producerGroup==producerGroupname,
     *	tranStateTableOffset==prepare訊息在consumequeue的位置(表示該訊息在cq上是第幾條訊息), (用於把原始的事務訊息儲存到consumequeue上,即儲存在prepare訊息在consumequeue的位置)
     *	commitLogOffset==prepare訊息在commitlog的絕對位置,(用於查詢在commitlog上的commitLogOffset位置的prepare訊息,把prepare訊息轉換為原始訊息,繼而最後儲存到commitlog上)
     *	commitOrRollback==事務訊息型別TRANSACTION_COMMIT_TYPE/TRANSACTION_ROLLBACK_TYPE/TRANSACTION_NOT_TYPE,
     *	transactionId==訊息的 UNIQ_KEY
     *	msgId==訊息的UNIQ_KEY
     *	fromTransactionCheck是否是broker回查事務,true是,false否
     */
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.info("Transaction request:{}", requestHeader);
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {//不允許salve broker處理事務訊息
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }
 
    if (requestHeader.getFromTransactionCheck()) {//表示是否是回查檢查訊息。用於broker發producer訊息回查事務,producer結束事務傳送到broker的時候,該值為true。對於producer傳送prepare訊息後執行完本地事務,傳送commit/rollback訊息到broker的時候,該值為false。
        //回查事務和非回查,執行功能是一樣的
    	switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }
 
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
 
                break;
            }
 
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    } else {
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {//對應事務狀態的UNKNOW,不處理
                LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }
 
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {//事務commit訊息,處理
                break;
            }
 
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {//事務rollback訊息,處理
                LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    }
    OperationResult result = new OperationResult();
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    	//事務commit訊息,則直接將原先發的prepare從commitlog檔案讀出來訊息轉換為原訊息,並寫入commitlog,訊息的topic是原topic,即被消費者訂閱可以消費到
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);//根據EndTransactionRequestHeader.commitLogOffset這個commitlog物理偏移量從commitlog中查詢到prepare訊息
        if (result.getResponseCode() == ResponseCode.SUCCESS) {//從commitlog中查詢到了prepare訊息
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);//檢測prepare訊息和收到的EndTransactionRequestHeader.commitlogOffset等資訊是否匹配
            if (res.getCode() == ResponseCode.SUCCESS) {//檢測通過
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());//還原prepare訊息的topic queueid等資訊為原始訊息
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));//sysflag更新為TRANSACTION_COMMIT_TYPE
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());//設定原始訊息在consumequeue的offset,即儲存到prepare訊息在consumequeue上的位置。
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());//prepare訊息在commitlog的絕對(物理)位置,即commitlog格式中的PTO
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());//原始訊息的儲存時間戳為prepare訊息儲存時間戳
                RemotingCommand sendResult = sendFinalMessage(msgInner);//把原始訊息寫入到commitlog
                if (sendResult.getCode() == ResponseCode.SUCCESS) {//原始訊息寫入commitlog成功,從commitlog刪除prepare訊息
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());//所謂刪除prepare訊息就是把該訊息寫入到commitlog,topic是op half topic,這樣broker回查的時候判斷OP HALF有了該訊息,就不再進行回查
                }
                return sendResult;
            }
            return res;
        }
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        //如果是Rollback,則直接將訊息轉換為原訊息,並寫入到Op Topic裡
    	result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);//根據EndTransactionRequestHeader.commitLogOffset這個commitlog物理偏移量從commitlog中查詢到prepare訊息
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);//檢測prepare訊息和收到的EndTransactionRequestHeader.commitlogOffset等資訊是否匹配
            if (res.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());//所謂刪除prepare訊息就是把該訊息寫入到commitlog,topic是op half topic
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;//設定返回結果,實際producer是oneway傳送方式,不返回producer
}

程式碼加了註釋,比較容易懂,大體流程就是:

commit訊息=>從commitlog讀取出prepare訊息=>檢查prepare訊息=>轉換為真正待消費訊息=>追加到commitlog檔案=>刪除prepare訊息=>ReputMessageService把待消費訊息轉儲到consumequeue=>客戶端消費事務訊息。

rollback訊息=>從commitlog讀取出prepare訊息=>檢查prepare訊息=>刪除prepare訊息。

該方法的核心就是根據EndTransactionRequestHeader上送的commitlogPhysOffset找到prepare訊息,然後還原訊息儲存到commitlog內,也很容易理解。那麼commitlogPhysOffset如來的,還得根據程式碼自己找,下面我總結了下EndTransactionRequestHeader的屬性,如果找不清楚來源的,可以參考下,

public class EndTransactionRequestHeader implements CommandCustomHeader {
    @CFNotNull
    private String producerGroup;//傳送broker前賦值	producerGroupname
    @CFNotNull
    private Long tranStateTableOffset;//傳送broker前賦值	prepare訊息在consumequeue的位置(表示該訊息在cq上是第幾條訊息)
    @CFNotNull
    private Long commitLogOffset;//傳送broker前賦值  prepare訊息在commitlog的絕對位置
    @CFNotNull
    private Integer commitOrRollback; // TRANSACTION_COMMIT_TYPE	傳送broker前賦值 為對應的訊息型別commit/rollback/unknow
    // TRANSACTION_ROLLBACK_TYPE
    // TRANSACTION_NOT_TYPE
 
    @CFNullable
    private Boolean fromTransactionCheck = false;
 
    @CFNotNull
    private String msgId;//傳送broker前賦值	訊息屬性的UNIQ_KEY
 
    private String transactionId;//傳送broker前賦值 事務id,通常是訊息屬性的UNIQ_KEY
}

那麼問題,如果執行完本地事務後,傳送commit訊息時候,producer jvm宕機了,那麼訊息沒有發出去,客戶端無法消費到,無法扣除庫存,導致資料不一致,這應該怎麼解決?rmq提供了事務回查功能。

在broker啟動的時候啟動服務執行緒Thread [TransactionalMessageCheckService],執行TransactionalMessageCheckService.run(),broker每60s回查producer事務狀態,執行堆疊如下圖

核心在check方法內,看下面程式碼和註釋

/*
     * 	傳入引數:transactionTimeout==60s 事務回查超時時間, transactionCheckMax==15,最大回查次數,listener是DefaultTransactionalMessageCheckListener
     * 	功能:讀取當前half的half queueoffset,然後從op half拉取32條訊息儲存到removeMap,如果half queueoffset處的訊息在removeMap中,
     * 		則說明該prepare訊息被處理過了,然後讀取下一條prepare訊息,如果prepare不在removeMap中,說明是需要回查的,此時broker作為client端,向服務端producer傳送回查命令,
     * 		最後由producer返回回查結果更新原prepare訊息。
     */
    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;//RMQ_SYS_TRANS_HALF_TOPIC
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);//返回的是half topic的訊息佇列,只有一個佇列
            if (msgQueues == null || msgQueues.size() == 0) {//說明broker還沒有接收過prepare訊息,自然half topic是null
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.info("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {//遍歷half topic下的訊息佇列,實際只有一個訊息佇列
                long startTime = System.currentTimeMillis();
                MessageQueue opQueue = getOpQueue(messageQueue);//獲取op half topic的訊息佇列(只有一個佇列),OP就是英文operator縮寫
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);//獲取prepare訊息的當前消費queueoffset
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);//獲取op half訊息的當前消費queueoffset
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
 
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                //fillOpRemoveMap方法返回的removeMap集合包含的是已經被commit/rollback的prepare訊息的queueoffset集合
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);//核心方法
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                int getMessageNullCount = 1;//獲取空訊息的次數
                long newOffset = halfOffset;//當前處理RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新進度。
                long i = halfOffset;//當前處理RMQ_SYS_TRANS_HALF_TOPIC訊息的佇列偏移量
                while (true) {//遍歷,看看queueoffset=i處的prepare訊息是否在removeMap集合內,如果在,說明該prepare訊息被commit/rollback處理過了,如果不在,則說明該prepare訊息未被處理過,需要進行回查
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {//這是RocketMQ處理任務的一個通用處理邏輯,就是一個任務處理,可以限制每次最多處理的時間,RocketMQ為待檢測主題RMQ_SYS_TRANS_HALF_TOPIC的每個佇列,做事務狀態回查,一次最多不超過60S,目前該值不可配置
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    if (removeMap.containsKey(i)) {//說明i位置處的這個prepare訊息已經被commit/rollback處理過了,因此i+1,接著執行下一次while
                        log.info("Half offset {} has been committed/rolled back", i);
                        removeMap.remove(i);
                    } else {
                    	//說明i位置處的這個prepare訊息還未被commit/rollback處理過,需要進行回查
                        GetResult getResult = getHalfMsg(messageQueue, i);//從queueoffset位置獲取commitlog上的prepare訊息,這裡的引數i表示queueoffset
                        MessageExt msgExt = getResult.getMsg();//獲取queueoffset=i處的prepare訊息
                        if (msgExt == null) {//prepare訊息不存在
                        	/*
                        	 * 	如果訊息為空,則根據允許重複次數進行操作,預設重試一次,目前不可配置。其具體實現為:
                        	 * 1、如果超過重試次數,直接跳出,結束該訊息佇列的事務狀態回查。
                        	 * 2、如果是由於沒有新的訊息而返回為空(拉取狀態為:PullStatus.NO_NEW_MSG),則結束該訊息佇列的事務狀態回查。
                        	 * 3、其他原因,則將偏移量i設定為: getResult.getPullResult().getNextBeginOffset(),重新拉取。
                           	 */
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {//空訊息次數+1
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {//prepare訊息不存在,則退出
                                log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {//繼續從commitlog讀取下一個prepare訊息
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
 
                        /*
                         * needDiscard,prepare訊息已經被回查達到15次,則不再回查該prepare訊息
                         * needSkip prepare訊息儲存時間距離現在超過了72h,則不再回查該prepare訊息
                         * 	判斷該訊息是否需要discard(吞沒,丟棄,不處理)、或skip(跳過),其依據如下
                         * 	1、needDiscard 依據:如果該訊息回查的次數超過允許的最大回查次數,則該訊息將被丟棄,即事務訊息提交失敗,不能被消費者消費,其做法,主要是每回查一次,在訊息屬性TRANSACTION_CHECK_TIMES中增1,預設最大回查次數為15次。
     					 *	2、needSkip依據:如果事務訊息超過檔案的過期時間,預設72小時(具體請檢視RocketMQ過期檔案相關內容),則跳過該訊息。
                         */
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {//不再回查滿足該條件的prepare訊息
                            listener.resolveDiscardMsg(msgExt);//列印error日誌
                            newOffset = i + 1;
                            i++;
                            continue;//遍歷下一個prepare訊息
                        }
                        if (msgExt.getStoreTimestamp() >= startTime) {//prepare訊息儲存時間戳>=broker本次回查開始時間戳,結束回查。說明該prepare訊息剛被重新整理到commitlog,等待下次再回查該訊息
                            log.info("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
 
                        /*
                         * 	處理事務超時相關概念,先解釋幾個區域性變數:
                         * valueOfCurrentMinusBorn :該訊息已生成的時間,等於系統當前時間減去訊息生成的時間戳。
                         * checkImmunityTime :立即檢測事務訊息的時間,其設計的意義是,應用程式在傳送事務訊息後,事務不會馬上提交,該時間就是假設事務訊息傳送成功後,應用程式事務提交的時間,在這段時間內,RocketMQ任務事務未提交,故不應該在這個時間段嚮應用程式傳送回查請求。
                         * transactionTimeout:事務訊息的超時時間,這個時間是從OP拉取的訊息的最後一條訊息的儲存時間與check方法開始的時間,如果時間差超過了transactionTimeout,就算時間小於checkImmunityTime時間,也傳送事務回查指令。
                         */
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();//當前時間戳與prepare訊息傳送時間戳差。bornTimestamp是producer產生的
                        long checkImmunityTime = transactionTimeout;
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);//原始碼內沒有地方給訊息屬性設定PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS
                        if (null != checkImmunityTimeStr) {//false
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {//程式走該分支	如果當前時間還未過(應用程式事務結束時間),則跳出本次回查處理的,等下一次再試
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {//訊息儲存時間戳在距離當前時間是60s內,則不回查
                                log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;//退出回查
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();//op half msg
                        
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)//訊息未被刪除且訊息儲存時間距離當前超過了60s
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))//判斷當前獲取的最後一條OpMsg的儲存時間是否超過了事務超時時間,如果為true也要進行事務狀態回查,為什麼要這麼做呢?
                            || (valueOfCurrentMinusBorn <= -1);
 
                        if (isNeedCheck) {//需要回查
                            if (!putBackHalfMsgQueue(msgExt, i)) {//如果需要傳送事務狀態回查訊息,則先將訊息再次傳送到HALF_TOPIC主題中,傳送成功則返回true,否則返回false, 如果傳送成功,會將該訊息的queueOffset、commitLogOffset設定為重新存入的偏移量
                                continue;
                            }
                            listener.resolveHalfMsg(msgExt);//異步向producer傳送CHECK_TRANSACTION_STATE命令查詢producer本地事務狀態,此時broker作為client端,producer作為服務端
                        } else {
                        	//如果無法判斷是否傳送回查訊息,則載入更多的op(已處理)訊息進行篩選
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }//end else
                    newOffset = i + 1;
                    i++;
                }//end while
                if (newOffset != halfOffset) {
                    /*
                     *	 儲存(Prepare)訊息佇列的回查進度。儲存到ConsumerOffsetManager.offsetTable,key是RMQ_SYS_TRACE_TOPIC@CID_RMQ_SYS_TRANS,
                     * 	跟普通訊息的topic@groupname不同,half和op half訊息訊息沒有使用真實的groupname,而是重新定義了系統groupname==CID_RMQ_SYS_TRANS         
                     */
                	transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                	//儲存處理佇列(op)的進度。儲存到ConsumerOffsetManager.offsetTable,key是RMQ_SYS_TRANS_OP_HALF_TOPIC@CID_RMQ_SYS_TRANS
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }//end for
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Check error", e);
        }
 
    }

該方法核心功能就是判斷prepare訊息是否在op half內,如果不在,說明prepare訊息未被commit/rollback處理過,需要發起回查,如果在,則不需要發起回查。裡面的fillOpRemoveMap方法難理解,下面看該方法程式碼和註釋

/**
     * Read op message, parse op message, and fill removeMap
     *
     * @param removeMap Half message to be remove, key:halfOffset, value: opOffset.
     * @param opQueue Op message queue.
     * @param pullOffsetOfOp The begin offset of op message queue.
     * @param miniOffset The current minimum offset of half message queue.
     * @param doneOpOffset Stored op messages that have been processed.
     * @return Op message result.
     */
    /*
     * 	傳入引數解釋:
     * removeMap:處理過的prepare訊息儲存到該集合,key:halfqueueOffset, value: opqueueOffset.
     * opQueue: op half queue
     * pullOffsetOfOp: op half queue上當前queueoffset
     * miniOffset:	half訊息佇列上當前queueoffset。不要被英文註釋給矇蔽了,不是最小offset,而是當前half上的queueoffset
     * doneOpOffset: 已經被處理過的op half訊息的queueuoffset儲存到該集合
     * 
     *	 功能:具體實現邏輯是從op half主題訊息佇列中拉取32條,如果拉取的訊息佇列偏移量大於等於half toic訊息佇列的當前queueoffset時,會新增到removeMap中,表示已處理過。
     * removeMap裡存放prepare訊息佇列中已經commit或者rollback的偏移量和待操作佇列的訊息偏移量(傳送commit或rollback後,會往待操作佇列中寫)
     * doneOpOffset存放待操作佇列的訊息偏移量
     * 
     */
    private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
        MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
        PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);//從commitlog上拉取pullOffsetOfOp位置開始OP HALF主題訊息佇列下的32條訊息
        if (null == pullResult) {
            return null;
        }
        if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
            || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {//go
            log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
            //offset非法or沒有匹配的msg,說明需要更新op half的offset啦
            transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());//更新op half topic的queueOffset到ConsumerOffsetManager.offsetTable,注意key是RMQ_SYS_TRANS_OP_HALF_TOPIC@CID_RMQ_SYS_TRANS
            return pullResult;
        } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) {
            log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
          //該pullOffsetOfOp位置後沒有訊息,說明不需要更新op half的offset
            return pullResult;
        }
        List<MessageExt> opMsg = pullResult.getMsgFoundList();//拉取到的op half下的訊息集合
        if (opMsg == null) {
            log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
            return pullResult;
        }
        for (MessageExt opMessageExt : opMsg) {//遍歷拉取到的op half topic佇列的訊息集合
            /*
             * 	對於op half佇列內儲存的訊息來說
             *	 訊息的body是prepare訊息在consumequeu上的queueOffset
             *	 訊息的tag是TransactionalMessageUtil.REMOVETAG
             *	 在TransactionalMessageBridge.addRemoveTagInTransactionOp(MessageExt, MessageQueue)做的
             * queueOffset變數就是prepare訊息在consumequeue上的offset
             */
        	Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));//獲取op half訊息的body,即prepare訊息在cq上的queueOffset。
            log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
                opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
            if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {//true,op訊息的tag就是TransactionalMessageUtil.REMOVETAG
                if (queueOffset < miniOffset) {
                	/*
                	 * 	op half訊息的body儲存的是對應的prepare的queueoffset,這點首先要明白
                	 * 	事務訊息的流程是先發prepare訊息到broker(訊息儲存到commitlog,topic是half),接著執行producer端本地db事務,事務執行後傳送commit/rollback/unknow訊息到broker,
                	 * 	無論是commit/rollback,都會在op half儲存一條訊息,該訊息存在,說明對應的prepare訊息就是被刪除了。那麼從op half拉取出來的訊息都是需要進行回查的了,這麼理解沒錯,但是每次都回查
                	 * 	那麼多,是否可以進行下過濾,過濾掉已經被回查過的呢?因此就有doneOpOffset,當op half訊息對應的prepare訊息queueoffset小於當前half訊息的queueoffset,說明該prepare訊息
                	 * 	已經被(處理過且)回查過了,因此無需再進行回查,儲存到doneOpOffset。
                	 */
                    doneOpOffset.add(opMessageExt.getQueueOffset());
                } else {
                	//把已經被commit/rollback處理過的訊息儲存到removeMap
                    removeMap.put(queueOffset, opMessageExt.getQueueOffset());
                }
            } else {
                log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
            }
        }//for end
        log.debug("Remove map: {}", removeMap);
        log.debug("Done op list: {}", doneOpOffset);
        return pullResult;
    }

在op half上的訊息都是被commit/rollback處理過的訊息,那麼都儲存到removeMap,為什麼還要有doneOpOffset呢?是為了減少訊息的判斷,為了過濾,如果op half對應的prepare訊息,那說明prepare不僅被處理過了,而且被回查過了,不再需要參與判斷了。這個理解有些難,參考https://itzones.cn/2019/07/09/RocketMQ事務訊息/ ,該文章有圖很能說明:

接著看org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener.resolveHalfMsg(MessageExt),該方法非同步傳送命令CHECK_TRANSACTION_STATE到producer查詢事務狀態,對應的producer處理器是ClientRemotingProcessor,最終由TransactionMQProducer.checkExecutor執行緒池執行task,查詢事務的狀態,總體流程如下圖。

producer端流程和程式碼比較簡單,需要TransactionMQProducer設定執行緒池處理接收、事務監聽器TransactionListener處理回查事務狀態,開發人員需要自己實現事務監聽器來回查事務執行狀態。

有個點需要注意,在broker發producer進行回查的方法org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener.sendCheckMessage(MessageExt)內,並不一定實際是傳送給prepare訊息的生產的那個producer(具體程式碼是Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);),它是通過broker儲存的producerGroup內選擇一個producer進行回查(通常producer也是一組叢集),因此producer端事務狀態和transactionId需要儲存在db or redis等,這樣才可以被同組內的其它producer查詢到事務狀態。

還有個點需要注意,對於prepare訊息是傳送到了broker1,那麼commit訊息也是要發broker1才行,在org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.endTransaction(SendResult, LocalTransactionState, Throwable)方法內的final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());設定的,而sendResult又是prepare訊息的傳送結果,因此保證了commit/rollback訊息都是發的同一臺broker,不然broker無法回查了。需要注意的是prepare預設傳送失敗的情況下跟普通訊息一樣最大重發3次,但是commit/rollback只發送一次,傳送失敗了,只能由broker回查決定了。

producer傳送的commit訊息就屬於正常待消費的訊息了,客戶端可以選擇順序/併發消費都行,消費和普通訊息消費沒有不同。

考慮幾個事務訊息的異常狀態:

1.preprare訊息傳送成功,本地事務執行成功,但是producer宕機

該情況broker會進行回查事務狀態,從而提交事務,傳送訊息給下游系統。

2.preprare訊息傳送成功,本地事務執行過程中producer宕機了

事務執行過程宕機了,那麼資料庫自動會回滾事務,事務就是沒執行成功,因此broker回查從而刪除preprae訊息。

3.preprare訊息傳送成功,本地事務執行成功,但是傳送commit訊息給broker失敗(傳送給prepare訊息接收的那臺broker),因為broker宕機?

啟動該broker,broker回查到事務執行成功,從而提交訊息,傳送訊息給下游系統進行消費,該情況會導致下游有長時間延遲才收到訊息消費。

4.客戶端消費訊息失敗了,怎麼辦?

rocketmq給出的方案是人工解決,這樣的情況不能多,如果多了,需要優化業務和程式碼,實際用rocketmq的事務訊息,客戶端消費失敗情況是少的,比如扣庫存動作,基本都是成功的。客戶端消費失敗的情況通常是通過對賬根據業務情況解決。

來段網上總結的話,自己對這種文字性總結總是說的不好,感覺自己說的比較大白話,別人說的更加專業:

使用rocketmq來保證分散式事務屬於訊息一致性方案,通過訊息中介軟體保證上、下游應用資料操作的一致性。基本思路是將本地操作和傳送訊息放在一個事務中,保證本地操作和訊息傳送要麼兩者都成功或者都失敗。下游應用向訊息系統訂閱該訊息,收到訊息後執行相應操作。
訊息方案從本質上講是將分散式事務轉換為兩個本地事務,然後依靠下游業務的重試機制達到最終一致性。基於訊息的最終一致性方案對應用侵入性也很高,應用需要進行大量業務改造,成本較高。