1. 程式人生 > 其它 >🏆【Alibaba中介軟體技術系列】「RocketMQ技術專題」Broker服務端自動建立topic的原理分析和問題要點指南

🏆【Alibaba中介軟體技術系列】「RocketMQ技術專題」Broker服務端自動建立topic的原理分析和問題要點指南

前提背景

使用RocketMQ進行發訊息時,一般我們是必須要指定topic,此外topic必須要提前建立,但是topic的建立(自動或者手動方式)的設定有一個開關autoCreateTopicEnable,此部分主要會在broker節點的配置檔案的時候進行設定,執行環境中會使用預設設定autoCreateTopicEnable = true,但是這樣就會導致topic的設定不容易規範管理,所以在生產環境中會在Broker設定引數autoCreateTopicEnable = false。那麼如果此引數稍有偏差,或者沒有提前手動建立topic,則會頻繁出現No route info of this topic這個錯誤,那麼接下來我們探索一下此問題的出現原因以及系統如何進行建立topic。

No route info of this topic

相信做過RocketMQ專案的小夥伴們,可能對No route info of this topic一點都不陌生,說明的含義起始就是無法解析或者路由這個topic,但是造成的原因有很多種。

沒有配置NameServer服務

Broker啟動時我們沒有配置NameSrv地址,傳送程式會報錯:No route info of this topic。但當我們配上NameSrv地址後,再次啟動,可以正常傳送訊息。

沒有建立autoCreateTopicEnable=true且沒有建立該topic

當autoCreateTopicEnable=false時,DefaultMQProducerImpl.sendDefaultImpl,當發訊息的時候肯定先要獲取關於topic的一些資訊,比如有幾個訊息佇列,是不時有序topic,有這個topic的Broker列表等,當獲取不到正確的資訊時,就會丟擲異常

RocketMQ的客戶端版本與服務端版本不一致

RocketMQ Java客戶端呼叫No route info of this topic錯誤(原因版本不一致)。此時,即使啟動broker的時候設定autoCreateTopicEnable=true也沒有用,假如,使用的rocketmq的版本是4.9.0,java client端版本4.3.0

RocketMQ 4.3.0版本的自動建立(autoCreateTopicEnable),客戶端傳遞使用的AUTO_CREATE_TOPIC_KEY_TOPIC是”AUTO_CREATE_TOPIC_KEY”,新版本的client,客戶端傳遞的預設AUTO_CREATE_TOPIC_KEY_TOPIC是“TBW102”。

org.apache.rocketmq.client.producer.DefaultMQProducer#createTopicKey
org.apache.rocketmq.common.MixAll#AUTO_CREATE_TOPIC_KEY_TOPIC
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";

實際程式碼

> 4.4.0版本
<=4.3.0版本

方案1:要不就進行調整client客戶端版本的version

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

方案2:調整自動建立程式碼為AUTO_CREATE_TOPIC_KEY

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unidcque_group_name");
//設定自動建立topic的key值
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

Topic之前並未建立過,Broker未配置NameSrv地址,無法傳送,而配置NameSrv後則可以正常傳送。這中間有2個問題:
1、topic是怎麼自動建立的?
2、topic自動建立過程中Broker、NameSrv如何協作配合的?

分析以下如何自動建立topic的原始碼流程

RocketMQ基本路由規則

  1. Broker在啟動時向Nameserver註冊儲存在該伺服器上的路由資訊,並每隔30s向Nameserver傳送心跳包,並更新路由資訊。
    Nameserver每隔10s掃描路由表,如果檢測到Broker服務宕機,則移除對應的路由資訊。

  2. 訊息生產者每隔30s會從Nameserver重新拉取Topic的路由資訊並更新本地路由表;在訊息傳送之前,如果本地路由表中不存在對應主題的路由訊息時,會主動向Nameserver拉取該主題的訊息。

  3. 如果autoCreateTopicEnable設定為true,訊息傳送者向NameServer查詢主題的路由訊息返回空時,會嘗試用一個系統預設的主題名稱(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此時訊息傳送者得到的路由資訊為:

預設Topic的路由資訊是如何建立的?

Nameserver?broker?當autoCreateTopicEnable=false時,DefaultMQProducerImpl.sendDefaultImpl,當發訊息的時候肯定先要獲取關於topic的一些資訊,比如有幾個訊息佇列,是不時有序topic,有這個topic的Broker列表等,當獲取不到正確的資訊時,就會丟擲異常

 private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 如果獲取到topic的路由資訊,則傳送,否則拋異常
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
           ... ...
        }
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

