RocketMQ 原始碼分析 BrokerStartUp(七)
阿新 • • 發佈:2018-12-30
前言
broker是負責訊息接收處理與訊息落地的核心模組。這個模組用於接收producer傳送的訊息,以及處理consumer的訊息消費(拉取)請求。本章從broker的啟動類入手,分析一下broker啟動的過程。
分析broker是為了能夠搞明白master和slave之間的關係,以及訊息落地的具體過程。
createBrokerController
broker啟動會獲取外部傳入的初始化引數, 也會從配置檔案中載入相關的配置引數 ,將引數儲存在對應的Configuration類當中。
/**
 * Builds and initializes a {@link BrokerController} from the command-line
 * arguments and, when the {@code -c} option is present, from a properties file.
 *
 * <p>Every failure path in this method terminates the JVM with
 * {@code System.exit}: unparsable command line (-1), missing RocketMQ home (-2),
 * illegal name server address, illegal slave brokerId or failed controller
 * initialization (-3), and any other {@link Throwable} (-1).
 *
 * @param args the command-line arguments handed to the broker launcher
 * @return the initialized controller; the trailing {@code return null} is only
 *         reached after the catch block has already called {@code System.exit(-1)}
 */
public static BrokerController createBrokerController(String[] args) {
    // Publish the current RocketMQ version as a system property.
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    // If the socket send/receive buffer sizes were not supplied as system
    // properties, fall back to 131072 bytes for each.
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
        NettySystemConfig.socketSndbufSize = 131072;
    }
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
        NettySystemConfig.socketRcvbufSize = 131072;
    }

    try {
        //PackageConflictDetect.detectFastjson();
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }

        final BrokerConfig brokerConfig = new BrokerConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();

        // TLS is enabled on the client side when the TLS_ENABLE system property
        // says so; its default is derived from whether the TLS mode is ENFORCING.
        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        nettyServerConfig.setListenPort(10911);

        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

        // NOTE(review): this check runs on a freshly constructed MessageStoreConfig,
        // before the -c config file is applied below, so it only sees the default
        // broker role. Confirm whether it was meant to run after the file is loaded.
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }

        // With the -c option, read the given file into Properties and apply it,
        // via MixAll.properties2Object(), to brokerConfig, nettyServerConfig,
        // nettyClientConfig and messageStoreConfig.
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                try {
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                } finally {
                    // Close the stream even when loading or applying the
                    // properties throws; previously it leaked on failure.
                    in.close();
                }
            }
        }

        // Command-line options are applied to brokerConfig after the file.
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // Validate the name server address list: split on ';' and convert each
        // entry to a socket address; the results are discarded, only a thrown
        // exception matters.
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }

        // Masters (sync or async) are forced to brokerId = MixAll.MASTER_ID.
        // A slave must have brokerId > 0; otherwise a message is printed and
        // the JVM exits with -3.
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }
                break;
            default:
                break;
        }

        // The HA listen port is always the server listen port + 1.
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

        // Reset logback and reconfigure it from <rocketmqHome>/conf/logback_broker.xml.
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

        // -p and -m both print the four config objects to the console logger and
        // exit with 0; -m additionally passes `true` to printObjectProperties
        // (the flag's meaning is defined in MixAll, not visible here).
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }

        // Write the effective configuration of all four config objects to the broker log.
        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);

        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // Register a JVM shutdown hook that shuts the controller down. The
        // hasShutdown flag ensures controller.shutdown() runs at most once even
        // if the hook body is entered again; every invocation is counted and logged.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}
當配置資訊設定完畢後, broker會將這些資訊傳入brokerController控制器當中,這個控制器會初始化載入很多的管理器:
- rocketmq配置檔案資訊可以參考如下連結地址
- rocketMQ配置檔案資訊
- topicConfigManager : 用於管理broker中儲存的所有topic的配置
- consumerOffsetManager: 管理Consumer的消費進度
- subscriptionGroupManager: 用來管理訂閱組,包括訂閱許可權等。
當管理器全部載入完畢後,控制器將進入下一步初始化:初始化多個執行緒池,包括sendMessageExecutor、pullMessageExecutor、adminBrokerExecutor、clientManageExecutor,分別是傳送訊息執行器、拉取訊息執行器、broker管理執行器、客戶端管理執行器。這些執行器會放入執行緒池中處理,以便併發執行。
public boolean initialize() throws CloneNotSupportedException {
//用於管理broker中儲存的所有topic的配置
boolean result = this.topicConfigManager.load();
//管理Consumer的消費進度
result = result && this.consumerOffsetManager.load();
//用來管理訂閱組,包括訂閱許可權等
result = result && this.subscriptionGroupManager.load();
// 用於消費者過濾配置
result = result && this.consumerFilterManager.load();
if (result) {//載入topicConfigManager、consumerOffsetManager、subscriptionGroupManager、consumerFilterManager ,將載入結果成功與否儲存在result中
try {
//用於broker層的訊息落地儲存
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
result = result && this.messageStore.load();
if (result) {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
//傳送訊息的執行緒組
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
//設定拉取訊息的執行緒組
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
//設定查詢訊息的執行緒組
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
//broker管理器執行器
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
//客戶端管理執行器
this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
//心跳
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
//消費者管理執行緒組
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
//把剛剛建立的執行器註冊到 remotingServer, fastRemotingServer物件中
this.registerProcessor();
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
//定期查詢如下資訊
//記錄broker的狀態
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
//消費者當前資訊的offset位置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void