RocketMQ 原始碼分析 NettyRemotingServer(六)
前言
已經寫了幾篇關於RocketMQ原始碼的分析,可能對其邏輯處理會多一點認識,但還沒深入到RocketMQ精髓中。比如MQ中的通訊是如何實現的、如何實現高效能,高可用、最終一致性、MQ 訊息儲存。這些才是我們閱讀原始碼的一個目標。所以這篇通過分析原始碼的來了解一下RocketMQ通訊機制。
本文很大部分摘錄了 匠心獨運的部落格。
NettyRemotingServer
RocketMQ中RPC通訊的通過1+N+M1+M2的Reactor多執行緒實現。
RocketMQ的RPC通訊採用Netty元件作為底層通訊庫,同樣也遵循了Reactor多執行緒模型,同時又在這之上做了一些擴充套件和優化。下面先給出一張RocketMQ的RPC通訊層的Netty多執行緒模型框架圖,讓大家對RocketMQ的RPC通訊中的多執行緒分離設計有一個大致的瞭解。
從上面的框圖中可以大致瞭解RocketMQ中NettyRemotingServer的Reactor 多執行緒模型。一個 Reactor 主執行緒(eventLoopGroupBoss,即為上面的1)負責監聽 TCP網路連線請求,建立好連線後丟給Reactor 執行緒池(eventLoopGroupSelector,即為上面的“N”,原始碼中預設設定為3),它負責將建立好連線的socket 註冊到 selector上去(RocketMQ的原始碼中會自動根據OS的型別選擇NIO和Epoll,也可以通過引數配置),然後監聽真正的網路資料。拿到網路資料後,再丟給Worker執行緒池(defaultEventExecutorGroup,即為上面的“M1”,原始碼中預設設定為8)
點開NettyRemotingServer可以看到eventLoopGroupBoss、eventLoopGroupSelector的初始過程,擷取部分NettyRemotingServer建構函式:
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener)
//省略部分程式碼
// eventLoopGroupBoss負責監聽 TCP網路連線請求
this. eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
}
});
//這個是在eventLoopGroupBoss在接受到連線的時候,它負責將建立好連線的socket註冊到selector上去
if (useEpoll()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
//省略部分程式碼
}
在NettyRemotingServer例項初始化完成後,就會將其啟動。Server端在啟動階段會將之前例項化好的1個acceptor執行緒(eventLoopGroupBoss),N個IO執行緒(eventLoopGroupSelector),M1個worker 執行緒(defaultEventExecutorGroup)繫結上去。前面部分也已經介紹過各個執行緒池的作用了。擷取start()部分程式碼:
public void start() {
//這裡的Worker執行緒池是專門用於處理Netty網路通訊相關的(包括編碼/解碼、空閒連結管理、網路連線管理以及網路請求處理)
//RocketMQ-> Java NIO的1+N+M模型:1個acceptor執行緒,N個IO執行緒,M1個worker 執行緒。
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
//省略部分程式碼
}
這裡需要說明的是,Worker執行緒拿到網路資料後,就交給Netty的ChannelPipeline(其採用責任鏈設計模式),從Head到Tail的一個個Handler執行下去,這些 Handler是在建立NettyRemotingServer例項時候指定的。擷取部分start()部分程式碼:
public void start() {
//省略部分程式碼
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
//rocketmq解碼器,他們分別覆蓋了父類的encode和decode方法
new NettyEncoder(),
//rocketmq編碼器
new NettyDecoder(),
//Netty自帶的心跳管理器
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
//連線管理器,他負責捕獲新連線、連線斷開、異常等事件,然後統一排程到NettyEventExecuter處理器處理。
new NettyConnectManageHandler(),
//當一個訊息經過前面的解碼等步驟後,然後排程到channelRead0方法,然後根據訊息型別進行分發
new NettyServerHandler()
);
}
});
NettyEncoder和NettyDecoder 負責網路傳輸資料和 RemotingCommand 之間的編解碼。NettyServerHandler 拿到解碼得到的 RemotingCommand 後,然後排程到channelRead0方法。NettyServerHandler程式碼:
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
根據 RemotingCommand.type 來判斷是 request 還是 response來進行相應處理,根據業務請求碼封裝成不同的task任務後,提交給對應的業務processor處理執行緒池處理M2。processMessageReceived程式碼:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
流程總結
一個 Reactor 主執行緒負責監聽 TCP 連線請求,建立好連線後丟給 Reactor 執行緒池,它負責將建立好連線的 socket 註冊到 selector 上去(這裡有兩種方式,NIO和Epoll,可配置),然後監聽真正的網路資料。拿到網路資料後,再丟給 Worker 執行緒池。
Worker 拿到網路資料後,就交給 Pipeline,從 Head 到 Tail 一個個 Handler 的走下去,這些 Handler 是在建立 Server 的時候指定的。NettyEncoder 和 NettyDecoder 負責網路資料和 RemotingCommand 之間的編解碼。
NettyServerHandler 拿到解碼得到的 RemotingCommand 後,根據 RemotingCommand.type 來判斷是 request 還是 response,如果是 request, 就根據 RomotingCommand 的 code(code用來標識不同型別的請求) 去 processorTable 找到對應的 processor,然後封裝成 task 後,丟給對應的 processor 執行緒池, 如果是 response 就根據RemotingCommand.opaque 去 responseTable 中拿到對應的 ResponseFuture,把結果 set 給它。
對於 Client,經過 Pipeline 的順序是從 Tail 到 Head。不管是 Server 和 Client,並不是每次資料流轉都得經過所有的 Handler,而是會根據 Context 中的一些資訊去判斷。
整個資料流轉過程中還有很多hook, 比如處理 command 前,處理 command 後,傳送資料前,傳送資料後等。