🏆【Alibaba中介軟體技術系列】「RocketMQ技術專題」Broker服務端自動建立topic的原理分析和問題要點指南
前提背景
使用RocketMQ進行發訊息時,一般我們是必須要指定topic,此外topic必須要提前建立,但是topic的建立(自動或者手動方式)的設定有一個開關autoCreateTopicEnable,此部分主要會在broker節點的配置檔案的時候進行設定,執行環境中會使用預設設定autoCreateTopicEnable = true,但是這樣就會導致topic的設定不容易規範管理,所以在生產環境中會在Broker設定引數autoCreateTopicEnable = false。那麼如果此引數稍有偏差,或者沒有提前手動建立topic,則會頻繁出現No route info of this topic這個錯誤,那麼接下來我們探索一下此問題的出現原因以及系統如何進行建立topic。
No route info of this topic
相信做過RocketMQ專案的小夥伴們,可能對No route info of this topic一點都不陌生,說明的含義起始就是無法解析或者路由這個topic,但是造成的原因有很多種。
沒有配置NameServer服務
Broker啟動時我們沒有配置NameSrv地址,傳送程式會報錯:No route info of this topic。但當我們配上NameSrv地址後,再次啟動,可以正常傳送訊息。
沒有建立autoCreateTopicEnable=true且沒有建立該topic
當autoCreateTopicEnable=false時,DefaultMQProducerImpl.sendDefaultImpl,當發訊息的時候肯定先要獲取關於topic的一些資訊,比如有幾個訊息佇列,是不時有序topic,有這個topic的Broker列表等,當獲取不到正確的資訊時,就會丟擲異常
RocketMQ的客戶端版本與服務端版本不一致
RocketMQ Java客戶端呼叫No route info of this topic錯誤(原因版本不一致)。此時,即使啟動broker的時候設定autoCreateTopicEnable=true也沒有用,假如,使用的rocketmq的版本是4.9.0,java client端版本4.3.0
RocketMQ 4.3.0版本的自動建立(autoCreateTopicEnable),客戶端傳遞使用的AUTO_CREATE_TOPIC_KEY_TOPIC是”AUTO_CREATE_TOPIC_KEY”,新版本的client,客戶端傳遞的預設AUTO_CREATE_TOPIC_KEY_TOPIC是“TBW102”。
org.apache.rocketmq.client.producer.DefaultMQProducer#createTopicKey
org.apache.rocketmq.common.MixAll#AUTO_CREATE_TOPIC_KEY_TOPIC
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
實際程式碼
> 4.4.0版本
<=4.3.0版本
方案1:要不就進行調整client客戶端版本的version
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.1</version>
</dependency>
方案2:調整自動建立程式碼為AUTO_CREATE_TOPIC_KEY
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unidcque_group_name");
//設定自動建立topic的key值
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
Topic之前並未建立過,Broker未配置NameSrv地址,無法傳送,而配置NameSrv後則可以正常傳送。這中間有2個問題:
1、topic是怎麼自動建立的?
2、topic自動建立過程中Broker、NameSrv如何協作配合的?
分析以下如何自動建立topic的原始碼流程
RocketMQ基本路由規則
-
Broker在啟動時向Nameserver註冊儲存在該伺服器上的路由資訊,並每隔30s向Nameserver傳送心跳包,並更新路由資訊。
Nameserver每隔10s掃描路由表,如果檢測到Broker服務宕機,則移除對應的路由資訊。 -
訊息生產者每隔30s會從Nameserver重新拉取Topic的路由資訊並更新本地路由表;在訊息傳送之前,如果本地路由表中不存在對應主題的路由訊息時,會主動向Nameserver拉取該主題的訊息。
-
如果autoCreateTopicEnable設定為true,訊息傳送者向NameServer查詢主題的路由訊息返回空時,會嘗試用一個系統預設的主題名稱(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此時訊息傳送者得到的路由資訊為:
預設Topic的路由資訊是如何建立的?
Nameserver?broker?當autoCreateTopicEnable=false時,DefaultMQProducerImpl.sendDefaultImpl,當發訊息的時候肯定先要獲取關於topic的一些資訊,比如有幾個訊息佇列,是不時有序topic,有這個topic的Broker列表等,當獲取不到正確的資訊時,就會丟擲異常
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 如果獲取到topic的路由資訊,則傳送,否則拋異常
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
... ...
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
tryToFindTopicPublishInfo是傳送的關鍵,如果獲取到topic的資訊,則傳送,否則就異常;因此之前No route info of this topic的異常,就是Producer獲取不到Topic的資訊,導致傳送失敗。
先從topicPublishInfoTable快取中獲取
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// topicPublishInfoTable是Producer本地快取的topic資訊表
// Producer啟動後,會新增預設的topic:TBW102
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 未獲取到,從NameSrv獲取該topic的資訊
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 獲取到了,則返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 沒獲取到,再換種方式從NameSrv獲取
// 如果再獲取不到,那後續就無法傳送了
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
-
Producer本地topicPublishInfoTable變數中沒有topic的資訊,只快取了TBW102。
-
嘗試從NameSrv獲取Topic的資訊。獲取失敗,NameSrv中根本沒有Topic,因為這個topic是Producer傳送時設定的,沒有同步到NameSrv。
-
再換種方式從NameSrv獲取,如果獲取到了,那麼可以執行傳送流程,如果還是沒有獲取到,就會拋No route info of this topic的異常了。
再從NameServer服務中進行獲取
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
-
第1次獲取時,isDefault傳的false,defaultMQProducer傳的null,因此在updateTopicRouteInfoFromNameServer會走else分支,用Topic去獲取
-
第2次獲取時,isDefault傳的true,defaultMQProducer也傳值了,因此會走if分支,將入參的topic轉換為預設的TBW102,獲取TBW102的資訊
-
不管Broker配沒配NameSrv地址,獲取Topic的資訊,必失敗
-
獲取TBW102資訊:
- 2.1 Broker配置了NameSrv地址,成功
- 2.2 Broker沒有配置NameSrv地址,失敗
生產者首先向NameServer查詢路由資訊,由於是一個不存在的主題,故此時返回的路由資訊為空,RocketMQ會使用預設的主題再次尋找,由於開啟了自動建立路由資訊,NameServer會向生產者返回預設主題的路由資訊。
然後從返回的路由資訊中選擇一個佇列(預設輪詢)。訊息傳送者從Nameserver獲取到預設的Topic的佇列資訊後,佇列的個數會改變嗎?
從NameServer中獲取,注意這個isDefault=false,defaultMQProducer=null
溫馨提示:訊息傳送者在到預設路由資訊時,其佇列數量,會選擇DefaultMQProducer#defaultTopicQueueNums與Nameserver返回的的佇列數取最小值,DefaultMQProducer#defaultTopicQueueNums預設值為4,故自動建立的主題,其佇列數量預設為4。
獲取訊息對應的topic資訊
發請求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader)
,但是因為沒有任何一個Broker有關於這個topic的資訊,所以namesrv就會返回topic不存在,處理請求的程式碼在DefaultRequestProcessor的。
case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request);
也就是迴應碼ResponseCode.TOPIC_NOT_EXIST,然後丟擲異常 throw new MQClientException(response.getCode(), response.getRemark());被捕獲之後退出返回false。
從NameServer獲取相關的Topic資訊資料
updateTopicRouteInfoFromNameServer最終會發給NameSrv一個GET_ROUTEINTO_BY_TOPIC請求
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
因為if條件不滿足,所以獲取預設的topic資訊,注意isDefault=true,defaultMQProducer=defaultMQProducer
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
預設的topic為"TBW102",這個時候如果namesrv中如果還是沒有這個topic的資訊的話,就會丟擲異常No route info of this topic。
autoCreateTopicEnable=true的作用。
Broker啟動流程自動建立topic
-
在Broker啟動流程中,會構建TopicConfigManager物件,其構造方法中首先會判斷是否開啟了允許自動建立主題,如果啟用了自動建立主題,則向topicConfigTable中新增預設主題的路由資訊。
-
當Broker啟動時,TopicConfigManager初始化,這裡會判斷該標識,建立TBW102topic,並且在後續的心跳中把資訊更新到namesrv中,這樣在發訊息的時候就不會丟擲不存在的異常。
// MixAll.DEFAULT_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.DEFAULT_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
該topicConfigTable中所有的路由資訊,會隨著Broker向Nameserver傳送心跳包中,Nameserver收到這些資訊後,更新對應Topic的路由資訊表。
BrokerConfig的defaultTopicQueueNum預設為8。兩臺Broker伺服器都會執行上面的過程,故最終Nameserver中關於預設主題的路由資訊中,會包含兩個Broker分別各8個佇列資訊。
TopicConfigManager構造方法
當從namesrv查出Topic相關的資訊時,在topicRouteData2TopicPublishInfo設定訊息佇列數量 info.getMessageQueueList().add(mq);,呼叫updateTopicPublishInfo方法更新快取topicPublishInfoTable
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
然後if (topicPublishInfo != null && topicPublishInfo.ok()) 這個條件就會符合,那個異常就不會丟擲。
當autoCreateTopicEnable=false時
- 建立topic的類UpdateTopicSubCommand(),設定相應的資訊,最後呼叫defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
- 發訊息RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor處理訊息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);
- 同步給其他Broker
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);
Broker端收到訊息後的處理流程
服務端收到訊息傳送的處理器為:SendMessageProcessor,在處理訊息傳送時,會呼叫super.msgCheck方法:
AbstractSendMessageProcessor#msgCheck
在Broker端,首先會使用TopicConfigManager根據topic查詢路由資訊,如果Broker端不存在該主題的路由配置(路由資訊),此時如果Broker中存在預設主題的路由配置資訊,則根據訊息傳送請求中的佇列數量,在Broker建立新Topic的路由資訊。這樣Broker服務端就會存在主題的路由資訊。
在Broker端的topic配置管理器中存在的路由資訊,一會向Nameserver傳送心跳包,彙報到Nameserver,另一方面會有一個定時任務,定時儲存在broker端,具體路徑為${ROCKET_HOME}/store/config/topics.json中,這樣在Broker關閉後再重啟,並不會丟失路由資訊。
TBW102是為何物?
TBW102是Broker啟動時,當autoCreateTopicEnable的配置為true時,會自動建立該預設topic。
public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
// ...
{
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
// ...
}
autoCreateTopicEnable的預設值是true,可以同步外部配置檔案,讓Broker啟動時載入,來改變該值。我理解的TBW102的作用是當開啟自動建立topic功能,傳送時用了未配置的topic,可以讓該topic繼承預設TBW102的配置,實現訊息的傳送。
總結分析
-
client本地首先沒有快取對應topic的路由資訊,然後先去nameserver去查詢,nameserver中也沒有此topic的路由資訊,然後返回給client。client接收到返回後再向nameserver請求topic為tbw102的路由資訊。
-
如果有broker設定了autocreateTopic,則broker在啟動的時候會在topicManager中建立對應的topicconfig通過心跳傳送給nameserver,namerserver會將其儲存。nameserver將之前儲存的tbw102的路由資訊返回給請求的client。
-
client拿到了topic為tbw102的路由資訊後返回,client根據返回的tbw102路由資訊(裡面包含所有設定了autocreateTopic為true的broker,預設每個broker會在client本地建立DefaultTopicQueueNums=4個讀寫佇列選擇,假設兩個broker則會有8個佇列讓你選擇)先快取到本地的topicPublishInfoTable表中,key為此topic ,value為此topicRouteData,輪詢選擇一個佇列進行傳送。
根據選擇到的佇列對應的broker傳送該topic訊息。
broker在接收到此訊息後會在msgcheck方法中呼叫createTopicInSendMessageMethod方法建立topicConfig資訊塞進topicConfigTable表中,然後就跟傳送已經建立的topic的流程一樣傳送訊息了。
同時topicConfigTable會通過心跳將新的這個topicConfig資訊傳送給nameserver。
nameserver接收到後會更新topic的路由資訊,如果之前接收到訊息的broker沒有全部覆蓋到,因為broker會30S向nameserver傳送一次心跳,心跳包裡包含topicconfig,覆蓋到的broker會將自動建立好的topicconfig資訊傳送給nameserver,從而在nameserver那邊接收到後會註冊這個新的topic資訊,因為消費者每30S也會到nameserver去更新本地的topicrouteinfo,請求傳送到nameserver得到了之前覆蓋到的broker傳送的心跳包更新後的最新topic路由資訊,那麼未被覆蓋的broker就永遠不會加入到這個負載均衡了,就會造成負載均衡達不到預期了,即所有能自動建立topic的broker不能全部都參與進來。
參考資料
https://www.cnblogs.com/dingwpmz/p/11809404.html
極限就是為了超越而存在的