1. 程式人生 > >RocketMQ原始碼分析之rocketmq-broker啟動 (二)

RocketMQ原始碼分析之rocketmq-broker啟動 (二)

本文章會從以下幾個方面介紹rocketmq-broker啟動流程
1. Broker簡單介紹
1.1 initialize
1.2 start
2. broker啟動程式碼分析
2.1 roketmq-broker啟動入口
2.2 通過createBrokerController建立BrokerController物件
2.3 執行initialize方法進行初始化操作
2.4. 通過start啟動broker資訊

一,broker簡單介紹

broker是訊息接收處理,落地的核心模組。這個模組用於接收producer傳送的訊息以及consumer消費的訊息。
broker啟動主要分為initialize和start兩個步驟。

broker的啟動流程

圖一、broker啟動流程圖

1.1 initialize

1) broker啟動會獲取外部傳入的初始化引數, 也會從配置檔案中載入相關的配置引數 ,將引數儲存在對應的Configuration類當中, Configuration類包含的訊息如下

  • broker自身的配置: 包括根目錄 , namesrv地址,broker的IP和名稱,訊息佇列數,收發訊息執行緒池數等引數
  • netty啟動配置: 包括監聽埠, 工作執行緒數, 非同步傳送訊息訊號量數量等引數。
  • 儲存層配置:包括儲存根目錄,CommitLog 配置,持久化策略配置等引數

2) 當配置資訊設定完畢後, broker會將這些資訊傳入brokerController控制器當中,這個控制器會初始化載入很多的管理器

  • topicManager : 用於管理broker中儲存的所有topic的配置
  • consumerOffsetManager: 管理Consumer的消費進度
  • subscriptionGroupManager: 用來管理訂閱組,包括訂閱許可權等
  • messageStore: 用於broker層的訊息落地儲存

3) 當管理器全部載入完畢後,控制器將開始進入下一步初始化

  • 啟動netty伺服器,用於接收訊息
  • 初始化多個執行緒池,包括sendMessageExecutor、pullMessageExecutor、adminBrokerExecutor、clientManagerExecutor, 分別用於傳送訊息執行器、拉取訊息執行器、broker管理器執行器、客戶端管理執行器, 這些執行器回放如執行緒池中處理, 來做併發執行。

1.2 start
當broker初始化了配置引數後,就可以開始啟動了

  • 啟動剛剛初始化的各個管理器以後:topicManager , consumerOffsetManager, subscriptionGroupManager, messgeStoreManager
  • 開啟定時排程執行緒, 每隔30s向nameSrv上報自己的資訊
  • 啟動執行緒處理無用的topic

二、broker啟動程式碼分析

2.1 roketmq-broker啟動入口

    public class BrokerStartup {
        //...
        public static void main(String[] args) {
            start(createBrokerController(args));
        }
        //...
    }
  • 第3-5行, broker啟動程式入口, 包含兩部分, createBrokerController(args) 和 start(brokerController)

2.2 通過createBrokerController建立BrokerController物件

