1. 程式人生 > 程式設計 >聊聊rocketmq的updateTopicRouteInfoFromNameServer

聊聊rocketmq的updateTopicRouteInfoFromNameServer

本文主要研究一下rocketmq的updateTopicRouteInfoFromNameServer

updateTopicRouteInfoFromNameServer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log
= ClientLogger.getLog(); private final ClientConfig clientConfig; private final int instanceIndex; private final String clientId; private final long bootTimestamp = System.currentTimeMillis(); private final ConcurrentMap<String/* group */,MQProducerInner> producerTable = new ConcurrentHashMap<String,MQProducerInner>(); private final ConcurrentMap<String/* group */,MQConsumerInner> consumerTable = new ConcurrentHashMap<String,MQConsumerInner>(); //...... public void updateTopicRouteInfoFromNameServer
() { Set<String> topicList = new HashSet<String>(); // Consumer { Iterator<Entry<String,MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String,MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if
(impl != null) { Set<SubscriptionData> subList = impl.subscriptions(); if (subList != null) { for (SubscriptionData subData : subList) { topicList.add(subData.getTopic()); } } } } } // Producer { Iterator<Entry<String,MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String,MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { Set<String> lst = impl.getPublishTopicList(); topicList.addAll(lst); } } } for (String topic : topicList) { this.updateTopicRouteInfoFromNameServer(topic); } } public boolean updateTopicRouteInfoFromNameServer(final String topic) { return updateTopicRouteInfoFromNameServer(topic,false,null); } public boolean updateTopicRouteInfoFromNameServer(final String topic,boolean isDefault,DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS,TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(),data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,1000 * 3); } if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old,topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed,old[{}],new[{}]",topic,old,topicRouteData); } if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { 
this.brokerAddrTable.put(bd.getBrokerName(),bd.getBrokerAddrs()); } // Update Pub info { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic,topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String,MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String,MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic,publishInfo); } } } // Update sub info { Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic,topicRouteData); Iterator<Entry<String,MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String,MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic,subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {},TopicRouteData[{}]",cloneTopicRouteData); this.topicRouteTable.put(topic,cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer,getTopicRouteInfoFromNameServer return null,Topic: {}",topic); } } catch (Exception e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception",e); } } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms",LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception",e); } return false; } //...... } 複製程式碼
  • updateTopicRouteInfoFromNameServer首先從consumerTable及producerTable獲取topicList,然後遍歷topicList執行updateTopicRouteInfoFromNameServer,最後執行的是updateTopicRouteInfoFromNameServer(topic,false,null)
  • 這裡會執行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,1000 * 3)獲取topicRouteData然後與topicRouteTable中的TopicRouteData進行對比,先通過topicRouteDataIsChange判斷是否有變化,沒有的話再通過isNeedUpdateTopicRouteInfo進一步判斷
  • 若有變化則更新brokerAddrTable,遍歷producerTable執行impl.updateTopicPublishInfo(topic,publishInfo);遍歷consumerTable執行impl.updateTopicSubscribeInfo(topic,subscribeInfo),最後將cloneTopicRouteData更新到topicRouteTable

getTopicRouteInfoFromNameServer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

/**
 * Excerpt of the client-side API implementation showing how the route
 * information of a topic is requested from the name server.
 */
public class MQClientAPIImpl {

    //......

    /**
     * Looks up the route information of {@code topic}, delegating to the
     * three-argument overload with {@code allowTopicNotExist} set to {@code true}.
     *
     * @param topic the topic to look up
     * @param timeoutMillis timeout handed to the synchronous remoting call
     * @return the decoded route data
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
    }

    /**
     * Sends a {@code GET_ROUTEINTO_BY_TOPIC} request and decodes the response body.
     *
     * @param topic the topic to look up
     * @param timeoutMillis timeout handed to the synchronous remoting call
     * @param allowTopicNotExist when {@code true}, a TOPIC_NOT_EXIST response is logged as a warning
     * (except for the auto-create key topic) before the exception is thrown
     * @return the decoded route data when the response code is SUCCESS and the body is not null
     * @throws MQClientException carrying the response code and remark in every other case
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException,
        RemotingSendRequestException, RemotingConnectException {
        final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
        header.setTopic(topic);
        final RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, header);

        // null is passed as the address argument of invokeSync.
        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;

        final int code = response.getCode();
        if (code == ResponseCode.SUCCESS) {
            final byte[] payload = response.getBody();
            if (payload != null) {
                return TopicRouteData.decode(payload, TopicRouteData.class);
            }
            // SUCCESS without a body ends up at the throw below.
        } else if (code == ResponseCode.TOPIC_NOT_EXIST) {
            if (allowTopicNotExist) {
                if (!topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }
            }
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

    //......
}
複製程式碼
  • getTopicRouteInfoFromNameServer方法構造RequestCode.GET_ROUTEINTO_BY_TOPIC的請求,若response.getCode為ResponseCode.SUCCESS且body不為null,則使用TopicRouteData.decode(body,TopicRouteData.class)解析為TopicRouteData,其餘情況則丟擲MQClientException;這裡remotingClient.invokeSync的addr引數為null

invokeSync

rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {

	//......

    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());

	//......

    public RemotingCommand invokeSync(String addr,final RemotingCommand request,long timeoutMillis)
        throws InterruptedException,RemotingConnectException,RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr,request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel,timeoutMillis - costTime);
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel),response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception,so close the channel[{}]",addr);
                this.closeChannel(addr,channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr,channel);
                    log.warn("invokeSync: close socket because of timeout,{}ms,{}",addr);
                }
                log.warn("invokeSync: wait response timeout exception,the channel[{}]",addr);
                throw e;
            }
        } else {
            this.closeChannel(addr,channel);
            throw new RemotingConnectException(addr);
        }
    }

    private Channel getAndCreateChannel(final String addr) throws InterruptedException {
        if (null == addr) {
            return getAndCreateNameserverChannel();
        }

        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }

        return this.createChannel(addr);
    }

    private Channel getAndCreateNameserverChannel() throws InterruptedException {
        String addr = this.namesrvAddrChoosed.get();
        if (addr != null) {
            ChannelWrapper cw = this.channelTables.get(addr);
            if (cw != null && cw.isOK()) {
                return cw.getChannel();
            }
        }

        final List<String> addrList = this.namesrvAddrList.get();
        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS,TimeUnit.MILLISECONDS)) {
            try {
                addr = this.namesrvAddrChoosed.get();
                if (addr != null) {
                    ChannelWrapper cw = this.channelTables.get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }

                if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);

                        this.namesrvAddrChoosed.set(newAddr);
                        log.info("new name server is chosen. OLD: {},NEW: {}. namesrvIndex = {}",addr,newAddr,namesrvIndex);
                        Channel channelNew = this.createChannel(newAddr);
                        if (channelNew != null) {
                            return channelNew;
                        }
                    }
                }
            } catch (Exception e) {
                log.error("getAndCreateNameserverChannel: create name server channel exception",e);
            } finally {
                this.lockNamesrvChannel.unlock();
            }
        } else {
            log.warn("getAndCreateNameserverChannel: try to lock name server,but timeout,{}ms",LOCK_TIMEOUT_MILLIS);
        }

        return null;
    }

    private static int initValueIndex() {
        Random r = new Random();

        return Math.abs(r.nextInt() % 999) % 999;
    }

    //......
}          
複製程式碼
  • invokeSync首先通過getAndCreateChannel獲取channel,而getAndCreateChannel方法在addr為null時執行的是getAndCreateNameserverChannel;這裡先取namesrvAddrChoosed.get(),若不為null且channelTables中對應的ChannelWrapper可用(isOK)則直接返回該channel;否則在獲取lockNamesrvChannel鎖並再次檢查之後,從namesrvIndex.incrementAndGet()獲取index,取絕對值,然後再對addrList.size()取餘數作為選中的namesrv的地址,更新到namesrvAddrChoosed並建立channel,建立失敗則繼續嘗試下一個地址;namesrvIndex的初始值為initValueIndex,它通過Math.abs(r.nextInt() % 999) % 999算出一個隨機初始值

小結

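  • MQClientInstance的updateTopicRouteInfoFromNameServer首先從consumerTable及producerTable獲取topicList,然後遍歷topicList執行updateTopicRouteInfoFromNameServer,最後執行的是updateTopicRouteInfoFromNameServer(topic,false,null);這裡會執行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,1000 * 3)獲取topicRouteData,然後與topicRouteTable中的TopicRouteData進行對比,先通過topicRouteDataIsChange判斷是否有變化,沒有的話再通過isNeedUpdateTopicRouteInfo進一步判斷;若有變化則更新brokerAddrTable,遍歷producerTable執行impl.updateTopicPublishInfo(topic,publishInfo),遍歷consumerTable執行impl.updateTopicSubscribeInfo(topic,subscribeInfo),最後將cloneTopicRouteData更新到topicRouteTable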

doc