tryToFindTopicPublishInfo是傳送的關鍵,如果獲取到topic的資訊,則傳送,否則就異常;因此之前No route info of this topic的異常,就是Producer獲取不到Topic的資訊,導致傳送失敗。

先從topicPublishInfoTable快取中獲取

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // topicPublishInfoTable是Producer本地快取的topic資訊表
    // Producer啟動後,會新增預設的topic:TBW102
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 未獲取到,從NameSrv獲取該topic的資訊
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    // 獲取到了,則返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // 沒獲取到,再換種方式從NameSrv獲取
        // 如果再獲取不到,那後續就無法傳送了
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
  1. Producer本地topicPublishInfoTable變數中沒有topic的資訊,只快取了TBW102。

  2. 嘗試從NameSrv獲取Topic的資訊。獲取失敗,NameSrv中根本沒有Topic,因為這個topic是Producer傳送時設定的,沒有同步到NameSrv。

  3. 再換種方式從NameSrv獲取,如果獲取到了,那麼可以執行傳送流程,如果還是沒有獲取到,就會拋No route info of this topic的異常了。

再從NameServer服務中進行獲取

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
}
  1. 第1次獲取時,isDefault傳的false,defaultMQProducer傳的null,因此在updateTopicRouteInfoFromNameServer會走else分支,用Topic去獲取

  2. 第2次獲取時,isDefault傳的true,defaultMQProducer也傳值了,因此會走if分支,將入參的topic轉換為預設的TBW102,獲取TBW102的資訊

  3. 不管Broker配沒配NameSrv地址,獲取Topic的資訊,必失敗

  4. 獲取TBW102資訊:

    • 2.1 Broker配置了NameSrv地址,成功
    • 2.2 Broker沒有配置NameSrv地址,失敗

生產者首先向NameServer查詢路由資訊,由於是一個不存在的主題,故此時返回的路由資訊為空,RocketMQ會使用預設的主題再次尋找,由於開啟了自動建立路由資訊,NameServer會向生產者返回預設主題的路由資訊。

然後從返回的路由資訊中選擇一個佇列(預設輪詢)。訊息傳送者從Nameserver獲取到預設的Topic的佇列資訊後,佇列的個數會改變嗎?

從NameServer中獲取,注意這個isDefault=false,defaultMQProducer=null

溫馨提示:訊息傳送者在到預設路由資訊時,其佇列數量,會選擇DefaultMQProducer#defaultTopicQueueNums與Nameserver返回的的佇列數取最小值,DefaultMQProducer#defaultTopicQueueNums預設值為4,故自動建立的主題,其佇列數量預設為4。

獲取訊息對應的topic資訊

發請求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader),但是因為沒有任何一個Broker有關於這個topic的資訊,所以namesrv就會返回topic不存在,處理請求的程式碼在DefaultRequestProcessor的。

case RequestCode.GET_ROUTEINTO_BY_TOPIC:  return this.getRouteInfoByTopic(ctx, request);

也就是迴應碼ResponseCode.TOPIC_NOT_EXIST,然後丟擲異常 throw new MQClientException(response.getCode(), response.getRemark());被捕獲之後退出返回false。

從NameServer獲取相關的Topic資訊資料

updateTopicRouteInfoFromNameServer最終會發給NameSrv一個GET_ROUTEINTO_BY_TOPIC請求

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
        return false;
    }

因為if條件不滿足,所以獲取預設的topic資訊,注意isDefault=true,defaultMQProducer=defaultMQProducer

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
}

預設的topic為"TBW102",這個時候如果namesrv中如果還是沒有這個topic的資訊的話,就會丟擲異常No route info of this topic。
autoCreateTopicEnable=true的作用。

Broker啟動流程自動建立topic

  • 在Broker啟動流程中,會構建TopicConfigManager物件,其構造方法中首先會判斷是否開啟了允許自動建立主題,如果啟用了自動建立主題,則向topicConfigTable中新增預設主題的路由資訊。

  • 當Broker啟動時,TopicConfigManager初始化,這裡會判斷該標識,建立TBW102topic,並且在後續的心跳中把資訊更新到namesrv中,這樣在發訊息的時候就不會丟擲不存在的異常。

 // MixAll.DEFAULT_TOPIC
            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                String topic = MixAll.DEFAULT_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                this.systemTopicList.add(topic);
                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }

該topicConfigTable中所有的路由資訊,會隨著Broker向Nameserver傳送心跳包中,Nameserver收到這些資訊後,更新對應Topic的路由資訊表。

BrokerConfig的defaultTopicQueueNum預設為8。兩臺Broker伺服器都會執行上面的過程,故最終Nameserver中關於預設主題的路由資訊中,會包含兩個Broker分別各8個佇列資訊。

