rocketmq之原始碼分析broker入口BrokerController初始化過程(十六)
阿新 • • 發佈:2019-06-07
接著上一章的BrokerController的基礎功能講,本章主要介紹的是BrokerController的初始化操作,在初始化的過程中做了哪些事情,使用了哪些技術設計完成了對broker的初始化操作。一切分析都是基於原始碼來講,同時對原始碼進行了註解支撐。
一,現在對建構函式的初始化
public BrokerController( final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig ) { this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; this.nettyClientConfig = nettyClientConfig; this.messageStoreConfig = messageStoreConfig; //消費端的offset管理 this.consumerOffsetManager = new ConsumerOffsetManager(this); //topic的管理 this.topicConfigManager = new TopicConfigManager(this); //pull訊息的處理 this.pullMessageProcessor = new PullMessageProcessor(this); //pull訊息的服務 this.pullRequestHoldService = new PullRequestHoldService(this); //訊息到達後執行的監聽處理 this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); //消費訊息的id監聽 this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); //消費者管理,基於特定的消費id this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); this.consumerFilterManager = new ConsumerFilterManager(this); //傳送方管理 this.producerManager = new ProducerManager(); //監聽客戶端的網路 this.clientHousekeepingService = new ClientHousekeepingService(this); //broker作為客戶端進行心跳鑑定對訪問者的操作 this.broker2Client = new Broker2Client(this); //訂閱訊息的管理 this.subscriptionGroupManager = new SubscriptionGroupManager(this); //broker的介面服務管理,主要和namesrv互動 this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); //過濾管理 this.filterServerManager = new FilterServerManager(this); //主從同步管理,主要對slave有效,同步元資料 this.slaveSynchronize = new SlaveSynchronize(this); //內部操作佇列 this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()); this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity()); this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity()); this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity()); //當前broker的狀態管理 this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); //快速失敗策略 this.brokerFastFailure = new BrokerFastFailure(this); this.configuration = new Configuration( log, BrokerPathConfigHelper.getBrokerConfigPath(), this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig ); }
整體過程是:基於傳入的四個配置檔案,賦值給內部配置物件,然後初始化內部關聯的物件,主要是consumer,producer等的初始化操作。
中間說明一個事情,關於Config的關係管理,核心的Config都有共性的載入,編碼,解析,儲存到指定檔案的操作,內部採用的了共享的超級父類設計,Config的類關係如下圖:
二,對初始化方法的呼叫
public boolean initialize() throws CloneNotSupportedException { //載入topic的配置資訊,從歷史的配置檔案儲存中 boolean result = this.topicConfigManager.load(); //載入consumer的偏移量 result = result && this.consumerOffsetManager.load(); //載入訂閱的分組 result = result && this.subscriptionGroupManager.load(); //載入consumer的過濾器 result = result && this.consumerFilterManager.load(); if (result) { try { //核心檔案儲存實現類 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); } //borker的狀態監控 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; log.error("Failed to initialize", e); } } //載入歷史資料 result = result && this.messageStore.load(); if (result) { //broker的服務端實現核心,網路通訊的netty實現 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); //快速服務端實現,後期再詳細的分析 this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); //傳送訊息的執行緒池配置 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_")); //拉取訊息的執行緒池配置 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); //查詢訊息的執行緒池配置 this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_")); //admin的broker的執行緒池 this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( "AdminBrokerThread_")); //客戶端管理的執行緒池 this.clientManageExecutor = new ThreadPoolExecutor( this.brokerConfig.getClientManageThreadPoolNums(), this.brokerConfig.getClientManageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_")); //broker的心跳檢車執行緒池 this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getHeartbeatThreadPoolNums(), this.brokerConfig.getHeartbeatThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.heartbeatThreadPoolQueue, new ThreadFactoryImpl("HeartbeatThread_", true)); //執行事務提交或回滾的執行緒池 this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getEndTransactionThreadPoolNums(), this.brokerConfig.getEndTransactionThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.endTransactionThreadPoolQueue, new ThreadFactoryImpl("EndTransactionThread_")); //consumer的管理執行緒池 this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( "ConsumerManageThread_")); //註冊時間處理機制及對應的實現類 this.registerProcessor(); final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(); final long period = 1000 * 60 * 60 * 24; //定時記錄broker的狀態資訊 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.getBrokerStats().record(); } catch (Throwable e) { log.error("schedule record error.", e); } } }, initialDelay, period, TimeUnit.MILLISECONDS); //定時持久化consumer的偏移量內容 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); //定時持久化consumer的過濾器內容 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerFilterManager.persist(); } catch (Throwable e) { log.error("schedule persist consumer filter error.", e); } } }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); //定時執行broker的保護驗證機制 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.protectBroker(); } catch (Throwable e) { log.error("protectBroker error.", e); } } }, 3, 3, TimeUnit.MINUTES); //定時列印各個配置的內容量 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printWaterMark(); } catch (Throwable e) { log.error("printWaterMark error.", e); } } }, 10, 1, TimeUnit.SECONDS); //定時列印日誌內容的大小 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error("schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); //namesrv的配置 if (this.brokerConfig.getNamesrvAddr() != null) { //更新namesrv的配置 this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } if (!messageStoreConfig.isEnableDLegerCommitLog()) { if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.updateMasterHAServerAddrPeriodically = false; } else { this.updateMasterHAServerAddrPeriodically = true; } } else { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printMasterAndSlaveDiff(); } catch (Throwable e) { log.error("schedule printMasterAndSlaveDiff error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } } if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); ((NettyRemotingServer) fastRemotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } //初始化事物管理機制 initialTransaction(); //初始化命令列管理執行,執行操作的管控 initialAcl(); //初始化rcp的hook機制 initialRpcHooks(); } return result; }
整體操作流程步驟是:
1,載入歷史的內容,歷史內容都是儲存到本地的檔案中,主要是做訊息接受,分發,過濾的管理內容載入
2,載入訊息儲存內容,核心重要原生的訊息讀取
3,構造netty的服務,當前作為broker的服務端
4,各種執行訊息操作的執行緒池配置,內部是基於各個功能做執行緒池隔離
5,註冊事件處理機制,特別說,這裡註冊時處理拉取訊息以外的事件處理
6,執行各種任務排程,主要是報告當前服務情況,儲存基於訊息的管理內容,和namesrv的資料互動
7,基於高可用的master,salve的配置及輸出
8,執行事物,