1. 程式人生 > >RocketMQ學習筆記二之【DefaultMQPushConsumer使用與流程原理分析】

RocketMQ學習筆記二之【DefaultMQPushConsumer使用與流程原理分析】

版本:


        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>

首先看下DefaultMQPushConsumer使用示例:

package com.swk.springboot.rocketmq;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
public class MQPushConsumer {
    public static void main(String[] args) throws MQClientException {
        String groupName = "rocketMqGroup1";
        // 用於把多個Consumer組織到一起,提高併發處理能力
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        // 設定nameServer地址,多個以;分隔
        consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
        /**
         * 1. CONSUME_FROM_LAST_OFFSET:第一次啟動從佇列最後位置消費,後續再啟動接著上次消費的進度開始消費 
           2. CONSUME_FROM_FIRST_OFFSET:第一次啟動從佇列初始位置消費,後續再啟動接著上次消費的進度開始消費 
           3. CONSUME_FROM_TIMESTAMP:第一次啟動從指定時間點位置消費,後續再啟動接著上次消費的進度開始消費 
                以上所說的第一次啟動是指從來沒有消費過的消費者,如果該消費者消費過,那麼會在broker端記錄該消費者的消費位置,如果該消費者掛了再啟動,那麼自動從上次消費的進度開始
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        /**
         * CLUSTERING:預設模式,同一個ConsumerGroup(groupName相同)每個consumer只消費所訂閱訊息的一部分內容,同一個ConsumerGroup裡所有的Consumer訊息加起來才是所
         *  訂閱topic整體,從而達到負載均衡的目的
         * BROADCASTING:同一個ConsumerGroup每個consumer都消費到所訂閱topic所有訊息,也就是一個消費會被多次分發,被多個consumer消費。
         * 
         */
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 訂閱topic,可以對指定訊息進行過濾,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有訊息
        consumer.subscribe("order-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> mgs,
                    ConsumeConcurrentlyContext consumeconcurrentlycontext) {
                System.out.println(Thread.currentThread().getName()+"Receive New Messages:"+mgs);
                // ConsumeConcurrentlyStatus.RECONSUME_LATER boker會根據設定的messageDelayLevel發起重試,預設16次
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
    
}


DefaultMQPushConsumerImpl中各個物件的主要功能如下:
RebalancePushImpl:主要負責決定,當前的consumer應該從哪些Queue中消費訊息;
1)PullAPIWrapper:長連線,負責從broker處拉取訊息,然後利用ConsumeMessageService回撥使用者的Listener執行訊息消費邏輯;
2)ConsumeMessageService:實現所謂的"Push-被動"消費機制;從Broker拉取的訊息後,封裝成ConsumeRequest提交給ConsumeMessageSerivce,此service負責回撥使用者的Listener消費訊息;
3)OffsetStore:維護當前consumer的消費記錄(offset);有兩種實現,Local和Rmote,Local儲存在本地磁碟上,適用於BROADCASTING廣播消費模式;而Remote則將消費進度儲存在Broker上,適用於CLUSTERING叢集消費模式;
4)MQClientFactory:負責管理client(consumer、producer),並提供多中功能介面供各個Service(Rebalance、PullMessage等)呼叫;大部分邏輯均在這個類中完成;
下面我們來分析下consumer.registerMessageListener執行過程:(RocketMQ的原始碼基本上沒有註釋,閱讀起來有點費勁)

/**
     * Register a callback to execute on message arrival for concurrent consuming.
     *
     * @param messageListener message handling callback.
     */
    @Override
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }
通過原始碼可以看出主要實現過程在DefaultMQPushConsumerImpl類中,consumer.start後呼叫DefaultMQPushConsumerImpl的同步start方法
 public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

略過一些細節設定和校驗,通過mQClientFactory.start();發我們發現他呼叫

public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

在這個方法中有多個start,我們主要看pullMessageService.start();通過這裡我們發現RocketMQ的Push模式底層其實也是通過pull實現的,下面我們來看下pullMessageService處理了哪些邏輯:

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