TopicConfigManager構造方法

當從namesrv查出Topic相關的資訊時,在topicRouteData2TopicPublishInfo設定訊息佇列數量 info.getMessageQueueList().add(mq);,呼叫updateTopicPublishInfo方法更新快取topicPublishInfoTable

 // Update Pub info
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

然後if (topicPublishInfo != null && topicPublishInfo.ok()) 這個條件就會符合,那個異常就不會丟擲。

當autoCreateTopicEnable=false時

  1. 建立topic的類UpdateTopicSubCommand(),設定相應的資訊,最後呼叫defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  2. 發訊息RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor處理訊息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);
  3. 同步給其他Broker
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);

Broker端收到訊息後的處理流程

服務端收到訊息傳送的處理器為:SendMessageProcessor,在處理訊息傳送時,會呼叫super.msgCheck方法:

AbstractSendMessageProcessor#msgCheck

在Broker端,首先會使用TopicConfigManager根據topic查詢路由資訊,如果Broker端不存在該主題的路由配置(路由資訊),此時如果Broker中存在預設主題的路由配置資訊,則根據訊息傳送請求中的佇列數量,在Broker建立新Topic的路由資訊。這樣Broker服務端就會存在主題的路由資訊。

在Broker端的topic配置管理器中存在的路由資訊,一會向Nameserver傳送心跳包,彙報到Nameserver,另一方面會有一個定時任務,定時儲存在broker端,具體路徑為${ROCKET_HOME}/store/config/topics.json中,這樣在Broker關閉後再重啟,並不會丟失路由資訊。

TBW102是為何物?

TBW102是Broker啟動時,當autoCreateTopicEnable的配置為true時,會自動建立該預設topic。

public TopicConfigManager(BrokerController brokerController) {
    this.brokerController = brokerController;
    // ...
    {
        // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
        if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
            String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
            TopicConfig topicConfig = new TopicConfig(topic);
            this.systemTopicList.add(topic);
            topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
            topicConfig.setPerm(perm);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
    }
    // ...
}

autoCreateTopicEnable的預設值是true,可以同步外部配置檔案,讓Broker啟動時載入,來改變該值。我理解的TBW102的作用是當開啟自動建立topic功能,傳送時用了未配置的topic,可以讓該topic繼承預設TBW102的配置,實現訊息的傳送。

總結分析

  1. client本地首先沒有快取對應topic的路由資訊,然後先去nameserver去查詢,nameserver中也沒有此topic的路由資訊,然後返回給client。client接收到返回後再向nameserver請求topic為tbw102的路由資訊。

  2. 如果有broker設定了autocreateTopic,則broker在啟動的時候會在topicManager中建立對應的topicconfig通過心跳傳送給nameserver,namerserver會將其儲存。nameserver將之前儲存的tbw102的路由資訊返回給請求的client。

  3. client拿到了topic為tbw102的路由資訊後返回,client根據返回的tbw102路由資訊(裡面包含所有設定了autocreateTopic為true的broker,預設每個broker會在client本地建立DefaultTopicQueueNums=4個讀寫佇列選擇,假設兩個broker則會有8個佇列讓你選擇)先快取到本地的topicPublishInfoTable表中,key為此topic ,value為此topicRouteData,輪詢選擇一個佇列進行傳送。

根據選擇到的佇列對應的broker傳送該topic訊息。

broker在接收到此訊息後會在msgcheck方法中呼叫createTopicInSendMessageMethod方法建立topicConfig資訊塞進topicConfigTable表中,然後就跟傳送已經建立的topic的流程一樣傳送訊息了。

同時topicConfigTable會通過心跳將新的這個topicConfig資訊傳送給nameserver。

nameserver接收到後會更新topic的路由資訊,如果之前接收到訊息的broker沒有全部覆蓋到,因為broker會30S向nameserver傳送一次心跳,心跳包裡包含topicconfig,覆蓋到的broker會將自動建立好的topicconfig資訊傳送給nameserver,從而在nameserver那邊接收到後會註冊這個新的topic資訊,因為消費者每30S也會到nameserver去更新本地的topicrouteinfo,請求傳送到nameserver得到了之前覆蓋到的broker傳送的心跳包更新後的最新topic路由資訊,那麼未被覆蓋的broker就永遠不會加入到這個負載均衡了,就會造成負載均衡達不到預期了,即所有能自動建立topic的broker不能全部都參與進來。

參考資料

https://www.cnblogs.com/dingwpmz/p/11809404.html

https://www.pianshen.com/article/24191855587/

https://www.jianshu.com/p/c8fd57a7f741

極限就是為了超越而存在的