1. 程式人生 > >rocketmq原始碼解析之NamesrvController建立

rocketmq原始碼解析之NamesrvController建立

說在前面

本次開始進行rocketmq原始碼解析,比較喜歡rocketmq的架構設計,rocketmq內嵌了namesrv註冊中心儲存了元資料,進行負載均衡、容錯的一些處理,4.3以上支援訊息事務,有管理控制檯、命令列工具,底層namesrv與broker、client與server互動netty實現。更多精彩文章請關注“天河聊架構”微信公眾號。

 

原始碼解析

建立NamesrvController,進入這個方法org.apache.rocketmq.namesrv.NamesrvStartup#main,再進入這個方法org.apache.rocketmq.namesrv.NamesrvStartup#main0

public static NamesrvController main0(String[] args) {

        try {
//            原始碼解析之建立namesrv控制器 =》
            NamesrvController controller = createNamesrvController(args);
//            原始碼解析之啟動namesrv控制器 =》
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

進入這個方法org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
//        從系統檔案中查詢rocketmq版本,預設4.3.0
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//        PackageConflictDetect.detectFastjson();
//        構建命令列操作的指令 =》
        Options options = ServerUtil.buildCommandlineOptions(new Options());
//        mqnamesrv 啟動namesrv命令 =》
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
//            系統非正常退出,流程結束
            System.exit(-1);
            return null;
        }

//        解析配置檔案 =》
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
//        =》
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//        設定 namesrv的服務埠
        nettyServerConfig.setListenPort(9876);
//        c 指定啟動的時候載入配置檔案
        if (commandLine.hasOption('c')) {
//            命令列啟動指定配置檔案,前面用c開頭
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
//                設定命令列啟動namesrv指定的配置檔案路徑
                namesrvConfig.setConfigStorePath(file);
                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

//        列印namesrv的配置資訊,命令列上面加p
        if (commandLine.hasOption('p')) {
            MixAll.printObjectProperties(null, namesrvConfig);
            MixAll.printObjectProperties(null, nettyServerConfig);
//            正常程式退出
            System.exit(0);
        }

//        把命令列屬性解析成properties
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//        解析 ROCKETMQ_HOME 環境變數
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
//            系統非正常退出
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
//        logback日誌檔案路徑
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
//        建立namesrv控制器 =》
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
        // remember all configs to prevent discard 把配置檔案配置值的屬性值註冊到namesrv控制器
        controller.getConfiguration().registerConfig(properties);
        return controller;
    }

進入這個方法org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions 這裡可以看到啟動rocketmq的時候namesrv引數怎麼制定

public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("h", "help", false, "Print help");
        opt.setRequired(false);
        options.addOption(opt);
//        這裡可以看到啟動的時候有一個n引數指定namesrv的地址,可以是單機可以是叢集,啟動地址之間用;分開
        opt =
            new Option("n", "namesrvAddr", true,
                "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");
        opt.setRequired(false);
        options.addOption(opt);
        return options;
    }

進入這個方法org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions 從這裡看出來啟動rocketmq的時候怎麼指定配置檔案,是否自動建立topic、消費組這些引數都可以在配置檔案中配置

//    指定namesrv啟動的時候載入的配置檔案
    public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Name server config properties file");
        opt.setRequired(false);
        options.addOption(opt);
//        控制檯輸出配置項
        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);
        return options;
    }

從這裡可以看到一些配置的儲存地址

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//    解析ROCKETMQ_HOME
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//    kvConfig儲存地址
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
//    namesrv儲存地址
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";

nettyserver的一些配置