我們發現其實他還是通過DefaultMQPushConsumerImpl類的pullMessage方法來進行訊息的邏輯處理,這部分程式碼較多,我們只分析一些關鍵的步驟
【訊息拉取入口】

/**
* PullRequest這裡說明一下,上面我們已經提了一下rocketmq的push模式其實是通過pull模式封裝實現的,pullrequest這裡是通過長輪詢的方式達到push效果。長輪詢方式既有pull的優點又有push模式的實時性有點。
push方式是server端接收到訊息後,主動把訊息推送給client端,實時性高。弊端是server端工作量大,影響效能,其次是client端處理能力不同且client端的狀態不受server端的控制,如果client端不能及時處理訊息容易導致訊息堆積已經影響正常業務等。
pull方式是client迴圈從server端拉取訊息,主動權在client端,自己處理完一個訊息再去拉取下一個,缺點是迴圈的時間不好設定,時間太短容易忙等,浪費CPU資源,時間間隔太長client的處理能力會下降,有時候有些訊息會處理不及時。
長輪詢的方式可以結合兩者優點
1、檢查PullRequest物件中的ProcessQueue物件的dropped是否為true(在RebalanceService執行緒中為topic下的MessageQueue建立拉取訊息請求時要維護對應的ProcessQueue物件,若Consumer不再訂閱該topic則會將該物件的dropped置為true);若是則認為該請求是已經取消的,則直接跳出該方法;
2、更新PullRequest物件中的ProcessQueue物件的時間戳(ProcessQueue.lastPullTimestamp)為當前時間戳;
3、檢查該Consumer是否執行中,即DefaultMQPushConsumerImpl.serviceState是否為RUNNING;若不是執行狀態或者是暫停狀態(DefaultMQPushConsumerImpl.pause=true),則呼叫PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)方法延遲再拉取訊息,其中timeDelay=3000;該方法的目的是在3秒之後再次將該PullRequest物件放入PullMessageService. pullRequestQueue佇列中;並跳出該方法;
4、進行流控。若ProcessQueue物件的msgCount大於了消費端的流控閾值(DefaultMQPushConsumer.pullThresholdForQueue,預設值為1000),則呼叫PullMessageService.executePullRequestLater方法,在50毫秒之後重新該PullRequest請求放入PullMessageService.pullRequestQueue佇列中;並跳出該方法;
5、若不是順序消費(即DefaultMQPushConsumerImpl.consumeOrderly等於false),則檢查ProcessQueue物件的msgTreeMap:TreeMap<Long,MessageExt>變數的第一個key值與最後一個key值之間的差額,該key值表示查詢的佇列偏移量queueoffset;若差額大於閾值(由DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,預設是2000),則呼叫PullMessageService.executePullRequestLater方法,在50毫秒之後重新將該PullRequest請求放入PullMessageService.pullRequestQueue佇列中;並跳出該方法;
6、以PullRequest.messageQueue物件的topic值為引數從RebalanceImpl.subscriptionInner: ConcurrentHashMap, SubscriptionData>中獲取對應的SubscriptionData物件,若該物件為null,考慮到併發的關係,呼叫executePullRequestLater方法,稍後重試;並跳出該方法;
7、若訊息模型為叢集模式(RebalanceImpl.messageModel等於CLUSTERING),則以PullRequest物件的MessageQueue變數值、type =READ_FROM_MEMORY(從記憶體中獲取消費進度offset值)為引數呼叫DefaultMQPushConsumerImpl. offsetStore物件(初始化為RemoteBrokerOffsetStore物件)的readOffset(MessageQueue mq, ReadOffsetType type)方法從本地記憶體中獲取消費進度offset值。若該offset值大於0 則置臨時變數commitOffsetEnable等於true否則為false;該offset值作為pullKernelImpl方法中的commitOffset引數,在Broker端拉取訊息之後根據commitOffsetEnable引數值決定是否用該offset更新訊息進度。該readOffset方法的邏輯是:以入參MessageQueue物件從RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap <MessageQueue,AtomicLong>變數中獲取消費進度偏移量;若該偏移量不為null則返回該值,否則返回-1;
8、當每次拉取訊息之後需要更新訂閱關係(由DefaultMQPushConsumer. postSubscriptionWhenPull引數表示,預設為false)並且以topic值引數從RebalanceImpl.subscriptionInner獲取的SubscriptionData物件的classFilterMode等於false(預設為false),則將sysFlag標記的第3個位元組置為1,否則該位元組置為0;
9、該sysFlag標記的第1個位元組置為commitOffsetEnable的值;第2個位元組(suspend標記)置為1;第4個位元組置為classFilterMode的值;
10、 初始化匿名內部類PullCallback,實現了onSucess/onException方法; 該方法只有在非同步請求的情況下才會回撥;
11、呼叫底層的拉取訊息API介面:PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法進行訊息拉取操作。將回調類PullCallback傳入該方法中,當採用非同步方式拉取訊息時,在收到響應之後會回撥該回調類的方法。
**/
public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            return;
        }
        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }
        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }
                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }
        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }
        final long beginTimestamp = System.currentTimeMillis();
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }
            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
            classFilter = sd.isClassFilterMode();
        }
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            // 下面我們看繼續跟進這個方法,這個方法已經就是客戶端如何拉取訊息
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                // 訊息的通訊方式為非同步
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }

