MQClientInstance 啟動定時任務之更新主題對應的路由資訊
阿新 • • 發佈:2020-08-13
MQClientInstance 啟動定時任務之更新主題對應的路由資訊
程式碼位於方法MQClientInstance#startScheduledTask中
// Periodically pull topic information from the name server. The period comes from
// clientConfig.getPollNameServerInterval() (the article states 30 seconds — confirm
// against the configured default); the first run happens after a 10 ms initial delay.
final Runnable refreshTopicRouteTask = new Runnable() {
    @Override
    public void run() {
        try {
            // Refresh the route information of every topic known to this client.
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            // Log and swallow so that a failure in one run is only reported, not propagated.
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }
};
this.scheduledExecutorService.scheduleAtFixedRate(
    refreshTopicRouteTask,
    10,
    this.clientConfig.getPollNameServerInterval(),
    TimeUnit.MILLISECONDS);
MQClientInstance#updateTopicRouteInfoFromNameServer
public void updateTopicRouteInfoFromNameServer() { Set<String> topicList = new HashSet<String>(); // Consumer 提取所有消費者訂閱的主題 { Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { Set<SubscriptionData> subList = impl.subscriptions(); if (subList != null) { for (SubscriptionData subData : subList) { topicList.add(subData.getTopic()); } } } } } // Producer 提取所有生產者傳送的主題 { Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { Set<String> lst = impl.getPublishTopicList(); topicList.addAll(lst); } } } //遍歷跟新所有的訊息對應的路由資訊 for (String topic : topicList) { this.updateTopicRouteInfoFromNameServer(topic); } }
/**
 * Refreshes the route information of a single topic.
 *
 * <p>Delegates to the three-argument overload with {@code isDefault = false} and no
 * default producer.
 *
 * @param topic the topic whose route information should be refreshed
 * @return whatever the three-argument overload returns for this topic
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    final boolean isDefault = false;
    final DefaultMQProducer defaultMQProducer = null;
    return this.updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer);
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { //加鎖 if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { //從服務端獲取主題的路由資訊 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); //路由資訊是否發生了變化 boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { //路由資訊沒變再判斷生產者和消費者中的路由資訊有沒有儲存該topic對應的資訊 changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); } //路由資訊變化了或者 生產者和消費者中沒有儲存該topic的路由資訊 if (changed) { //克隆一份主題路由資訊 TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); //跟新brokerName對應的broker地址資訊 for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info 跟新發送者路由資訊 { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info 跟新消費者路由資訊 { Set<MessageQueue> subscribeInfo = 
topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } } catch (Exception e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; }