RocketMQ(一) - NameServer 啟動原始碼分析
阿新 • • 發佈:2022-02-19
RocketMQ(一) - NameServer 啟動原始碼分析
NameServer 的定義以及用處,本篇文章就不做介紹了,此文章主要分析其原始碼。
1. 入口
NamesrvStartup 是的NameServer服務的啟動類。 其入口是 main0( ) 方法。
public static NamesrvController main0(String[] args) { try { // 建立 namesrv 控制器 (nameServer核心類) NamesrvController controller = createNamesrvController(args); // 啟動nameSrv start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
上述程式碼非常簡單, 其功能就是:
- 建立 NamesrvController 物件
createNamesrvController(args)
- 啟動 NameServer
start(controller);
下面分別 根據這兩個方法 來做入口分析。
2. NamesrvController
在分析如何建立 NamesrvController 物件之前, 先來介紹下該類
上面是 NamesrvController 類的簡圖, 可以結合此圖來看下面的屬性分析:
屬性:
// 日誌 private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); // NameSrv的配置資訊類 private final NamesrvConfig namesrvConfig; // NameSrv底層的核心伺服器 - Netty配置類 private final NettyServerConfig nettyServerConfig; // 排程執行緒池, 執行定時任務, 兩件事 : 1.檢查存活的broker狀態 2.列印配置 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread")); // 管理 NameServer的 kv配置 private final KVConfigManager kvConfigManager; // 管理 路由資訊的物件 ***重要*** private final RouteInfoManager routeInfoManager; // Netty網路層封裝物件 ***重要*** private RemotingServer remotingServer; // ChannelEventListener介面類 用於監聽channel 狀態, // 當channel狀態發生改變時 例如: close idle ... 等狀態時 ,該service會監聽並處理 private BrokerHousekeepingService brokerHousekeepingService; // 業務執行緒池,netty執行緒主要任務 是 解析報文, 將報文解析成 RemotingCommand物件, 然後就將該物件交給業務執行緒池繼續處理 private ExecutorService remotingExecutor; // 總配置 (其中包含 NameSrv配置 和 NettyServer配置) private Configuration configuration; private FileWatchService fileWatchService;
構造方法:
// 引數1: namesrvConfig // 引數2: 網路層配置 nettyServerConfig public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; this.kvConfigManager = new KVConfigManager(this); this.routeInfoManager = new RouteInfoManager(); this.brokerHousekeepingService = new BrokerHousekeepingService(this); this.configuration = new Configuration( log, this.namesrvConfig, this.nettyServerConfig ); this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); }
上述程式碼不用多說, 就是簡單的 建立物件賦值操作。 由於本篇重點分析的是 NameServer的啟動流程,因此我們重點關注 nameSrvConfig , 至於其他的物件會在之後的文章中介紹。
3.NamesrvConfig
public class NamesrvConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 從環境變數中 獲取 ROCKETMQ_HOME 值
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// kvConfig.json 路徑 kv的儲存檔案路徑
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// namesrv.properties 配置檔案路徑
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
// 順序訊息功能 是否開啟 預設關閉
private boolean orderMessageEnable = false;
// .... 省略 get/set 方法
}
實際上該類也沒什麼好說的, 核心就是儲存一些 NameServer的配置檔案路徑 和一些關鍵值。
4. 建立控制器
介紹完了 NameSrvController ,重回到入口的 createNamesrvController(args)
,看看是如何 建立 NameSrvController 物件的。
/**
* 建立 namesrv 控制器
* @param args 啟動引數
*/
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 啟動時的引數資訊, 由commandLine 管理
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// nameSrvConfig nameSrv配置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// netty 伺服器配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// namesrv 伺服器 監聽埠 修改為 9876
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
// 讀取 -c 選項值
String file = commandLine.getOptionValue('c');
if (file != null) {
// 讀取 config 檔案資料 到 properties 內
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// 如果 config 配置檔案 內的配置 涉及到 namesrvConfig 或者 nettyServerConfig 欄位進行復寫
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
// 將讀取的 配置檔案 路徑 儲存到 欄位
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
// 將啟動時 命令列 設定的 kv 複寫到 namesrvConfig 內
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 建立日誌物件。
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 建立 控制器
// 引數1: namesrvConfig
// 引數2: 網路層配置 nettyServerConfig
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
上述程式碼雖然很長, 實際上就是 讀取系統環境變數 和 啟動引數 來設定 NameSrvConfig 和 NettyServeConfig 配置類, 並通過這倆類構造出 NamesrvController物件。
5. 啟動控制器
下面再來看下 是如何啟動 NamesrvController 的。
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化方法...
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 註冊 JVM級別的 Hook, 平滑關機的邏輯。 當JVM被關閉時,主動呼叫 controller.shutdown()方法平滑關機
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 伺服器網路層 啟動
controller.start();
return controller;
}
上述程式碼主要 做了 三件事:
- 初始化
- 註冊 JVM Hook 回撥平滑關機。
- 啟動
重點需要關注 第 1點 初始化 , 而第3點的邏輯實際上就是 NettyServer的啟動入口了。
6. 初始化控制器
/**
* 初始化 NameSrv 控制器
* @return
*/
public boolean initialize() {
// 載入本地 kv 配置
this.kvConfigManager.load();
// 建立 網路伺服器 物件
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 建立業務執行緒池 預設執行緒數 8
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 註冊協議處理器 (預設協議處理器)
this.registerProcessor();
// 定時任務1: 每10s中檢查 broker 存活狀態, 將idle狀態的broker移除
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 定時任務2: 每10min中, 列印一遍 kv配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// ... 省略 SSL證書的操作 (不在重點關注範圍內)
return true;
}
上述程式碼主要做了如下幾件事:
- 載入NameServer配置資訊到 KV配置管理中心 中
- 建立NettyServer 物件
- 建立 Netty的 業務執行緒池 remotingExecutor
- 將 預設協議處理器 DefaultRequestProcessor 註冊到NettyServer物件中
- 分別開啟 兩個 定時任務:
- 檢查 broker 存活狀態
- 列印 kv配置資訊