【拉取訊息底層API】

**
1、獲取Broker的ID。以入參MessageQueue物件為引數呼叫PullAPIWrapper.recalculatePullFromWhichNode(MessageQueue mq)方法,在該方法中,先判斷PullAPIWrapper.connectBrokerByUser變數是否為true(在FiltersrvController中啟動時會設定為true,預設為false),若是則直接返回0(主用Broker的brokerId);否則以MessageQueue物件為引數從PullAPIWrapper.pullFromWhichNodeTable:ConcurrentHashMap<MessageQueue, AtomicLong獲取brokerId,若該值不為null則返回該值,否則返回0(主用Broker的brokerId);
2、呼叫MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker) 方法查詢Broker地址,其中onlyThisBroker=false,表示若指定的brokerId未獲取到地址則獲取其他BrokerId的地址也行。在該方法中根據brokerName和brokerId引數從MQClientInstance.brokerAddrTable: ConcurrentHashMap<, HashMap變數中獲取對應的Broker地址,若獲取不到則從brokerName下面的Map列表中找其他地址返回即可;
3、若在上一步未獲取到Broker地址,則以topic引數呼叫MQClientInstance.updateTopicRouteInfoFromNameServer(String topic)方法,然後在執行第2步的操作,直到獲取到Broker地址為止;
4、若獲取的Broker地址是備用Broker,則將標記位sysFlag的第1個位元組置為0,即在消費完之後不提交消費進度;
5、檢查標記位sysFlag的第4個位元組(即SubscriptionData. classFilterMode)是否為1;若等於1,則呼叫PullAPIWrapper.computPullFromWhichFilterServer(String topic, String brokerAddr)方法獲取Filter伺服器地址。大致邏輯如下:
5.1)根據topic引數值從MQClientInstance.topicRouteTable: ConcurrentHashMapTopicRouteData>變數中獲取TopicRouteData物件,
5.2)以Broker地址為引數從該TopicRouteData物件的filterServerTable:HashMap變數中獲取該Broker下面的所有Filter伺服器地址列表;
5.3)若該地址列表不為空,則隨機選擇一個Filter伺服器地址返回;否則向呼叫層丟擲異常,該pullKernelImpl方法結束;
6、構建PullMessageRequestHeader物件,其中queueOffset變數值等於入參offset;
7、若執行了第5步則向獲取的Filter伺服器傳送PULL_MESSAGE請求資訊,否則向Broker傳送PULL_MESSAGE請求資訊。初始化PullMessageRequestHeader物件,然後呼叫MQClientAPIImpl.pullMessage(String addr, PullMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法向Broker地址或者Filter地址傳送PULL_MESSAGE請求資訊
**/
public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }
        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            // 設定broker的最長阻塞時間,預設是15秒,broker只有在沒有訊息的時候才會阻塞,如果阻塞超過設定時間會返回null,如果有訊息會立即返回
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);
            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);
            return pullResult;
        }
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

