1. 程式人生 > 程式設計 >跟我學RocketMQ之訊息消費原始碼解析(1)

跟我學RocketMQ之訊息消費原始碼解析(1)

本文我們接著分析一下RocektMQ實現訊息消費的原始碼細節,這部分的內容較多,因此拆分為幾個章節分別進行講解。

本章節重點講解DefaultMQPushConsumer的程式碼邏輯。

DefaultMQPushConsumer使用樣例

按照慣例還是先看一下DefaultMQPushConsumer的使用樣例。

    @PostConstruct
    public void init() {
        defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
        defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
        // 從頭開始消費
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 消費模式:叢集模式
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 註冊監聽器
        defaultMQPushConsumer.registerMessageListener(messageListener);
        // 訂閱所有訊息
        try {
            defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC","*");
            defaultMQPushConsumer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("[訂單結果通知訊息消費者]--NotifySendConsumer載入異常!",e);
        }
        LOGGER.info("[訂單結果通知訊息消費者]--NotifySendConsumer載入完成!");
    }複製程式碼

初始化過程中需要呼叫registerMessageListener將具體的消費實現Listener注入。

    @Component(value = "notifySendListenerImpl")
    public class NotifySendListenerImpl implements MessageListenerConcurrently {複製程式碼

        ...省略部分程式碼...複製程式碼
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {複製程式碼
            try {
                for (MessageExt msg : msgs) {
                    // 訊息解碼
                    String message = new String(msg.getBody());
                    // 消費次數
                    int reconsumeTimes = msg.getReconsumeTimes();
                    String msgId = msg.getMsgId();
                    String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;複製程式碼
                    LOGGER.info("[通知傳送訊息消費者]-OrderNotifySendProducer-接收到訊息,message={},{}",message,logSuffix);
                    // 請求組裝
                    OrderResultNofityProtocol protocol = new OrderResultNofityProtocol();
                    protocol.decode(message);
                    // 引數加簽,獲取使用者privatekey
                    String privateKey = protocol.getPrivateKey();
                    String notifyUrl = protocol.getMerchantNotifyUrl();
                    String purseId = protocol.getPurseId();
                    ChargeNotifyRequest chargeNotifyRequest = new ChargeNotifyRequest();
                    chargeNotifyRequest.setChannel_orderid(protocol.getChannelOrderId())
                            .setFinish_time(DateUtil.formatDate(new Date(System.currentTimeMillis())))
                            .setOrder_status(NotifyConstant.NOTIFY_SUCCESS)
                            .setPlat_orderid(protocol.getOrderId())
                            .setSign(chargeNotifyRequest.sign(privateKey));
                    LOGGER.info("[通知傳送訊息消費者]-OrderNotifySendProducer-訂單結果通知入參:{},chargeNotifyRequest.toString(),logSuffix);
                    // 通知傳送
                    return sendNotifyByPost(reconsumeTimes,logSuffix,protocol,notifyUrl,purseId,chargeNotifyRequest);
                }
            } catch (Exception e) {
                LOGGER.error("[通知傳送訊息消費者]消費異常,e={}",LogExceptionWapper.getStackTrace(e));
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }複製程式碼

上面就是一個較為標準的在spring框架中使用RocektMQ的DefaultMQPushConsumer進行消費的主流程。

接下來我們重點分析一下原始碼實現。

初始化DefaultMQPushConsumer

首先看一下DefaultMQPushConsumer的初始化過程。

進入DefaultMQPushConsumer.java類,檢視構造方法:

    public DefaultMQPushConsumer(final String consumerGroup) {
        this(null,consumerGroup,null,new AllocateMessageQueueAveragely());
    }複製程式碼

呼叫了它的同名構造,採用AllocateMessageQueueAveragely策略(平均雜湊佇列演演算法)

    public DefaultMQPushConsumer(final String namespace,final String consumerGroup,RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.namespace = namespace;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,rpcHook);
    }複製程式碼

可以看到實際初始化是通過DefaultMQPushConsumerImpl實現的,DefaultMQPushConsumer持有一個defaultMQPushConsumerImpl的引用。

    [DefaultMQPushConsumerImpl.java]
    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer,RPCHook rpcHook) {
        // 初始化DefaultMQPushConsumerImpl,將defaultMQPushConsumer的實際引用傳入
        this.defaultMQPushConsumer = defaultMQPushConsumer;
        // 傳入rpcHook並指向本類的引用
        this.rpcHook = rpcHook;
    }
複製程式碼

註冊消費監聽MessageListener

我們接著看一下注冊消費監聽器的流程。

消費監聽介面MessageListener有兩個具體的實現,分別為

    MessageListenerConcurrently     -- 並行消費監聽
    MessageListenerOrderly          -- 順序消費監聽複製程式碼

本文以MessageListenerConcurrently為主要講解的物件。

檢視MessageListenerConcurrently的註冊過程。

    @Override
    public void registerMessageListener(
                MessageListenerConcurrently messageListener) {
        // 將實現指向本類引用
        this.messageListener = messageListener;
        // 進行真實註冊
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }複製程式碼

接著看defaultMQPushConsumerImpl.registerMessageListener

    DefaultMQPushConsumerImpl.java
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListenerInner = messageListener;
    }複製程式碼

