聊聊rocketmq的sendHeartbeatToAllBrokerWithLock
阿新 • • 發佈:2020-06-24
序
本文主要研究一下rocketmq的sendHeartbeatToAllBrokerWithLock
sendHeartbeatToAllBrokerWithLock
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final InternalLogger log = ClientLogger.getLog();
//......
public void sendHeartbeatToAllBrokerWithLock () {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception",e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat,but failed.");
}
}
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat,but no consumer and no producer");
return;
}
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String,HashMap<Long,String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String,String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long,String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long,String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}
try {
int version = this.mQClientAPIImpl.sendHearbeat(addr,heartbeatData,3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName,new HashMap<String,Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr,version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success",brokerName,id,addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed",addr,e);
} else {
log.info("send heart beat to broker[{} {} {}] exception,because the broker not up,forget it",e);
}
}
}
}
}
}
}
}
private void uploadFilterClassSource() {
Iterator<Entry<String,MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String,MQConsumerInner> next = it.next();
MQConsumerInner consumer = next.getValue();
if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
Set<SubscriptionData> subscriptions = consumer.subscriptions();
for (SubscriptionData sub : subscriptions) {
if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
final String consumerGroup = consumer.groupName();
final String className = sub.getSubString();
final String topic = sub.getTopic();
final String filterClassSource = sub.getFilterClassSource();
try {
this.uploadFilterClassToAllFilterServer(consumerGroup,className,topic,filterClassSource);
} catch (Exception e) {
log.error("uploadFilterClassToAllFilterServer Exception",e);
}
}
}
}
}
}
//......
}
複製程式碼
- MQClientInstance的sendHeartbeatToAllBrokerWithLock方法首先會執行lockHeartbeat.tryLock();獲取鎖成功時執行sendHeartbeatToAllBroker()及uploadFilterClassSource(),並在finally中執行lockHeartbeat.unlock();獲取鎖失敗時僅輸出warn日誌
- sendHeartbeatToAllBroker方法會通過prepareHeartbeatData構建heartbeatData,之後遍歷brokerAddrTable,執行mQClientAPIImpl.sendHearbeat(addr,heartbeatData,3000)
- uploadFilterClassSource方法會遍歷consumerTable,對於consumeType為ConsumeType.CONSUME_PASSIVELY型別的,會遍歷其subscriptions,對於sub.isClassFilterMode()且sub.getFilterClassSource()不為null的執行uploadFilterClassToAllFilterServer方法
sendHearbeat
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public class MQClientAPIImpl {
    private final static InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg","true"));

    static {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY,Integer.toString(MQVersion.CURRENT_VERSION));
    }
    //......

    /**
     * Sends the given heartbeat to one broker address with a synchronous remoting call.
     *
     * @param addr          address the request is sent to
     * @param heartbeatData payload; its encoded form becomes the request body
     * @param timeoutMillis timeout handed to {@code remotingClient.invokeSync}
     * @return the version carried by the response when its code is {@code ResponseCode.SUCCESS}
     * @throws MQBrokerException    when the response code is anything other than SUCCESS; built
     *                              from the response code and remark
     * @throws RemotingException    declared by this method (presumably raised by the remoting call)
     * @throws InterruptedException declared by this method (presumably raised by the remoting call)
     */
    public int sendHearbeat(
        final String addr,final HeartbeatData heartbeatData,final long timeoutMillis
    ) throws RemotingException,MQBrokerException,InterruptedException {
        final RemotingCommand heartbeatRequest =
            RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT,null);
        heartbeatRequest.setLanguage(clientConfig.getLanguage());
        heartbeatRequest.setBody(heartbeatData.encode());

        final RemotingCommand reply = this.remotingClient.invokeSync(addr,heartbeatRequest,timeoutMillis);
        assert reply != null;

        // Anything other than SUCCESS is surfaced to the caller as a broker exception.
        if (reply.getCode() != ResponseCode.SUCCESS) {
            throw new MQBrokerException(reply.getCode(),reply.getRemark());
        }
        return reply.getVersion();
    }
    //......
}
複製程式碼
- MQClientAPIImpl的sendHearbeat方法首先構建RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT,null),之後通過remotingClient.invokeSync(addr,request,timeoutMillis)執行請求,成功的話返回response.getVersion(),否則丟擲MQBrokerException(response.getCode(),response.getRemark())
processRequest
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
public class ClientManageProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;

    public ClientManageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }
    //......

    /**
     * Dispatches a client-management request by its request code.
     *
     * @return the handler's response for HEART_BEAT, UNREGISTER_CLIENT and CHECK_CLIENT_CONFIG;
     *         {@code null} for any other code
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)
        throws RemotingCommandException {
        final int requestCode = request.getCode();
        if (requestCode == RequestCode.HEART_BEAT) {
            return this.heartBeat(ctx,request);
        }
        if (requestCode == RequestCode.UNREGISTER_CLIENT) {
            return this.unregisterClient(ctx,request);
        }
        if (requestCode == RequestCode.CHECK_CLIENT_CONFIG) {
            return this.checkClientConfig(ctx,request);
        }
        return null;
    }

    /**
     * Handles a heartbeat: decodes the request body into {@code HeartbeatData}, registers every
     * consumer and producer group it lists against the sending channel, and replies with
     * {@code ResponseCode.SUCCESS}.
     *
     * <p>For a consumer group that has a subscription group config, the group's retry topic is
     * created through {@code createTopicInSendMessageBackMethod} before the consumer is
     * registered.
     */
    public RemotingCommand heartBeat(ChannelHandlerContext ctx,RemotingCommand request) {
        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
        final HeartbeatData heartbeat = HeartbeatData.decode(request.getBody(),HeartbeatData.class);
        final ClientChannelInfo channelInfo = new ClientChannelInfo(
            ctx.channel(),heartbeat.getClientID(),request.getLanguage(),request.getVersion());

        for (ConsumerData consumer : heartbeat.getConsumerDataSet()) {
            final SubscriptionGroupConfig groupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    consumer.getGroupName());

            // Stays true when the group has no subscription group config.
            boolean notifyOnChange = true;
            if (groupConfig != null) {
                notifyOnChange = groupConfig.isNotifyConsumerIdsChangedEnable();
                final int sysFlag = consumer.isUnitMode() ? TopicSysFlag.buildSysFlag(false,true) : 0;
                final String retryTopic = MixAll.getRetryTopic(consumer.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    retryTopic,groupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ,sysFlag);
            }

            final boolean registrationChanged = this.brokerController.getConsumerManager().registerConsumer(
                consumer.getGroupName(),channelInfo,consumer.getConsumeType(),consumer.getMessageModel(),
                consumer.getConsumeFromWhere(),consumer.getSubscriptionDataSet(),notifyOnChange);
            if (registrationChanged) {
                log.info("registerConsumer info changed {} {}",consumer.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            }
        }

        for (ProducerData producer : heartbeat.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(producer.getGroupName(),channelInfo);
        }

        reply.setCode(ResponseCode.SUCCESS);
        reply.setRemark(null);
        return reply;
    }
    //......
}
複製程式碼
- processRequest方法對於RequestCode.HEART_BEAT會執行heartBeat(ctx,request)方法,該方法會解析body為heartbeatData,之後遍歷heartbeatData的consumerData及producerData做不同處理,最後返回處理結果
- 對於consumerData,會先取出subscriptionGroupConfig,對於config不為null的會執行brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod方法;最後執行brokerController.getConsumerManager().registerConsumer方法
- 對於producerData,則執行brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo)方法
小結
- MQClientInstance的sendHeartbeatToAllBrokerWithLock方法首先會執行lockHeartbeat.tryLock();獲取鎖成功時執行sendHeartbeatToAllBroker()及uploadFilterClassSource(),並在finally中執行lockHeartbeat.unlock();獲取鎖失敗時僅輸出warn日誌
- sendHeartbeatToAllBroker方法會通過prepareHeartbeatData構建heartbeatData,之後遍歷brokerAddrTable,執行mQClientAPIImpl.sendHearbeat(addr,heartbeatData,3000)
- uploadFilterClassSource方法會遍歷consumerTable,對於consumeType為ConsumeType.CONSUME_PASSIVELY型別的,會遍歷其subscriptions,對於sub.isClassFilterMode()且sub.getFilterClassSource()不為null的執行uploadFilterClassToAllFilterServer方法