【傳送遠端請求拉取訊息】

/**
在MQClientAPIImpl.pullMessage方法中,根據入參communicationMode的值分為非同步拉取和同步拉取方式兩種。
無論是非同步方式拉取還是同步方式拉取,在傳送拉取請求之前都會構造一個ResponseFuture物件,以請求訊息的序列號為key值,存入NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>變數中,對該變數有幾種情況會處理:
1、傳送失敗後直接刪掉responseTable變數中的相應記錄;
2、收到響應訊息之後,會以響應訊息中的序列號(由服務端根據請求訊息的序列號原樣返回)從responseTable中查詢ResponseFuture物件,並設定該物件的responseCommand變數。若是同步傳送會喚醒等待響應的ResponseFuture.waitResponse方法;若是非同步傳送會呼叫ResponseFuture.executeInvokeCallback()方法完成回撥邏輯處理;
3、在NettyRemotingClient.start()啟動時,也會初始化定時任務,該定時任務每隔1秒定期掃描responseTable列表,遍歷該列表中的ResponseFuture物件,檢查等待響應是否超時,若超時,則呼叫ResponseFuture. executeInvokeCallback()方法,並將該物件從responseTable列表中刪除;
**/
public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        switch (communicationMode) {
            case ONEWAY:
                assert false;
                return null;
            case ASYNC:
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            case SYNC:
                return this.pullMessageSync(addr, request, timeoutMillis);
            default:
                assert false;
                break;
        }
        return null;
    }

【同步拉取】

/**
對於同步傳送方式,呼叫MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法。大致步驟如下:
1、呼叫RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis)方法:
1.1)獲取Broker地址的Channel資訊。根據broker地址從RemotingClient.channelTables:ConcurrentHashMap, ChannelWrapper>變數中獲取ChannelWrapper物件並返回該物件的Channel變數;若沒有ChannelWrapper物件則與broker地址建立新的連線並將連線資訊存入channelTables變數中,便於下次使用;
1.2)若NettyRemotingClient.rpcHook:RPCHook變數不為空(該變數在應用層初始化DefaultMQPushConsumer或者DefaultMQPullConsumer物件傳入該值),則呼叫RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;
1.3)呼叫NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis)方法,該方法的邏輯如下:
A)使用請求的序列號(opaue)、超時時間初始化ResponseFuture物件;並將該ResponseFuture物件存入NettyRemotingAbstract.responseTable: ConcurrentHashMap變數中;
B)呼叫Channel.writeAndFlush(Object msg)方法將請求物件RemotingCommand傳送給Broker;然後呼叫addListener(GenericFutureListener<? extends Future<? super Void>> listener)方法新增內部匿名類:該內部匿名類實現了ChannelFutureListener介面的operationComplete方法,在傳送完成之後回撥該監聽類的operationComplete方法,在該方法中,首先呼叫ChannelFuture. isSuccess()方法檢查是否傳送成功,若成功則置ResponseFuture物件的sendRequestOK等於true並退出此回撥方法等待響應結果;若不成功則置ResponseFuture物件的sendRequestOK等於false,然後從NettyRemotingAbstract.responseTable中刪除此請求序列號(opaue)的記錄,置ResponseFuture物件的responseCommand等於null,並喚醒ResponseFuture.waitResponse(long timeoutMillis)方法的等待;
C)呼叫ResponseFuture.waitResponse(long timeoutMillis)方法等待響應結果;在傳送失敗或者收到響應訊息(詳見5.10.3小節)或者超時的情況下會喚醒該方法返回ResponseFuture.responseCommand變數值;
D)若上一步返回的responseCommand值為null,則丟擲異常:若ResponseFuture.sendRequestOK為true,則丟擲RemotingTimeoutException異常,否則丟擲RemotingSendRequestException異常;
E)若上一步返回的responseCommand值不為null,則返回responseCommand變數值;
1.4)若NettyRemotingClient.rpcHook: RPCHook變數不為空,則呼叫RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request)方法;
2、以上一步的返回值RemotingCommand物件為引數呼叫MQClientAPIImpl. processPullResponse (RemotingCommand response)方法將返回物件解析並封裝成PullResultExt物件然後返回給呼叫者,響應訊息的結果狀態轉換如下:
2.1)若RemotingCommand物件的Code等於SUCCESS,則PullResultExt.pullStatus=FOUND;
2.2)若RemotingCommand物件的Code等於PULL_NOT_FOUND,則PullResultExt.pullStatus= NO_NEW_MSG;
2.3)若RemotingCommand物件的Code等於PULL_RETRY_IMMEDIATELY,則PullResultExt.pullStatus= NO_MATCHED_MSG;
2.3)若RemotingCommand物件的Code等於PULL_OFFSET_MOVED,則PullResultExt.pullStatus= OFFSET_ILLEGAL;
**/
 @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

