1. 程式人生 > 其它 >《RocketMQ原始碼系列》nameserver啟動流程

《RocketMQ原始碼系列》nameserver啟動流程

建立nameserver

  可以看到我們啟動 nameserver,就是執行NamesrvStartup 類的main方法。看起來比較簡單,應該就是建立了一個nameserver的控制器然後啟動,這樣 broker 就可以註冊上來了。

  首先,我們就看看 createNamesrvController() 方法,他具體是怎麼建立的。

  我們啟動 nameserver,就是執行 runserver.sh 指令碼,它裡面封裝了一堆 jvm 啟動命令,其實就和我們自己部署 jar 到伺服器上沒什麼兩樣。整段命令簡化出來差不多就是 java -server -Xms4g -Xmx4g -Xmn2g org.apache.rocketmq.namesrv.NamesrvStartup 這麼一個意思。

  一開是就是解析我們的 jvm 命令,然後建立了 namesreConfig、NettyServerConfig 物件,然後就基於這兩個物件建立完NamesrvController 直接返回了。其他的程式碼看起來似乎不重要,就是解析我們的啟動引數、然後賦值到config物件而已。

  現在我們就來看看,上面建立的 namesreConfig、NettyServerConfig 2個物件是幹嘛的。

  可以看到,這兩個類裡面就是一些基本的引數屬性,沒有其他邏輯了,那麼基於 這兩個物件 建立的NamesrvController 應該就是拿到這些配置資訊進行初始化,底層我們猜測應該是基於 nettyserver 監聽 9876 埠,然後 brocker 進行註冊、producer 拉取元資料。

  也就是說到此位置,namesrv 已經建立完成,但此時還沒有辦法對外提供服務,所以我們接下來應該看看 start() 方法了。

啟動netty伺服器

  整個程式碼也比較簡潔,initialize() 完之後,就直接 start() 啟動了。一個好的開源框架、主流程一定是比較清晰的。我之前看 eureka-client 程式碼的時候,那程式碼是真的一言難盡,層次含糊不清、到處硬編碼、邏輯也不嚴謹。

initialize()

  我們看看 NamesrvController 的initialize() 方法,首先是load()方法,應該是在之前建立NamesrvController 的時候給的一些配置,然後直接就創建出 NettyRemotingServer 網路服務元件 扔到執行緒池裡面去了,NettyRemotingServer 在建立的時候,構造方法 new 了一個ServerBootstrap ,他才是netty伺服器的核心,所以可以猜測應該它是 對外提供服務的元件。

  然後後面程式碼就是 心跳機制執行緒了,和我們啟動沒啥關係,現在我們就可以看看 start()幹了些啥了。

start()

  我們剛剛初始化的時候,就建立了NettyRemotingServer ,現在剛好就是啟動這個元件,那麼我們啟動 nameserver 的核心邏輯,必定就是這個無疑了。

    @Override
    public void start() {
        // 又是一個後臺執行的執行緒,主要是netty網路上的配置,先跳過
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
        // 準備環境
        prepareSharableHandlers();

        ServerBootstrap childHandler =
            // 這裡都是netty的一些配置。
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                // 是設定了Netty伺服器要監聽的埠號,預設就是9876
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                // 這裡是一大堆網路請求處理器。netty伺服器收到一個請求,就會一次使用下面處理器來處理請求
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                // 這是負責編碼解碼的
                                new NettyDecoder(),
                                // 這是負責連線空閒管理的
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                // 這是負責網路連線管理的
                                connectionManageHandler,
                                // 這是負責關鍵的網路請求處理的
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // 這裡就是啟動netty伺服器了,bind方法就是繫結和監聽一個埠號
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

可以看到 nameserver 的啟動並不複雜,畫個圖簡單梳理下。