聊聊rocketmq的updateTopicRouteInfoFromNameServer
阿新 • • 發佈:2020-06-24
序
本文主要研究一下rocketmq的updateTopicRouteInfoFromNameServer
updateTopicRouteInfoFromNameServer
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public class MQClientInstance {
// Maximum time, in milliseconds, that updateTopicRouteInfoFromNameServer waits to acquire lockNamesrv.
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final InternalLogger log = ClientLogger.getLog();
private final ClientConfig clientConfig;
private final int instanceIndex;
private final String clientId;
// Wall-clock time (epoch milliseconds) captured when this instance is constructed.
private final long bootTimestamp = System.currentTimeMillis();
// Producers keyed by group; iterated to collect publish topics and to push refreshed publish info.
private final ConcurrentMap<String/* group */,MQProducerInner> producerTable = new ConcurrentHashMap<String,MQProducerInner>();
// Consumers keyed by group; iterated to collect subscribed topics and to push refreshed subscribe info.
private final ConcurrentMap<String/* group */,MQConsumerInner> consumerTable = new ConcurrentHashMap<String,MQConsumerInner>();
//......
/**
 * Refreshes the route info of every topic this client currently cares about.
 *
 * <p>The topic set is the union of all topics subscribed by registered consumers and all
 * topics published by registered producers; each topic is then refreshed one by one through
 * the single-topic overload.
 */
public void updateTopicRouteInfoFromNameServer() {
    final Set<String> topics = new HashSet<String>();

    // Consumer side: collect the topic of every subscription.
    for (MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer == null) {
            continue;
        }
        final Set<SubscriptionData> subscriptions = consumer.subscriptions();
        if (subscriptions == null) {
            continue;
        }
        for (SubscriptionData subscription : subscriptions) {
            topics.add(subscription.getTopic());
        }
    }

    // Producer side: collect every topic the producer publishes to.
    for (MQProducerInner producer : this.producerTable.values()) {
        if (producer != null) {
            final Set<String> publishTopics = producer.getPublishTopicList();
            topics.addAll(publishTopics);
        }
    }

    for (String topic : topics) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}
/**
 * Refreshes the route info of a single topic using the topic's own route
 * (no default-topic fallback and no default producer).
 *
 * @param topic the topic to refresh
 * @return whatever the three-argument overload returns for
 *         {@code (topic, false, null)}
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    final boolean useDefaultTopicRoute = false;
    final DefaultMQProducer noDefaultProducer = null;
    return this.updateTopicRouteInfoFromNameServer(topic, useDefaultTopicRoute, noDefaultProducer);
}
/**
 * Fetches the route of {@code topic} from the name server and, if it changed, publishes it to
 * every registered producer and consumer and caches it in {@code topicRouteTable}.
 *
 * <p>The whole operation runs under {@code lockNamesrv}; acquiring the lock waits at most
 * {@code LOCK_TIMEOUT_MILLIS} milliseconds.
 *
 * @param topic             the topic whose route is refreshed
 * @param isDefault         when {@code true} and {@code defaultMQProducer} is non-null, the route
 *                          is looked up by the producer's create-topic key instead of
 *                          {@code topic}, and the read/write queue counts are capped at the
 *                          producer's default topic queue number
 * @param defaultMQProducer producer supplying the create-topic key and queue cap; may be
 *                          {@code null}
 * @return {@code true} only when a new route was stored in {@code topicRouteTable};
 *         {@code false} if the lock timed out, the thread was interrupted, the name server
 *         returned nothing, the lookup failed, or nothing needed updating
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(
                        defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                    if (topicRouteData != null) {
                        // Cap the queue counts at what the producer is configured to use.
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(),
                                data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }

                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        // Route data is identical; still give isNeedUpdateTopicRouteInfo a chance
                        // to request a push.
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed,old[{}],new[{}]", topic, old, topicRouteData);
                    }

                    if (changed) {
                        // Clone before the data is handed to the converters below; the clone is
                        // what ends up cached.
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // Update Pub info
                        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                        publishInfo.setHaveTopicRouterInfo(true);
                        for (MQProducerInner producer : this.producerTable.values()) {
                            if (producer != null) {
                                producer.updateTopicPublishInfo(topic, publishInfo);
                            }
                        }

                        // Update sub info
                        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                        for (MQConsumerInner consumer : this.consumerTable.values()) {
                            if (consumer != null) {
                                consumer.updateTopicSubscribeInfo(topic, subscribeInfo);
                            }
                        }

                        // Both placeholders need an argument: the topic first, then the route data.
                        log.info("topicRouteTable.put. Topic = {},TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer,getTopicRouteInfoFromNameServer return null,Topic: {}", topic);
                }
            } catch (Exception e) {
                // Failures for retry topics and the auto-create key topic are not logged.
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
                    && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}
//......
}
複製程式碼
- updateTopicRouteInfoFromNameServer首先從consumerTable及producerTable獲取topicList,然後遍歷topicList執行updateTopicRouteInfoFromNameServer,最後執行的是updateTopicRouteInfoFromNameServer(topic,false,null)
- 這裡會執行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,1000 * 3)獲取topicRouteData然後與topicRouteTable中的TopicRouteData進行對比,先通過topicRouteDataIsChange判斷是否有變化,沒有的話再通過isNeedUpdateTopicRouteInfo進一步判斷
- 若有變化則更新brokerAddrTable,遍歷producerTable執行impl.updateTopicPublishInfo(topic,publishInfo);遍歷consumerTable執行impl.updateTopicSubscribeInfo(topic,subscribeInfo),最後將cloneTopicRouteData更新到topicRouteTable
getTopicRouteInfoFromNameServer
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public class MQClientAPIImpl {
//......
/**
 * Looks up the route of {@code topic} on the name server, delegating to the three-argument
 * overload with {@code allowTopicNotExist} set to {@code true}.
 *
 * @param topic         the topic to look up
 * @param timeoutMillis time budget for the remote call, in milliseconds
 * @return the decoded route data
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
    throws RemotingException, MQClientException, InterruptedException {
    final boolean allowTopicNotExist = true;
    return this.getTopicRouteInfoFromNameServer(topic, timeoutMillis, allowTopicNotExist);
}
/**
 * Sends a {@code GET_ROUTEINTO_BY_TOPIC} request to a name server (the call passes a
 * {@code null} address to {@code invokeSync}) and decodes the answer.
 *
 * @param topic              the topic to look up
 * @param timeoutMillis      time budget for the remote call, in milliseconds
 * @param allowTopicNotExist when {@code true}, a TOPIC_NOT_EXIST answer is logged as a warning
 *                           (except for the auto-create key topic) before the exception is thrown
 * @return the decoded route data when the response code is SUCCESS and a body is present
 * @throws MQClientException carrying the response code and remark for every other outcome,
 *                           including SUCCESS with an empty body
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException {
    final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
    header.setTopic(topic);

    final RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, header);
    final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;

    final int code = response.getCode();
    if (code == ResponseCode.TOPIC_NOT_EXIST) {
        final boolean isAutoCreateKeyTopic = topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
        if (allowTopicNotExist && !isAutoCreateKeyTopic) {
            log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
        }
    } else if (code == ResponseCode.SUCCESS) {
        final byte[] payload = response.getBody();
        if (payload != null) {
            return TopicRouteData.decode(payload, TopicRouteData.class);
        }
        // SUCCESS without a body is reported as a failure below.
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}
//......
}
複製程式碼
- getTopicRouteInfoFromNameServer方法構造RequestCode.GET_ROUTEINTO_BY_TOPIC,若response.getCode為ResponseCode.SUCCESS,則使用TopicRouteData.decode(body,TopicRouteData.class)解析為TopicRouteData;這裡remotingClient.invokeSync的addr引數為null
invokeSync
rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
//......
// Candidate name server addresses; read by getAndCreateNameserverChannel when a new server must be picked.
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
// Address of the name server most recently selected; overwritten each time a new one is chosen.
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
// Rotating counter used to index into namesrvAddrList; seeded by initValueIndex() with a random value below 999.
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
//......
/**
 * Sends {@code request} over the channel for {@code addr} and blocks for the response.
 *
 * <p>A {@code null} {@code addr} makes {@code getAndCreateChannel} resolve a name server
 * channel. The time spent obtaining the channel and running the before-hooks is subtracted
 * from {@code timeoutMillis} before the request is actually sent.
 *
 * @param addr          target address, or {@code null} for the currently chosen name server
 * @param request       the command to send
 * @param timeoutMillis overall time budget in milliseconds
 * @return the response command
 * @throws RemotingConnectException     if no active channel could be obtained; the channel is
 *                                      closed first
 * @throws RemotingSendRequestException rethrown from the send after the channel is closed
 * @throws RemotingTimeoutException     if the budget was already spent before sending, or
 *                                      rethrown from the send; the channel is closed only when
 *                                      {@code isClientCloseSocketIfTimeout()} is enabled
 */
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException,
    RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // The request itself must be handed to the sender and to the after-hooks.
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception,so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                // Two placeholders: the timeout in ms, then the address.
                log.warn("invokeSync: close socket because of timeout,{}ms,{}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception,the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
/**
 * Resolves the channel to use for {@code addr}.
 *
 * <p>A {@code null} address is routed to the name server channel lookup. Otherwise a cached
 * channel is reused when its wrapper reports {@code isOK()}, and a new one is created if not.
 *
 * @param addr target address, or {@code null} for a name server
 * @return the channel, as returned by the cache or by the respective create method
 */
private Channel getAndCreateChannel(final String addr) throws InterruptedException {
    if (addr == null) {
        return this.getAndCreateNameserverChannel();
    }

    final ChannelWrapper cached = this.channelTables.get(addr);
    final boolean reusable = cached != null && cached.isOK();
    if (reusable) {
        return cached.getChannel();
    }
    return this.createChannel(addr);
}
/**
 * Returns a channel to a name server, reusing the currently chosen one when it is still OK and
 * otherwise rotating through {@code namesrvAddrList} until a channel can be created.
 *
 * <p>The reuse check is done once without the lock and once again after acquiring
 * {@code lockNamesrvChannel}, so a server chosen by another thread in the meantime is picked up.
 *
 * @return a name server channel, or {@code null} if the lock timed out, the address list is
 *         empty, no channel could be created, or an exception occurred while creating one
 */
private Channel getAndCreateNameserverChannel() throws InterruptedException {
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }

    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            // Re-check under the lock before choosing a new server.
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }

            if (addrList != null && !addrList.isEmpty()) {
                // Try each configured address at most once per call.
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    // Math.abs(Integer.MIN_VALUE) is still negative, which would yield a
                    // negative list index once the counter wraps; map that one value to 0.
                    index = (index == Integer.MIN_VALUE) ? 0 : Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {},NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server,but timeout,{}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}
/**
 * Produces the random starting value for the name server rotation counter.
 *
 * @return a non-negative value below 999
 */
private static int initValueIndex() {
    final Random random = new Random();
    final int remainder = random.nextInt() % 999;
    final int magnitude = Math.abs(remainder);
    return magnitude % 999;
}
//......
}
複製程式碼
- invokeSync首先通過getAndCreateChannel獲取channel,而getAndCreateChannel方法在addr為null時執行的是getAndCreateNameserverChannel;這裡先取namesrvAddrChoosed.get(),若不為null且channelTables中對應的ChannelWrapper可用(isOK)則直接返回其channel;否則在獲取lockNamesrvChannel鎖之後,從namesrvIndex.incrementAndGet()獲取index,取絕對值,然後再對addrList.size()取餘數作為選中的namesrv的地址,更新到namesrvAddrChoosed並通過createChannel建立channel;namesrvIndex的初始值為initValueIndex,它通過Math.abs(r.nextInt() % 999) % 999算出一個隨機初始值
小結
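- MQClientInstance的updateTopicRouteInfoFromNameServer首先從consumerTable及producerTable獲取topicList,然後遍歷topicList執行updateTopicRouteInfoFromNameServer,最後執行的是updateTopicRouteInfoFromNameServer(topic,false,null);該方法通過mQClientAPIImpl.getTopicRouteInfoFromNameServer獲取topicRouteData並與topicRouteTable中的資料對比,若有變化則更新brokerAddrTable,遍歷producerTable執行impl.updateTopicPublishInfo(topic,publishInfo),遍歷consumerTable執行impl.updateTopicSubscribeInfo(topic,subscribeInfo),最後將cloneTopicRouteData更新到topicRouteTable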