【非同步拉取】

/**
對於非同步方式拉取訊息,呼叫MQClientAPIImpl.pullMessageAsync(String addr, RemotingCommand request, long timeoutMillis, PullCallback pullCallback)方法。大致邏輯如下:
1、定義了一個內部匿名InvokeCallback類並實現operationComplete (ResponseFuture responseFuture)方法;該方法的大致邏輯如下:
1.1)從入參ResponseFuture物件中獲取傳輸的響應物件RemotingCommand;
1.2)若該響應物件RemotingCommand不為空;則首先呼叫MQClientAPIImpl. processPullResponse (RemotingCommand response)方法對返回物件解析並封裝成PullResultExt物件,其中PullResultExt.messageBinary等於響應訊息的body;然後以PullResultExt物件為引數呼叫回撥類PullCallback物件的onSuccess方法(呼叫應用層定義的回撥方法,詳見5.5.2小節),若在此過程中出現異常則呼叫PullCallback物件的onException方法(呼叫應用層定義的回撥方法);
1.3)若該返回物件RemotingCommand為空;則檢查ResponseFuture.sendRequestOK是否為true,若不是則傳送請求失敗;若發生成功再檢查是否等待超時;對於每種異常情況均呼叫PullCallback物件的onException方法由應用層來處理異常情況;
2、以匿名類InvokeCallback為引數呼叫NettyRemotingClient.invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)方法,大致邏輯如下:
2.1)獲取Broker地址的Channel資訊。根據broker地址從RemotingClient.channelTables: ConcurrentHashMap, ChannelWrapper>變數中獲取ChannelWrapper物件並返回該物件的Channel變數;若沒有ChannelWrapper物件則與broker地址建立新的連線並將連線資訊存入channelTables變數中,便於下次使用;
2.2)若NettyRemotingClient.rpcHook: RPCHook變數不為空(該變數在應用層初始化DefaultMQPushConsumer或者DefaultMQPullConsumer物件傳入該值),則呼叫RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;
2.3)呼叫NettyRemotingAbstract.invokeAsyncImpl(Channel channel, RemotingCommand request,long timeoutMillis,InvokeCallback invokeCallback)方法,該方法的大致邏輯如下:
A)利用java.util.concurrent.Semaphore.tryAcquire(long timeout,TimeUnitunit)獲取訊號量,保證該方法的業務邏輯同時執行的執行緒個數;
B)使用請求的序列號(opaue)、超時時間、InvokeCallback物件、 用Semaphore初始化的SemaphoreReleaseOnlyOnce物件(該物件是確保在釋放訊號量是隻釋放一次)初始化ResponseFuture物件,並根據請求的序列號(opaue)作為key值,將該ResponseFuture物件存入NettyRemotingAbstract. responseTable物件