可以看到DefaultMQPushConsumerImpl將真實的messageListener實現指向它本類的messageListener引用。

訂閱topic

接著看一下訂閱topic的主流程。

topic訂閱主要通過方法subscribe實現,首先看一下DefaultMQPushConsumer的subscribe實現

    @Override
    public void subscribe(String topic,String subExpression) 
                                        throws MQClientException {
        this.defaultMQPushConsumerImpl
            .subscribe(withNamespace(topic),subExpression);
    }複製程式碼

可以看到是呼叫了DefaultMQPushConsumerImpl的subscribe方法。

    public void subscribe(String topic,String subExpression) throws MQClientException {
        try {
            // 構建主題的訂閱資料,預設為叢集消費
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic,subExpression);
            // 將topic的訂閱資料進行儲存
            this.rebalanceImpl.getSubscriptionInner().put(topic,subscriptionData);
            if (this.mQClientFactory != null) {
                // 如果MQClientInstance不為空,則向所有的broker傳送心跳包,加鎖
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception",e);
        }
    }複製程式碼

看一下buildSubscriptionData程式碼邏輯

    [FilterAPI.java]
    public static SubscriptionData buildSubscriptionData(final String consumerGroup,String topic,String subString) throws Exception {
        // 構造一個SubscriptionData實體,設定topic、表示式(tag)
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);複製程式碼
        // 如果tag為空或者為"*",統一設定為"*",即訂閱所有訊息
        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            // tag不為空,則先按照‘|’進行分割
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                // 遍歷tag表示式陣列
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            // 將每個tag的值設定到tagSet中
                            subscriptionData.getTagsSet().add(trimString);
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                // tag解析異常
                throw new Exception("subString split error");
            }
        }
        return subscriptionData;
    }複製程式碼

看一下sendHeartbeatToAllBrokerWithLock程式碼邏輯

    [MQClientInstance.java]
    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                // 傳送心跳包
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception",e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat,but failed.");
        }
    }複製程式碼

可以看到,同步傳送心跳包給所有的broker,而該過程是通過RemotingClient統一實現的,通過呼叫RemotingClient.invokeSync實現心跳包的傳送,底層是通過Netty實現的。具體細節本文不進行展開。

啟動消費客戶端

上述初始化流程執行完畢之後,通過start()方法啟動消費客戶端。

    @Override
    public void start() throws MQClientException {
        // 設定消費者組
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(),this.consumerGroup));
        // 啟動消費客戶端
        this.defaultMQPushConsumerImpl.start();
        // trace處理邏輯
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(),this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ",e);
            }
        }
    }複製程式碼

關於trace的處理邏輯,本文不再展開,感興趣的同學可以移步 跟我學RocketMQ之訊息軌跡實戰與原始碼分析

接著看defaultMQPushConsumerImpl.start()方法邏輯

    [DefaultMQPushConsumerImpl.java]
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={},isUnitMode={}",this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(),this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;複製程式碼

首次啟動後,執行配置檢查,該方法為前置校驗方法,主要進行消費屬性校驗。

                this.checkConfig();複製程式碼

將訂閱關係配置資訊進行復制

                this.copySubscription();複製程式碼

如果當前為叢集消費模式,修改例項名為pid

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }複製程式碼

建立一個新的MQClientInstance例項,如果已經存在直接使用該存在的MQClientInstance

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);複製程式碼