public class NettyServerConfig implements Cloneable {
//    netty server監聽埠
    private int listenPort = 8888;
//    服務端工作執行緒數
    private int serverWorkerThreads = 8;
//    服務端回撥執行執行緒
    private int serverCallbackExecutorThreads = 0;
//    服務端選擇器執行緒
    private int serverSelectorThreads = 3;
//    oneway方式訊號量的值
    private int serverOnewaySemaphoreValue = 256;
//    服務端非同步訊號量值
    private int serverAsyncSemaphoreValue = 64;
//    服務端channel最大空閒時間
    private int serverChannelMaxIdleTimeSeconds = 120;
//    傳送訊息最大大小從系統屬性中獲取,預設值65535
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
//    接受訊息最大大小從系統屬性中獲取,預設值65535
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;

nettyserver預設監聽埠9876

nettyServerConfig.setListenPort(9876);

從這裡可以看到是解析啟動的時候指定的配置檔案屬性

//        c 指定啟動的時候載入配置檔案
        if (commandLine.hasOption('c')) {
//            命令列啟動指定配置檔案,前面用c開頭
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

配置檔案中可以配置這個類org.apache.rocketmq.common.namesrv.NamesrvConfig namesrv的一些配置

//    解析ROCKETMQ_HOME
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//    kvConfig儲存地址
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
//    namesrv儲存地址
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    private boolean orderMessageEnable = false;

也可以配置這個類org.apache.rocketmq.remoting.netty.NettyServerConfig nettyserver的一些配置

public class NettyServerConfig implements Cloneable {
//    netty server監聽埠
    private int listenPort = 8888;
//    服務端工作執行緒數
    private int serverWorkerThreads = 8;
//    服務端回撥執行執行緒
    private int serverCallbackExecutorThreads = 0;
//    服務端選擇器執行緒
    private int serverSelectorThreads = 3;
//    oneway方式訊號量的值
    private int serverOnewaySemaphoreValue = 256;
//    服務端非同步訊號量值
    private int serverAsyncSemaphoreValue = 64;
//    服務端channel最大空閒時間
    private int serverChannelMaxIdleTimeSeconds = 120;
//    傳送訊息最大大小從系統屬性中獲取,預設值65535
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
//    接受訊息最大大小從系統屬性中獲取,預設值65535
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;

namesrv配置的儲存地址就是啟動指定的配置檔案

namesrvConfig.setConfigStorePath(file);

也可以在命令列上指定namesrv屬性配置

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

logback的配置檔案

configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

根據namesrv配置和nettyserver配置建立NamesrvController

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

進入這個方法org.apache.rocketmq.namesrv.NamesrvController#NamesrvController

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
//        =》
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
//        指定儲存namesrv配置的配置檔案
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

進入這裡org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#RouteInfoManager

public RouteInfoManager() {
//        指定長度,減少擴容提高效能
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);//topicQueue快取資訊
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);//broker地址快取
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);//叢集地址快取
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);//可用的broker快取
        this.filterServerTable = new HashMap<String, List<String>>(256);//過濾的server地址
    }

設定namrsrv配置儲存地址System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"

this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
// remember all configs to prevent discard 把配置檔案配置值的屬性值註冊到namesrv控制器
controller.getConfiguration().registerConfig(properties);

進入這個方法org.apache.rocketmq.common.Configuration#registerConfig(java.util.Properties)

