1. 程式人生 > >RocketMQ 原始碼分析 BrokerStartUp(七)

RocketMQ 原始碼分析 BrokerStartUp(七)

前言

broker是訊息接收處理,訊息落地的核心模組。這個模組用於接收producer傳送的訊息以及consumer消費的訊息。 本章從broker的啟動類入手分析一下broker啟動的過程。

分析broker是為了能夠搞明白master和slave之間的關係,以及訊息落地的具體過程。

createBrokerController

broker啟動會獲取外部傳入的初始化引數, 也會從配置檔案中載入相關的配置引數 ,將引數儲存在對應的Configuration類當中。

public static BrokerController createBrokerController
(String[] args) { //設定rocketMQ版本資訊 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //驗遠端通訊的傳送快取和接收快取是否為空, 如果為空則設定預設值大小為131072 if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.
socketSndbufSize = 131072; } if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize = 131072; } try { //PackageConflictDetect.detectFastjson(); Options options = ServerUtil.buildCommandlineOptions
(new Options()); commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); } final BrokerConfig brokerConfig = new BrokerConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); nettyServerConfig.setListenPort(10911); final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); } /** * 如果啟動命令列引數包含 -c 引數,會讀取配置到Propertis中, 讓後通過MixAll.properties2Object(), * 將讀取的配置檔案資訊存入brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig對應的實體類中 */ if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { configFile = file; InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); properties2SystemEnv(properties); MixAll.properties2Object(properties, brokerConfig); MixAll.properties2Object(properties, nettyServerConfig); MixAll.properties2Object(properties, nettyClientConfig); MixAll.properties2Object(properties, messageStoreConfig); BrokerPathConfigHelper.setBrokerConfigPath(file); in.close(); } } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); if (null == brokerConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } //從brokerConfig獲取namesrvAddr資訊,並將地址資訊轉為SocketAddress物件 String namesrvAddr = brokerConfig.getNamesrvAddr(); if (null != namesrvAddr) { try { String[] addrArray = namesrvAddr.split(";"); for (String addr : addrArray) { RemotingUtil.string2SocketAddress(addr); } } catch (Exception e) { System.out.printf( "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr); System.exit(-3); } } //設定當前broker的角色(master,slave), 如果是同步/非同步MASTER資訊,brokerId=0 ;如果是SLAVE資訊,brokerId > 0 ; 如果brokerId < 0 , 會丟擲異常 switch (messageStoreConfig.getBrokerRole()) { case ASYNC_MASTER: case SYNC_MASTER: brokerConfig.setBrokerId(MixAll.MASTER_ID); break; case SLAVE: if (brokerConfig.getBrokerId() <= 0) { System.out.printf("Slave's brokerId must be > 0"); System.exit(-3); } break; default: break; } messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); if (commandLine.hasOption('p')) { //列印配置資訊 InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig); MixAll.printObjectProperties(console, nettyServerConfig); MixAll.printObjectProperties(console, nettyClientConfig); MixAll.printObjectProperties(console, messageStoreConfig); System.exit(0); } else if (commandLine.hasOption('m')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig, true); MixAll.printObjectProperties(console, nettyServerConfig, true); MixAll.printObjectProperties(console, nettyClientConfig, true); MixAll.printObjectProperties(console, messageStoreConfig, true); System.exit(0); } //將log的資訊配置到brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig物件中 log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); MixAll.printObjectProperties(log, brokerConfig); MixAll.printObjectProperties(log, nettyServerConfig); MixAll.printObjectProperties(log, nettyClientConfig); MixAll.printObjectProperties(log, messageStoreConfig); final BrokerController controller = new BrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //通過Runtime.getRuntime().addShutdownHook()設定,在jvm關閉之前需要處理的一些事情,系統會處理記憶體清理、物件銷燬等一系列操作 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long beginTime = System.currentTimeMillis(); controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - beginTime; log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } }, "ShutdownHook")); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }

當配置資訊設定完畢後, broker會將這些資訊傳入brokerController控制器當中,這個控制器會初始化載入很多的管理器:

  • rocketmq配置檔案資訊可以參考如下連結地址
  • rocketMQ配置檔案資訊
  • topicManager : 用於管理broker中儲存的所有topic的配置
  • consumerOffsetManager: 管理Consumer的消費進度
  • subscriptionGroupManager: 用來管理訂閱組,包括訂閱許可權等。

當管理器全部載入完畢後,控制器將開始進入下一步初始化,初始化多個執行緒池,包括sendMessageExecutor、pullMessageExecutor、adminBrokerExecutor、clientManagerExecutor, 分別用於傳送訊息執行器、拉取訊息執行器、broker管理器執行器、客戶端管理執行器, 這些執行器回放如執行緒池中處理, 來做併發執行。

public boolean  initialize() throws CloneNotSupportedException {
    //用於管理broker中儲存的所有topic的配置
    boolean result = this.topicConfigManager.load();
    //管理Consumer的消費進度
    result = result && this.consumerOffsetManager.load();
    //用來管理訂閱組,包括訂閱許可權等
    result = result && this.subscriptionGroupManager.load();
    // 用於消費者過濾配置
    result = result && this.consumerFilterManager.load();

    if (result) {//載入topicConfigManager、consumerOffsetManager、subscriptionGroupManager、consumerFilterManager ,將載入結果成功與否儲存在result中

        try {
            //用於broker層的訊息落地儲存
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }

    result = result && this.messageStore.load();

    if (result) {
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        //傳送訊息的執行緒組
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));

        //設定拉取訊息的執行緒組
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));

        //設定查詢訊息的執行緒組
        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_"));

        //broker管理器執行器
        this.adminBrokerExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                "AdminBrokerThread_"));

        //客戶端管理執行器
        this.clientManageExecutor = new ThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_"));

        //心跳
        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true));

        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_"));

        //消費者管理執行緒組
        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                "ConsumerManageThread_"));

        //把剛剛建立的執行器註冊到 remotingServer, fastRemotingServer物件中
        this.registerProcessor();

        final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
        //定期查詢如下資訊

        //記錄broker的狀態
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        //消費者當前資訊的offset位置
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void