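A look at RocketMQ's sendHeartbeatToAllBrokerWithLock

This article examines how sendHeartbeatToAllBrokerWithLock works in RocketMQ, from the client that sends the heartbeat to the broker that processes it.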

sendHeartbeatToAllBrokerWithLock

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();

    //......

    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception",e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat,but failed.");
        }
    }

    private void sendHeartbeatToAllBroker() {
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        if (producerEmpty && consumerEmpty) {
            log.warn("sending heartbeat,but no consumer and no producer");
            return;
        }

        if (!this.brokerAddrTable.isEmpty()) {
            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
            Iterator<Entry<String,HashMap<Long,String>>> it = this.brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String,HashMap<Long,String>> entry = it.next();
                String brokerName = entry.getKey();
                HashMap<Long,String> oneTable = entry.getValue();
                if (oneTable != null) {
                    for (Map.Entry<Long,String> entry1 : oneTable.entrySet()) {
                        Long id = entry1.getKey();
                        String addr = entry1.getValue();
                        if (addr != null) {
                            // With no consumers registered, only master brokers need the heartbeat.
                            if (consumerEmpty) {
                                if (id != MixAll.MASTER_ID)
                                    continue;
                            }

                            try {
                                int version = this.mQClientAPIImpl.sendHearbeat(addr,heartbeatData,3000);
                                if (!this.brokerVersionTable.containsKey(brokerName)) {
                                    this.brokerVersionTable.put(brokerName,new HashMap<String,Integer>(4));
                                }
                                this.brokerVersionTable.get(brokerName).put(addr,version);
                                // Log only every 20th round to keep the log quiet.
                                if (times % 20 == 0) {
                                    log.info("send heart beat to broker[{} {} {}] success",brokerName,id,addr);
                                    log.info(heartbeatData.toString());
                                }
                            } catch (Exception e) {
                                if (this.isBrokerInNameServer(addr)) {
                                    log.info("send heart beat to broker[{} {} {}] failed",brokerName,id,addr,e);
                                } else {
                                    log.info("send heart beat to broker[{} {} {}] exception,because the broker not up,forget it",brokerName,id,addr,e);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    private void uploadFilterClassSource() {
        Iterator<Entry<String,MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String,MQConsumerInner> next = it.next();
            MQConsumerInner consumer = next.getValue();
            if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
                Set<SubscriptionData> subscriptions = consumer.subscriptions();
                for (SubscriptionData sub : subscriptions) {
                    if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
                        final String consumerGroup = consumer.groupName();
                        final String className = sub.getSubString();
                        final String topic = sub.getTopic();
                        final String filterClassSource = sub.getFilterClassSource();
                        try {
                            this.uploadFilterClassToAllFilterServer(consumerGroup,className,topic,filterClassSource);
                        } catch (Exception e) {
                            log.error("uploadFilterClassToAllFilterServer Exception",e);
                        }
                    }
                }
            }
        }
    }

    //......
}
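  • MQClientInstance's sendHeartbeatToAllBrokerWithLock method first calls lockHeartbeat.tryLock(). If the lock is acquired, it runs sendHeartbeatToAllBroker() and uploadFilterClassSource(), then calls lockHeartbeat.unlock() in the finally block. If the lock is not acquired, it only logs a warning and returns without sending anything.
  • sendHeartbeatToAllBroker builds heartbeatData through prepareHeartbeatData and returns early with a warning when both the producer set and the consumer set are empty. Otherwise it iterates over brokerAddrTable and calls mQClientAPIImpl.sendHearbeat(addr,heartbeatData,3000) for each address, skipping non-master brokers when there are no consumers, and stores the returned version in brokerVersionTable.
  • uploadFilterClassSource iterates over consumerTable. For each consumer whose consumeType is ConsumeType.CONSUME_PASSIVELY it walks the subscriptions, and for every sub where sub.isClassFilterMode() is true and sub.getFilterClassSource() is not null it calls uploadFilterClassToAllFilterServer.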
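
The non-blocking guard described in the first bullet can be illustrated in isolation. The following is a minimal sketch, not RocketMQ code: HeartbeatGuard, sendWithLock, and the two counters are hypothetical names introduced here, and only the tryLock / finally-unlock shape is taken from the method above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for MQClientInstance; only the locking pattern is modeled.
class HeartbeatGuard {
    private final Lock lockHeartbeat = new ReentrantLock();
    final AtomicInteger sent = new AtomicInteger();
    final AtomicInteger skipped = new AtomicInteger();

    // Same shape as sendHeartbeatToAllBrokerWithLock: try once, never block.
    void sendWithLock(Runnable heartbeat) {
        if (lockHeartbeat.tryLock()) {
            try {
                heartbeat.run();
                sent.incrementAndGet();
            } catch (RuntimeException e) {
                // The original logs the error; this sketch just ignores it.
            } finally {
                lockHeartbeat.unlock();
            }
        } else {
            // Another thread is already sending, so this round is skipped.
            skipped.incrementAndGet();
        }
    }

    // Runs one heartbeat and, while it holds the lock, lets a second thread try.
    static int[] demo() {
        HeartbeatGuard guard = new HeartbeatGuard();
        guard.sendWithLock(() -> {
            Thread other = new Thread(() -> guard.sendWithLock(() -> { }));
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return new int[] {guard.sent.get(), guard.skipped.get()};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("sent=" + result[0] + " skipped=" + result[1]);
    }
}
```

Because tryLock() returns immediately, a caller that loses the race drops its heartbeat round instead of queueing behind the current one, which seems a reasonable trade-off for a task that is retried periodically anyway.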

sendHearbeat

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

    private final static InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg","true"));

    static {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY,Integer.toString(MQVersion.CURRENT_VERSION));
    }

    //......

    public int sendHearbeat(
        final String addr,final HeartbeatData heartbeatData,final long timeoutMillis
    ) throws RemotingException,MQBrokerException,InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT,null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        RemotingCommand response = this.remotingClient.invokeSync(addr,request,timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return response.getVersion();
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(),response.getRemark());
    }

    //......
}
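  • MQClientAPIImpl's sendHearbeat method builds the request with RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT,null), sets the encoded heartbeatData as the body, and sends it synchronously through remotingClient.invokeSync(addr,request,timeoutMillis). On ResponseCode.SUCCESS it returns response.getVersion(); for any other code it throws MQBrokerException(response.getCode(),response.getRemark()).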
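
The "return the version on success, throw otherwise" contract can be sketched without any RocketMQ dependency. Everything below is a hypothetical stand-in: Response and BrokerException are simplified placeholders for RemotingCommand and MQBrokerException, the SUCCESS constant is assumed to mirror ResponseCode.SUCCESS, and the version number 353 is an arbitrary illustration value.

```java
// Hypothetical stand-ins for RemotingCommand and MQBrokerException; not RocketMQ classes.
class HeartbeatResponseDemo {
    // Assumption: mirrors the value of ResponseCode.SUCCESS.
    static final int SUCCESS = 0;

    static final class Response {
        final int code;
        final int version;
        final String remark;

        Response(int code, int version, String remark) {
            this.code = code;
            this.version = version;
            this.remark = remark;
        }
    }

    static final class BrokerException extends Exception {
        final int code;

        BrokerException(int code, String remark) {
            super("CODE: " + code + " DESC: " + remark);
            this.code = code;
        }
    }

    // Same decision as sendHearbeat: success yields the broker version,
    // anything else becomes an exception carrying the code and remark.
    static int versionOrThrow(Response response) throws BrokerException {
        if (response.code == SUCCESS) {
            return response.version;
        }
        throw new BrokerException(response.code, response.remark);
    }

    public static void main(String[] args) {
        try {
            System.out.println(versionOrThrow(new Response(SUCCESS, 353, null)));
            versionOrThrow(new Response(1, 0, "rejected"));
        } catch (BrokerException e) {
            System.out.println("broker answered with code " + e.code);
        }
    }
}
```

On the calling side, sendHeartbeatToAllBroker catches the exception per broker address, so one failing broker does not stop the heartbeat from reaching the others.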

processRequest

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java

public class ClientManageProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;

    public ClientManageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    //......

    public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                return this.heartBeat(ctx,request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx,request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx,request);
            default:
                break;
        }
        return null;
    }

    public RemotingCommand heartBeat(ChannelHandlerContext ctx,RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(),HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion()
        );

        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false,true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ,topicSysFlag);
            }

            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable
            );

            if (changed) {
                log.info("registerConsumer info changed {} {}",data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                );
            }
        }

        for (ProducerData data : heartbeatData.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo);
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    //......
}
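  • For RequestCode.HEART_BEAT, processRequest calls heartBeat(ctx,request). That method decodes the request body into heartbeatData, handles the consumerData and producerData entries separately, and finally returns a response with ResponseCode.SUCCESS.
  • For each consumerData it first looks up the subscriptionGroupConfig. When the config is not null, it calls brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod to make sure the group's retry topic exists. It then calls brokerController.getConsumerManager().registerConsumer and logs when the registration info has changed.
  • For each producerData it calls brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo).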
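
The broker-side bookkeeping can be pictured with a small in-memory registry. This is a deliberately simplified sketch: HeartbeatRegistry and its methods are hypothetical, the "changed" flag here only reports whether a client ID is new to its group (the real ConsumerManager is not shown in this article and may consider more than that), and the "%RETRY%" prefix is assumed to match what MixAll.getRetryTopic prepends.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of what heartBeat() records on the broker.
class HeartbeatRegistry {
    // Assumption: the prefix used by MixAll.getRetryTopic.
    static final String RETRY_PREFIX = "%RETRY%";

    private final Map<String, Set<String>> consumersByGroup = new ConcurrentHashMap<>();
    private final Map<String, Set<String>> producersByGroup = new ConcurrentHashMap<>();

    // Name of the retry topic that is created per consumer group.
    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // Returns true when the client was not yet known in this group ("changed").
    boolean registerConsumer(String group, String clientId) {
        return consumersByGroup
            .computeIfAbsent(group, g -> ConcurrentHashMap.newKeySet())
            .add(clientId);
    }

    // Producers are only recorded; no change flag is reported.
    void registerProducer(String group, String clientId) {
        producersByGroup
            .computeIfAbsent(group, g -> ConcurrentHashMap.newKeySet())
            .add(clientId);
    }

    int producerCount(String group) {
        Set<String> clients = producersByGroup.get(group);
        return clients == null ? 0 : clients.size();
    }

    public static void main(String[] args) {
        HeartbeatRegistry registry = new HeartbeatRegistry();
        System.out.println(retryTopic("orderGroup"));
        System.out.println(registry.registerConsumer("orderGroup", "client-1"));
        // A repeated heartbeat from the same client changes nothing.
        System.out.println(registry.registerConsumer("orderGroup", "client-1"));
    }
}
```

Since every heartbeat repeats the registration, the operation has to be idempotent; the changed flag lets the broker log only when something actually differs.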

Summary

  • MQClientInstance's sendHeartbeatToAllBrokerWithLock method first calls lockHeartbeat.tryLock(), then runs sendHeartbeatToAllBroker() and uploadFilterClassSource(), and finally calls lockHeartbeat.unlock() in the finally block.
  • sendHeartbeatToAllBroker builds heartbeatData through prepareHeartbeatData, then iterates over brokerAddrTable and calls mQClientAPIImpl.sendHearbeat(addr,heartbeatData,3000).
  • uploadFilterClassSource iterates over consumerTable. For consumers whose consumeType is ConsumeType.CONSUME_PASSIVELY it walks the subscriptions, and for every sub where sub.isClassFilterMode() is true and sub.getFilterClassSource() is not null it calls uploadFilterClassToAllFilterServer.
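
One detail of the brokerAddrTable traversal in the sendHeartbeatToAllBroker code shown earlier is easy to miss: when the client has no consumers, slave brokers are skipped. A minimal sketch of that selection rule follows. HeartbeatTargets, select, and demoTable are hypothetical names, the addresses are made up, and MASTER_ID is assumed to equal MixAll.MASTER_ID.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that isolates the address-selection rule.
class HeartbeatTargets {
    // Assumption: same value as MixAll.MASTER_ID.
    static final long MASTER_ID = 0L;

    // brokerAddrTable maps brokerName -> (brokerId -> address).
    static List<String> select(Map<String, Map<Long, String>> brokerAddrTable, boolean consumerEmpty) {
        List<String> targets = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> entry : oneTable.entrySet()) {
                String addr = entry.getValue();
                if (addr == null) {
                    continue;
                }
                // A producer-only client heartbeats masters only.
                if (consumerEmpty && entry.getKey() != MASTER_ID) {
                    continue;
                }
                targets.add(addr);
            }
        }
        return targets;
    }

    // One broker group with a master (id 0) and a slave (id 1); addresses are invented.
    static Map<String, Map<Long, String>> demoTable() {
        Map<Long, String> brokerA = new LinkedHashMap<>();
        brokerA.put(0L, "10.0.0.1:10911");
        brokerA.put(1L, "10.0.0.2:10911");
        Map<String, Map<Long, String>> table = new LinkedHashMap<>();
        table.put("broker-a", brokerA);
        return table;
    }

    public static void main(String[] args) {
        System.out.println(select(demoTable(), true));
        System.out.println(select(demoTable(), false));
    }
}
```

With consumerEmpty set to true only the master address is selected; with consumers present, both master and slave receive the heartbeat.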