public Configuration registerConfig(Object configObject) {
    try {
        readWriteLock.writeLock().lockInterruptibly();
        try {

            Properties registerProps = MixAll.object2Properties(configObject);
            merge(registerProps, this.allConfigs);
            configObjectList.add(configObject);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("registerConfig lock error");
    }
    return this;
}
/**
 * All properties include configs in object and extend properties.
 * 所有的啟動配置都在這裡
 */
private Properties allConfigs = new Properties();

 

說在最後

本次解析僅代表個人觀點,僅供參考。

 

加入技術微信群

相關推薦

rocketmq原始碼解析NamesrvController建立

說在前面 本次開始進行rocketmq原始碼解析,比較喜歡rocketmq的架構設計,rocketmq內嵌了namesrv註冊中

Android原始碼解析應用程式資源管理器(Asset Manager)的建立過程分析

轉載自:https://blog.csdn.net/luoshengyang/article/details/8791064 我們分析了Android應用程式資源的編譯和打包過程,最終得到的應用程式資源就與應用程式程式碼一起打包在一個APK檔案中。Android應用程式在執行的過程中,是通過一個

swoole原始碼解析swoole_lock的建立過程

swoole增加了鎖的實現,PHP程式碼中可以很方便地建立一個鎖,用來實現資料同步。swoole_lock類支援5種鎖的型別: 檔案鎖 SWOOLE_FILELOCK 讀寫鎖 SWOOLE_RWLOCK 訊號量 SWOOLE_SEM 互斥鎖 SWOOLE_MUTEX

swoole原始碼解析swoole_buffer的建立過程

swoole提供了一個swoole_buffer類(程式碼位於swoole_buffer.c中),讓PHP開發者可以像C一樣直接讀寫記憶體,提升程式的效能,又不用擔心記憶體越界。swoole_buffer會檢測offset,但是swoole_buffer提供的記憶體空間不是共

Spring原始碼解析bean的建立

閱讀須知 Spring原始碼版本:4.3.8 文章中使用/* */註釋的方法會做深入分析 正文 之前我們都是在圍繞 ApplicationContext applicationContext = new ClassPathXmlApplicati

openstack原始碼解析虛機建立

本文講的是openstack原始碼解析 虛擬機器建立流程 版本是icehouse版 首先先看架構圖,請求從nova-api發起,然後到nova-conductor,再到scheduler進行排程,排程選中某臺機器後,通過rpc請求,傳送到某臺機器上執行建立機器方法,期間會訪

Android框架原始碼解析(四)Picasso

這次要分析的原始碼是 Picasso 2.5.2 ,四年前的版本,用eclipse寫的,但不影響這次我們對其原始碼的分析 地址:https://github.com/square/picasso/tree/picasso-parent-2.5.2 Picasso的簡單使用

Android框架原始碼解析(三)ButterKnife

注:所有分析基於butterknife:8.4.0 原始碼目錄:https://github.com/JakeWharton/butterknife 其中最主要的3個模組是: Butterknife註解處理器https://github.com/JakeWharton/

Android框架原始碼解析(二)OKhttp

原始碼在:https://github.com/square/okhttp 包實在是太多了,OKhttp核心在這塊https://github.com/square/okhttp/tree/master/okhttp 直接匯入Android Studio中即可。 基本使用:

Android框架原始碼解析(一)Volley

前幾天面試CVTE,HR面掛了。讓內部一個學長幫我查看了一下面試官評價,發現二面面試官的評價如下: 廣度OK,但缺乏深究能力,深度與實踐不足 原始碼:只能說流程,細節程式碼不清楚,retrofit和volley都是。 感覺自己一方面:自己面試技巧有待提高吧(框

Spring-web原始碼解析Filter-OncePerRequestFilter

轉自:  http://blog.csdn.net/ktlifeng/article/details/50630934 基於4.1.7.RELEASE 我們先看一個filter-mapping的配置 

spring原始碼解析AOP原理

一、準備工作   在這裡我先簡單記錄下如何實現一個aop: AOP:【動態代理】 指在程式執行期間動態的將某段程式碼切入到指定方法指定位置進行執行的程式設計方式; 1、匯入aop模組;Spring AOP:(spring-aspects) 2、定義一個業務邏輯類(

Dubbo原始碼解析服務端接收訊息

準備 dubbo 版本:2.5.4 服務端接收訊息流程 Handler鏈路 DubboProtocol private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent("c

Dubbo原始碼解析服務釋出與註冊

準備 dubbo版本:2.5.4 Spring自定義擴充套件 dubbo 是基於 spring 配置來實現服務釋出,並基於 spring 的擴充套件機制定義了一套自定義標籤,要實現自定義擴充套件, spring 中提供了 NamespaceHandler 、BeanDefinit

MyBatis原始碼解析日誌記錄

一 .概述 MyBatis沒有提供日誌的實現類,需要接入第三方的日誌元件,但第三方日誌元件都有各自的Log級別,且各不相同,但MyBatis統一提供了trace、debug、warn、error四個級別; 自動掃描日誌實現,並且第三方日誌外掛載入優先順序如下:slf4J → commonsLoging →

MyBatis原始碼解析資料來源(含資料庫連線池簡析)

一.概述: 常見的資料來源元件都實現了javax.sql.DataSource介面; MyBatis不但要能整合第三方的資料來源元件,自身也提供了資料來源的實現; 一般情況下,資料來源的初始化過程引數較多,比較複雜; 二.設計模式: 為什麼要使用工廠模式     資料來

Spring原始碼解析 Spring Security啟動細節和工作模式

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Laravel原始碼解析反射的使用

前言 PHP的反射類與例項化物件作用相反,例項化是呼叫封裝類中的方法、成員,而反射類則是拆封類中的所有方法、成員變數,幷包括私有方法等。就如“解刨”一樣,我們可以呼叫任何關鍵字修飾的方法、成員。當然在正常業務中是建議不使用,比較反射類已經摒棄了封裝的概念。 本章講解反射類的使用及Laravel對反射的使用

hanlp原始碼解析中文分詞演算法詳解

詞圖 詞圖指的是句子中所有詞可能構成的圖。如果一個詞A的下一個詞可能是B的話,那麼A和B之間具有一條路徑E(A,B)。一個詞可能有多個後續,同時也可能有多個前驅,它們構成的圖我稱作詞圖。 需要稀疏2維矩陣模型,以一個詞的起始位置作為行,終止位置作為列,可以得到一個二維矩陣。例如:“他說的確實

MapReduce原始碼解析Mapper

MapReduce原始碼解析之Mapper 北京易觀智庫網路科技有限公司 作者:賀斌 摘要:詳解MapReduce中Map(對映)的實現者Mapper。 導語: 說起MapReduce,只要是大資料領域的小夥伴,相信都不陌生。它作為Hadoop生態系統中的一部分,最早是由G