public static BrokerController createBrokerController(String[] args) {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
            NettySystemConfig.socketSndbufSize = 131072;
        }

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
            NettySystemConfig.socketRcvbufSize = 131072;
        }

        try {
            //PackageConflictDetect.detectFastjson();
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }

            final BrokerConfig brokerConfig = new BrokerConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();

            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            nettyServerConfig.setListenPort(10911);
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }

            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    configFile = file;
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();
                }
            }

            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

            if (null == brokerConfig.getRocketmqHome()) {
                System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
                    + " variable in your environment to match the location of the RocketMQ installation");
                System.exit(-2);
            }

            String namesrvAddr = brokerConfig.getNamesrvAddr();
            if (null != namesrvAddr) {
                try {
                    String[] addrArray = namesrvAddr.split(";");
                    for (String addr : addrArray) {
                        RemotingUtil.string2SocketAddress(addr);
                    }
                } catch (Exception e) {
                    System.out.printf(
                        "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                        namesrvAddr);
                    System.exit(-3);
                }
            }

            switch (messageStoreConfig.getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
                    break;
                case SLAVE:
                    if (brokerConfig.getBrokerId() <= 0) {
                        System.out.printf("Slave's brokerId must be > 0");
                        System.exit(-3);
                    }

                    break;
                default:
                    break;
            }

            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

            if (commandLine.hasOption('p')) {
                Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig);
                MixAll.printObjectProperties(console, nettyServerConfig);
                MixAll.printObjectProperties(console, nettyClientConfig);
                MixAll.printObjectProperties(console, messageStoreConfig);
                System.exit(0);
            } else if (commandLine.hasOption('m')) {
                Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                MixAll.printObjectProperties(console, brokerConfig, true);
                MixAll.printObjectProperties(console, nettyServerConfig, true);
                MixAll.printObjectProperties(console, nettyClientConfig, true);
                MixAll.printObjectProperties(console, messageStoreConfig, true);
                System.exit(0);
            }

            log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
            MixAll.printObjectProperties(log, brokerConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);
            MixAll.printObjectProperties(log, nettyClientConfig);
            MixAll.printObjectProperties(log, messageStoreConfig);

            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);

            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }

            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);

                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
  • 第2行,設定rocketMQ版本資訊
  • 第4-10行,校驗遠端通訊的傳送快取和接收快取是否為空, 如果為空則設定預設值大小為131072
    • NettySystemConfig.socketSndbufSize = 131072;
    • NettySystemConfig.socketRcvbufSize = 131072;
  • 第14-19行,buildCommandlineOptions()構造命令列解析Options, 這個過程已經設定了h(help)、n(namesrvAddr)兩個引數, 後面還會配置c(configFile), p(printConfigItem), m(printImportantConfig);parseCmdLine()方法會對mqbroker啟動命令進行解析(*nix系統), 比如: nohup ./mqbroker -n 192.168.2.1:9876 ; 如果解析的commandLine==null, 則退出
  • 第35-52行,如果啟動命令列引數包含 -c 引數,會讀取配置到Propertis中, 讓後通過MixAll.properties2Object(), 將讀取的配置檔案資訊存入brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig對應的實體類中,最後將配置檔案路徑資訊儲存到BrokerPathConfigHelper中brokerConfigPath變數中
  • 第54行,將其它命令列引數讀取到Properties中,並將資訊儲存到brokerConfig物件中
  • 第62-75行,從brokerConfig獲取namesrvAddr資訊,並將地址資訊轉為SocketAddress物件
  • 第77-91行,設定當前broker的角色(master,slave), 如果是同步/非同步MASTER資訊,brokerId=0 ;如果是SLAVE資訊,brokerId > 0 ; 如果brokerId < 0 , 會丟擲異常
  • 第116-121行,將log的資訊配置到brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig物件中
  • 第122-128行,通過brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig物件建立BrokerController物件,並將propertis中儲存的資訊儲存到brokerController的Configuration屬性中
  • 第136-159行,通過Runtime.getRuntime().addShutdownHook()設定,在jvm關閉之前需要處理的一些事情,系統會處理記憶體清理、物件銷燬等一系列操作, 這裡是對brokerController進行關閉操作。

2.3 執行initialize方法進行初始化操作

public boolean initialize() throws CloneNotSupportedException {
        boolean result = this.topicConfigManager.load();

        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();

        if (result) {
            try {
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }

        result = result && this.messageStore.load();

        if (result) {
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));

            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));

            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));

            this.adminBrokerExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                    "AdminBrokerThread_"));

            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));

            this.consumerManageExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                    "ConsumerManageThread_"));

            this.registerProcessor();

            final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }

                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        } catch (Throwable e) {
                            log.error("ScheduledTask syncAll slave exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        return result;
    }
  • 第2-6行, 載入topicConfigManager、consumerOffsetManager、subscriptionGroupManager、consumerFilterManager ,將載入結果成功與否儲存在result中
  • 第8-22行,如果前面的資源載入成功,會構建messageStore物件資訊,在載入外掛plugins到content後,會通過MessageStoreFactory.build()再次處理messageStore資訊,最後通過load()方法載入messageStore資訊
  • 第26-191行,校驗資源載入result是否成功,如果不成功,則直接返回result結果false, 如果result = true,
    1) 建立sendMessageExecutor、pullMessageExecutor、queryMessageExecutor、adminBrokerExecutor、clientManageExecutor、consumerManageExecutor執行器資訊
    2) 通過registerProcessor()方法向remotingServer, fastRemotingServer物件中註冊剛剛建立的執行器(Excuter)資訊
    3) 通過scheduledExecutorService:ScheduledExecutorService 物件,定期查詢如下資訊
    • BrokerController.this.getBrokerStats().record(); 記錄broker的狀態
    • BrokerController.this.consumerOffsetManager.persist(); 消費者當前資訊的offset位置
    • BrokerController.this.consumerFilterManager.persist(); 消費者filterManaer資訊
    • BrokerController.this.protectBroker(); Broker資訊
    • BrokerController.this.printWaterMark();
    • BrokerController.this.getMessageStore().dispatchBehindBytes() 列印日誌資訊
    • BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); 獲取namesrv地址資訊
    • BrokerController.this.slaveSynchronize.syncAll(); 同步從broker資訊,

2.4. 通過start啟動broker資訊

public static BrokerController start(BrokerController controller) {
        try {
            controller.start();
            // ...
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }
  • 第3行,啟動brokerControler
public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }

        if (this.remotingServer != null) {
            this.remotingServer.start();
        }

        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        this.registerBrokerAll(true, false);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
        //...
    }
  • 第2-28行,如果引數校驗不為空,則啟動messageStore、remotingServer、fastRemotingServer、brokerOuterAPI、pullRequestHoldService、clientHousekeepingService、filterServerManager相關資訊
  • 第30-40行,通過方法registerBrokerAll()開啟排程執行緒,每隔30s,向namesrv上報自己的資訊,處理無用的topic

上面是關於broker啟動的原始碼分析, 後面有新的理解會在此文章的基礎上進行修改。