rocketmq之原始碼分析netty實現原始碼(六)
阿新 • • 發佈:2019-05-29
netty的服務端核心屬性
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); //netty的啟動核心入口 private final ServerBootstrap serverBootstrap; //netty的入口連線池 private final EventLoopGroup eventLoopGroupSelector; //netty的handler的連線池 private final EventLoopGroup eventLoopGroupBoss; private final NettyServerConfig nettyServerConfig; //操作處理的執行緒池 private final ExecutorService publicExecutor; //處理監聽的回撥服務 private final ChannelEventListener channelEventListener; private final Timer timer = new Timer("ServerHouseKeepingService", true); private DefaultEventExecutorGroup defaultEventExecutorGroup;
netty服務端的核心構造
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); //初始化netty的核心框架 this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; this.channelEventListener = channelEventListener; int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } //共享執行緒池的初始化 this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet()); } }); //獲得系統平臺是否支援epoll的高效能io操作 if (useEpoll()) { this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } else { this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } loadSslContext(); }
netty的服務端核心啟動
public void start() { //預設的事件處理程序組初始化 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); //netty的服務端啟動配置 ServerBootstrap childHandler = //netty中服務端的標準設定,一個boss,多個worker this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) //採用的nio模型 .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) //socket引數,服務端接受連結的佇列長度,如果佇列已滿則拒絕,預設值較小 win200 ,它128 .option(ChannelOption.SO_BACKLOG, 1024) //socket引數,地址複用,預設false,快速啟動的操作更優 .option(ChannelOption.SO_REUSEADDR, true) //socket引數,連線保持,預設false,tcp會主動探測連線的有效性,可理解為心跳 .option(ChannelOption.SO_KEEPALIVE, false) //tcp引數,立即傳送資料,netty預設true,系統預設false,如果選擇頻寬的效能可以設定為false,一次傳送多個數據款 .childOption(ChannelOption.TCP_NODELAY, true) //socket引數,tcp資料傳送緩衝區 .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) //socket引數,tcp資料接收緩衝區 .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) //綁定當前伺服器及埠 .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) //設定handler .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) //增加業務需要的handler .addLast(defaultEventExecutorGroup, //編碼 new NettyEncoder(), //解碼 new NettyDecoder(), //idle new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //連線管理 new NettyConnectManageHandler(), //業務核心處理 new NettyServerHandler() ); } }); //buf的分配器設定 if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } //啟動當前的服務 try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } //namesrv端的事件回撥處理機制,會呼叫channelEventListener的事件處理 if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } //執行響應資料的處理 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }
netty服務端核心業務邏輯設計
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { //讀取網路請求 @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: //處理請求類的操作 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: //處理響應類的操作 processResponseCommand(ctx, cmd); break; default: break; } } }
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { //獲得對應的事件處理物件 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); //健全的配置事件處理 final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; //訊息唯一標示 final int opaque = cmd.getOpaque(); if (pair != null) { //構建非同步執行緒操作 Runnable run = new Runnable() { @Override public void run() { try { //前置處理 doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); //執行對應的事件處理 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); //後置處理 doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); //如果不是單發需要返回對應的請求唯一標識和資料 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { //將結果資料會寫給netty的channel ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; //驗證請求處理型別 if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } //封裝非同步處理執行緒,同時根據配置的事件執行緒池執行事件非同步執行緒, try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } }
===========================================================================
netty客戶端的核心屬性
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final long LOCK_TIMEOUT_MILLIS = 3000; //netty的client配置 private final NettyClientConfig nettyClientConfig; //netty的啟動核心入口 private final Bootstrap bootstrap = new Bootstrap(); //netty的啟動執行緒池事件集合 private final EventLoopGroup eventLoopGroupWorker; private final Lock lockChannelTables = new ReentrantLock(); //channel的本地記憶體資料介面 private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); private final Timer timer = new Timer("ClientHouseKeepingService", true); private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>(); private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>(); private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); private final Lock lockNamesrvChannel = new ReentrantLock(); private final ExecutorService publicExecutor; /** * Invoke the callback methods in this executor when process response. */ private ExecutorService callbackExecutor; private final ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup;
netty客戶端的初始化構造
//客戶端的通訊實現 public NettyRemotingClient(final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener) { super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue()); this.nettyClientConfig = nettyClientConfig; this.channelEventListener = channelEventListener; int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } //共享執行緒池配置 this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet()); } }); //netty的客戶端處理eventgroup配置 this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); } }); //驗證是否載入ssl if (nettyClientConfig.isUseTLS()) { try { sslContext = TlsHelper.buildSslContext(true); log.info("SSL enabled for client"); } catch (IOException e) { log.error("Failed to create SSLContext", e); } catch (CertificateException e) { log.error("Failed to create SSLContext", e); throw new RuntimeException("Failed to create SSLContext", e); } } }
netty客戶端的啟動
public void start() { //構造個性的執行緒池執行分組,定製執行緒名稱格式 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); //netty的客戶端通訊關鍵,可以參照netty的option配置詳細 Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } pipeline.addLast( defaultEventExecutorGroup, //編碼 new NettyEncoder(), //解碼 new NettyDecoder(), //idle new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //連結管理 new NettyConnectManageHandler(), //核心請求處理 new NettyClientHandler()); } }); //執行掃描響應結果,每秒 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); //事件處理機制 if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } }
//netty客戶端的核心實現 class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
服務端和客戶端共享的程式碼設計
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: //處理請求類的操作 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: //處理響應類的操作 processResponseCommand(ctx, cmd); break; default: break; } } }
netty客戶端處理服務端的響應資料
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
=============================================================================
這裡只講netty的通訊設計,後面的章節會根據功能詳細的分析實現不同功能的技術