1. 程式人生 > >RocketMQ 原始碼分析 NettyRemotingServer(六)

RocketMQ 原始碼分析 NettyRemotingServer(六)

前言

已經寫了幾篇關於RocketMQ原始碼的分析,可能對其邏輯處理會多一點認識,但還沒深入到RocketMQ精髓中。比如MQ中的通訊是如何實現的、如何實現高效能,高可用、最終一致性、MQ 訊息儲存。這些才是我們閱讀原始碼的一個目標。所以這篇通過分析原始碼的來了解一下RocketMQ通訊機制。

本文很大部分摘錄了 匠心獨運的部落格

NettyRemotingServer

RocketMQ中RPC通訊的通過1+N+M1+M2的Reactor多執行緒實現。

RocketMQ的RPC通訊採用Netty元件作為底層通訊庫,同樣也遵循了Reactor多執行緒模型,同時又在這之上做了一些擴充套件和優化。下面先給出一張RocketMQ的RPC通訊層的Netty多執行緒模型框架圖,讓大家對RocketMQ的RPC通訊中的多執行緒分離設計有一個大致的瞭解。

在這裡插入圖片描述
從上面的框圖中可以大致瞭解RocketMQ中NettyRemotingServer的Reactor 多執行緒模型。一個 Reactor 主執行緒(eventLoopGroupBoss,即為上面的1)負責監聽 TCP網路連線請求,建立好連線後丟給Reactor 執行緒池(eventLoopGroupSelector,即為上面的“N”,原始碼中預設設定為3),它負責將建立好連線的socket 註冊到 selector上去(RocketMQ的原始碼中會自動根據OS的型別選擇NIO和Epoll,也可以通過引數配置),然後監聽真正的網路資料。拿到網路資料後,再丟給Worker執行緒池(defaultEventExecutorGroup,即為上面的“M1”,原始碼中預設設定為8)

點開NettyRemotingServer可以看到eventLoopGroupBoss、eventLoopGroupSelector的初始過程,擷取部分NettyRemotingServer建構函式:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) 
    
    //省略部分程式碼
    
    // eventLoopGroupBoss負責監聽 TCP網路連線請求
    this.
eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet())); } }); //這個是在eventLoopGroupBoss在接受到連線的時候,它負責將建立好連線的socket註冊到selector上去 if (useEpoll()) { this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } else { this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } //省略部分程式碼 }

在NettyRemotingServer例項初始化完成後,就會將其啟動。Server端在啟動階段會將之前例項化好的1個acceptor執行緒(eventLoopGroupBoss),N個IO執行緒(eventLoopGroupSelector),M1個worker 執行緒(defaultEventExecutorGroup)繫結上去。前面部分也已經介紹過各個執行緒池的作用了。擷取start()部分程式碼:

public void start() {
    //這裡的Worker執行緒池是專門用於處理Netty網路通訊相關的(包括編碼/解碼、空閒連結管理、網路連線管理以及網路請求處理)
    //RocketMQ-> Java NIO的1+N+M模型:1個acceptor執行緒,N個IO執行緒,M1個worker 執行緒。
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });
    
    //省略部分程式碼
}

這裡需要說明的是,Worker執行緒拿到網路資料後,就交給Netty的ChannelPipeline(其採用責任鏈設計模式),從Head到Tail的一個個Handler執行下去,這些 Handler是在建立NettyRemotingServer例項時候指定的。擷取部分start()部分程式碼:

public void start() {
        //省略部分程式碼 
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                //rocketmq解碼器,他們分別覆蓋了父類的encode和decode方法
                                new NettyEncoder(),
                                //rocketmq編碼器
                                new NettyDecoder(),
                                //Netty自帶的心跳管理器     
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                //連線管理器,他負責捕獲新連線、連線斷開、異常等事件,然後統一排程到NettyEventExecuter處理器處理。     
                                new NettyConnectManageHandler(),
                                //當一個訊息經過前面的解碼等步驟後,然後排程到channelRead0方法,然後根據訊息型別進行分發     
                                new NettyServerHandler()
                            );
                    }
                });

NettyEncoder和NettyDecoder 負責網路傳輸資料和 RemotingCommand 之間的編解碼。NettyServerHandler 拿到解碼得到的 RemotingCommand 後,然後排程到channelRead0方法。NettyServerHandler程式碼:

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

根據 RemotingCommand.type 來判斷是 request 還是 response來進行相應處理,根據業務請求碼封裝成不同的task任務後,提交給對應的業務processor處理執行緒池處理M2。processMessageReceived程式碼:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

流程總結

在這裡插入圖片描述
一個 Reactor 主執行緒負責監聽 TCP 連線請求,建立好連線後丟給 Reactor 執行緒池,它負責將建立好連線的 socket 註冊到 selector 上去(這裡有兩種方式,NIO和Epoll,可配置),然後監聽真正的網路資料。拿到網路資料後,再丟給 Worker 執行緒池。

Worker 拿到網路資料後,就交給 Pipeline,從 Head 到 Tail 一個個 Handler 的走下去,這些 Handler 是在建立 Server 的時候指定的。NettyEncoder 和 NettyDecoder 負責網路資料和 RemotingCommand 之間的編解碼。

NettyServerHandler 拿到解碼得到的 RemotingCommand 後,根據 RemotingCommand.type 來判斷是 request 還是 response,如果是 request, 就根據 RomotingCommand 的 code(code用來標識不同型別的請求) 去 processorTable 找到對應的 processor,然後封裝成 task 後,丟給對應的 processor 執行緒池, 如果是 response 就根據RemotingCommand.opaque 去 responseTable 中拿到對應的 ResponseFuture,把結果 set 給它。

對於 Client,經過 Pipeline 的順序是從 Tail 到 Head。不管是 Server 和 Client,並不是每次資料流轉都得經過所有的 Handler,而是會根據 Context 中的一些資訊去判斷。

整個資料流轉過程中還有很多hook, 比如處理 command 前,處理 command 後,傳送資料前,傳送資料後等。