為消費者負載均衡實現rebalanceImpl設定屬性

                // 設定消費者組
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                // 設定消費模式
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                // 設定佇列分配策略
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                // 設定當前的MQClientInstance例項
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
複製程式碼
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,isUnitMode());
                // 註冊訊息過濾鉤子
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);複製程式碼

處理offset儲存方式

                // offsetStore不為空則使用當前的offsetStore方式
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    // 否則根據消費方式選擇具體的offsetStore方式儲存offset
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        // 如果是廣播方式,則使用本地儲存方式
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        // 如果是叢集方式,則使用遠端broker儲存方式儲存offset
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                 // 載入當前的offset
                this.offsetStore.load();複製程式碼

根據MessageListener的具體實現方式選取具體的訊息拉取執行緒實現。

                // 如果是MessageListenerOrderly順序消費介面實現
                // 訊息消費服務選擇:ConsumeMessageOrderlyService(順序訊息消費服務)
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this,(MessageListenerOrderly) this.getMessageListenerInner());
                } 
                // 如果是MessageListenerConcurrently並行訊息消費介面實現
                // 訊息消費服務選擇:ConsumeMessageConcurrentlyService(並行訊息消費服務)
                else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this,(MessageListenerConcurrently) this.getMessageListenerInner());
                }複製程式碼

選擇並初始化完成具體的訊息消費服務之後,啟動訊息消費服務。consumeMessageService主要負責對訊息進行消費,它的內部維護了一個執行緒池。

                // 啟動訊息消費服務
                this.consumeMessageService.start();複製程式碼

接著向MQClientInstance註冊消費者,並啟動MQClientInstance。這裡再次強調

一個JVM中所有消費者、生產者持有同一個MQClientInstance,且MQClientInstance只會啟動一次

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(),this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before,specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);
                }複製程式碼
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.",this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;複製程式碼

如果MQClientInstance已經啟動,或者已經關閉,或者啟動失敗,重複呼叫start會報錯。這裡也能直觀的反映出:MQClientInstance的啟動只有一次

            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK,maybe started once,"
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);
            default:
                break;
        }複製程式碼

啟動完成執行後續收尾工作

        // 訂閱關係改變,更新Nameserver的訂閱關係表
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        // 檢查客戶端狀態
        this.mQClientFactory.checkClientInBroker();
        // 傳送心跳包
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 喚醒執行消費者負載均衡
        this.mQClientFactory.rebalanceImmediately();
    }
複製程式碼

copySubscription(),訊息重試topic處理邏輯

消費者啟動流程較為重要,我們接著對其中的重點方法展開講解。這部分內容可以暫時跳過,不影響對主流程的把控。

我們研究一下copySubscription方法的實現細節。

    [DefaultMQPushConsumerImpl.java]
    private void copySubscription() throws MQClientException {
        try {複製程式碼
            // 首先獲取訂閱資訊
            Map<String,String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String,String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic,subscriptionData);
                }
            }複製程式碼
            // 為defaultMQPushConsumer設定具體的MessageListener實現
            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }複製程式碼

根據消費型別選擇是否進行重試topic訂閱

            switch (this.defaultMQPushConsumer.getMessageModel()) {複製程式碼
                // 如果是廣播消費模式,則不進行任何處理,即無重試
                case BROADCASTING:
                    break;複製程式碼
                // 如果是叢集消費模式,訂閱重試主題訊息
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic,SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic,subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception",e);
        }
    }複製程式碼

如果是叢集消費模式,會訂閱重試主題訊息

獲取重試topic,規則為 RETRYGROUPTOPIC_PREFIX + consumerGroup,即:"%RETRY%"+消費組名

為重試topic設定訂閱關係,訂閱所有的訊息;

消費者啟動的時候會自動訂閱該重試主題,並參與該topic的訊息佇列負載過程。

小結

到此,我們就DefaultMQPushConsumer的初始化、啟動、校驗以及topic訂閱、重試等程式碼實現細節進行了較為詳細的講解。

下一章節,我將帶領讀者對訊息消費執行緒 consumeMessageService 的實現進行分析,我們下篇文章見。


版權宣告:
原創不易,洗文可恥。除非註明,本博文章均為原創,轉載請以連結形式標明本文地址。