Netty 服務端啟動過程
在 Netty 中創建 1 個 NioServerSocketChannel 在指定的端口監聽客戶端連接,這個過程主要有以下 個步驟:
- 創建 NioServerSocketChannel
- 初始化並註冊 NioServerSocketChannel
- 綁定指定端口
首先列出一個簡易服務端的啟動代碼:
1 public void start() { 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 3 EventLoopGroup workerGroup = new NioEventLoopGroup();4 try { 5 ServerBootstrap sbs = new ServerBootstrap() 6 //添加 group 7 .group(bossGroup, workerGroup) 8 //指定服務端 Channel 類型 9 .channel(NioServerSocketChannel.class) 10 //添加服務端 Channel 的 Handler 11 .handler(newHelloWorldServerHandler()) 12 //添加客戶端 Channel 的 Handler 13 .childHandler(new ChannelInitializer<NioSocketChannel>() { 14 @Override 15 protected void initChannel(NioSocketChannel ch) throws Exception { 16 //為後續接入的客戶端 Channel 準備的字符串編解碼 Handler 17 ch.pipeline().addLast(new StringDecoder()); 18 ch.pipeline().addLast(new StringEncoder()); 19 } 20 }); 21 //監聽指定的端口 22 ChannelFuture future = sbs.bind(port).sync(); 23 System.out.println("Server start listen at " + port); 24 future.channel().closeFuture().sync(); 25 } catch (Exception e) { 26 bossGroup.shutdownGracefully(); 27 workerGroup.shutdownGracefully(); 28 } 29 }
下面就從 ServerBootstrap 的 bind(int port)方法開始分析服務端的 NioServerSocketChannel 的創建過程。
1. 創建 NioServerSocketChannel
跟隨 bind 方法的調用,最終在 AbstractBootstrap 類的 doBind()方法找到了初始化,註冊和綁定方法調用:
1 private ChannelFuture doBind(final SocketAddress localAddress) { 2 //初始化並註冊 3 final ChannelFuture regFuture = initAndRegister(); 4 final Channel channel = regFuture.channel(); 5 if (regFuture.cause() != null) { 6 return regFuture; 7 } 8 9 if (regFuture.isDone()) { 10 // At this point we know that the registration was complete and successful. 11 ChannelPromise promise = channel.newPromise(); 12 //綁定本地端口 13 doBind0(regFuture, channel, localAddress, promise); 14 return promise; 15 } else { 16 //.... 17 } 18 }
2.
- 初始化並註冊 NioServerSocketChannel
首先來看一下這個 initAndRegister()方法:
1 final ChannelFuture initAndRegister() { 2 Channel channel = null; 3 try { 4 //創建 Channel 5 channel = channelFactory.newChannel(); 6 //初始化 Channel 7 init(channel); 8 } catch (Throwable t) { 9 //... 10 } 11 12 //註冊 13 ChannelFuture regFuture = config().group().register(channel); 14 if (regFuture.cause() != null) { 15 if (channel.isRegistered()) { 16 channel.close(); 17 } else { 18 channel.unsafe().closeForcibly(); 19 } 20 } 21 //... 22 }
Channel 也是通過工廠類來創建的,這個工廠默認是 ReflectiveChannelFactory,是在前面啟動代碼中,設置服務端 Channel 類型時創建的。通過名字可以知道,是用反射的方式創建了 Channel 對象。
init()方法有兩種實現,這裏分析的是 ServerBootstrap 的實現:
1 @Override 2 void init(Channel channel) throws Exception { 3 //... option 的設置省略掉 4 //pipeline 的創建,默認使用的 DefaultPipeline 5 ChannelPipeline p = channel.pipeline(); 6 7 //... 客戶端 Channel 相關配置的保存 8 9 p.addLast(new ChannelInitializer<Channel>() { 10 @Override 11 public void initChannel(Channel ch) throws Exception { 12 final ChannelPipeline pipeline = ch.pipeline(); 13 //這裏添加的是啟動代碼中,服務端的 Handler 14 ChannelHandler handler = config.handler(); 15 if (handler != null) { 16 pipeline.addLast(handler); 17 } 18 19 // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler. 20 // In this case the initChannel(...) method will only be called after this method returns. Because 21 // of this we need to ensure we add our handler in a delayed fashion so all the users handler are 22 // placed in front of the ServerBootstrapAcceptor. 23 ch.eventLoop().execute(new Runnable() { 24 @Override 25 public void run() { 26 //這裏添加了一個 Accepter,用來處理新連接的接入 27 pipeline.addLast(new ServerBootstrapAcceptor( 28 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 29 } 30 }); 31 } 32 }); 33 }
初始化 Channel 這個動作,主要做了 4 件事:
- 創建 pipeline
- 為 Channel 添加用戶創建的 Handler
- 添加 Accepter
- 其他屬性的設置
接下來分析 Channel 的註冊,需要關註的是這行代碼:
1 ChannelFuture regFuture = config().group().register(channel);
config()方法獲取了啟動時創建的 config 對象,這個對象的 group()方法就返回了啟動時傳入的 bossGroup。啟動代碼中傳入了兩個 group,返回的為什麽是 boosGroup 呢?查看啟動代碼中的 group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法,在它第一行就調用了 super.group(parentGroup),將第一個 group 對象傳給了父類 AbstractBootstrap。而此處 config 調用的 group()方法返回的正是父類中的 group。
因為這裏是一個 NioEventLoopGroup 對象,所以使用的 register(channel)方法是 MultithreadEventLoopGroup 中的。
1 @Override 2 public ChannelFuture register(Channel channel) { 3 return next().register(channel); 4 }
查看 next()方法可以發現,最終是調用之前創建 group 時創建的 chooser 的 next()方法,該方法會返回一個 NioEventLooop 對象(EventLoop 是在這裏分配的),它的 register()方法是在父類 SingleThreadEventLoop 中實現的。最終調用了 AbstractChannel 中的註冊方法。
1 @Override 2 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 3 //... 4 //將前面返回的 eventLoop 保存起來 5 AbstractChannel.this.eventLoop = eventLoop; 6 //判斷 eventLoop 中的 thread 是否是當前線程 7 //初次啟動時,eventLoop 中的 thread 為 null 8 if (eventLoop.inEventLoop()) { 9 register0(promise); 10 } else { 11 try { 12 //將註冊任務傳進去 13 eventLoop.execute(new Runnable() { 14 @Override 15 public void run() { 16 //註冊 17 register0(promise); 18 } 19 }); 20 } catch (Throwable t) { 21 //... 22 } 23 } 24 }
將註冊動作封裝成一個任務,然後交給 eventLoop 對象處理。
@Override public void execute(Runnable task) { //... //這裏通用是 false boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { //啟動線程 startThread(); addTask(task);//將前面傳進來的註冊任務添加進隊列 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } private void startThread() { //判斷是否需要啟動線程 if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { //啟動線程 doStartThread(); } } }
上面代碼中的 startThread()方法有個 STATE_UPDATER,它是用來更新該對象的 state 屬性,是一個線程安全的操作。state 默認值為 ST_NOT_STARTED,所以第一次進入該方法,條件判斷為 true,接下來進行 CAS 操作,將 state 設置為 ST_STARTED,然後調用 doStartThread()方法。當 group 中的線程都啟用之後,下一次 chooser 再選中這個線程,startThread()方法中的第一個 if 的條件判斷就是 false 了,不會再創建新的線程。
1 private void doStartThread() { 2 assert thread == null; 3 //這個 executor 就是構建 group 時,創建出來的 executor 4 executor.execute(new Runnable() { 5 @Override 6 public void run() { 7 thread = Thread.currentThread(); 8 if (interrupted) { 9 thread.interrupt(); 10 } 11 12 boolean success = false; 13 updateLastExecutionTime(); 14 try { 15 //前面創建的是 NioEventLoop 16 SingleThreadEventExecutor.this.run(); 17 success = true; 18 } catch (Throwable t) { 19 logger.warn("Unexpected exception from an event executor: ", t); 20 } finally { 21 for (;;) { 22 //更新 state 23 int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); 24 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( 25 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { 26 break; 27 } 28 } 29 //... 30 } 31 } 32 }); 33 }
前一篇分析 EventLoopGroup 創建時說過,會在 EventLoop 保存一個 executor 對象的引用,最終個任務就是交給這個 executor 來處理的。executor 的 execute(Runnable task) 方法會創建新線程,並執行傳入的 task。接下來看一下 NioEventLoop 中的 run() 方法。
1 protected void run() { 2 for (;;) { 3 try { 4 //計算 select 策略,當前有任務時,會進行一次 selectNow(NIO),返回就緒的 key 個數 5 //顯然 switch 中沒有匹配項,直接跳出 switch 6 //無任務時,則直接返回 SelectStrategy.SELECT 7 //這裏的 SelectStrategy.CONTINUE 感覺不會匹配到 8 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { 9 case SelectStrategy.CONTINUE: 10 continue; 11 case SelectStrategy.SELECT: 12 //當沒有可處理的任務時,直接進行 select 操作 13 // wakenUp.getAndSet(false) 返回的是 oldValue,由於默認值是 false 14 // 所以第一次返回的是 false 15 select(wakenUp.getAndSet(false)); 16 17 // ‘wakenUp.compareAndSet(false, true)‘ is always evaluated 18 // before calling ‘selector.wakeup()‘ to reduce the wake-up 19 // overhead. (Selector.wakeup() is an expensive operation.) 20 // 21 // However, there is a race condition in this approach. 22 // The race condition is triggered when ‘wakenUp‘ is set to 23 // true too early. 24 // 25 // ‘wakenUp‘ is set to true too early if: 26 // 1) Selector is waken up between ‘wakenUp.set(false)‘ and 27 // ‘selector.select(...)‘. (BAD) 28 // 2) Selector is waken up between ‘selector.select(...)‘ and 29 // ‘if (wakenUp.get()) { ... }‘. (OK) 30 // 31 // In the first case, ‘wakenUp‘ is set to true and the 32 // following ‘selector.select(...)‘ will wake up immediately. 33 // Until ‘wakenUp‘ is set to false again in the next round, 34 // ‘wakenUp.compareAndSet(false, true)‘ will fail, and therefore 35 // any attempt to wake up the Selector will fail, too, causing 36 // the following ‘selector.select(...)‘ call to block 37 // unnecessarily. 38 // 39 // To fix this problem, we wake up the selector again if wakenUp 40 // is true immediately after selector.select(...). 41 // It is inefficient in that it wakes up the selector for both 42 // the first case (BAD - wake-up required) and the second case 43 // (OK - no wake-up required). 44 45 if (wakenUp.get()) { 46 selector.wakeup(); 47 } 48 default: 49 // fallthrough 50 } 51 52 cancelledKeys = 0; 53 needsToSelectAgain = false; 54 final int ioRatio = this.ioRatio; 55 //根據比例來處理 IO 事件和任務 56 if (ioRatio == 100) { 57 try { 58 //處理就緒的 key 59 processSelectedKeys(); 60 } finally { 61 // Ensure we always run tasks. 62 //執行任務 63 runAllTasks(); 64 } 65 } else { 66 final long ioStartTime = System.nanoTime(); 67 try { 68 processSelectedKeys(); 69 } finally { 70 // Ensure we always run tasks. 71 // 計算出處理 IO 事件的時間,然後根據比例算出執行任務的時間 72 final long ioTime = System.nanoTime() - ioStartTime; 73 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 74 } 75 } 76 } catch (Throwable t) { 77 handleLoopException(t); 78 } 79 // Always handle shutdown even if the loop processing threw an exception. 80 try { 81 if (isShuttingDown()) { 82 closeAll(); 83 if (confirmShutdown()) { 84 return; 85 } 86 } 87 } catch (Throwable t) { 88 handleLoopException(t); 89 } 90 } 91 }
run()方法主要是做 select 操作,和處理 IO 事件和任務隊列中的任務,這部分內容下一篇文章再分析。從 executor 執行 execute()方法開始,由 Netyy 管理的線程就開始啟動運行了。實際上此時的 NioServerSocketChannel 對象還沒有註冊到 Netty 線程的 Selector 上,Debug 結果如下圖:
上圖中的 startThread()方法實際上是給 executor 提交了一個任務,緊接著 main 線程就調用了 addTask()方法,將 task 添加到 EventLoop 對象的任務隊列中,而這個 task 的內容就是執行註冊操作。在添加了註冊任務之後,Netty 線程就會在 select 完成後,執行隊列中的任務,將 NioServerSocketChannel 註冊到該線程的 Selector 上。接下來分析一下 AbstractChannel 的 register0()方法:
1 private void register0(ChannelPromise promise) { 2 try { 3 // check if the channel is still open as it could be closed in the mean time when the register 4 // call was outside of the eventLoop 5 if (!promise.setUncancellable() || !ensureOpen(promise)) { 6 return; 7 } 8 boolean firstRegistration = neverRegistered; 9 //註冊通道 10 doRegister(); 11 neverRegistered = false; 12 registered = true; 13 14 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the 15 // user may already fire events through the pipeline in the ChannelFutureListener. 16 //添加服務端 Channel 的 Handler 17 pipeline.invokeHandlerAddedIfNeeded(); 18 19 safeSetSuccess(promise); 20 //觸發通道註冊事件在 pipeline 上傳播 21 pipeline.fireChannelRegistered(); 22 // Only fire a channelActive if the channel has never been registered. This prevents firing 23 // multiple channel actives if the channel is deregistered and re-registered. 24 if (isActive()) {//第一次運行到這兒時,結果為 false,因為此時還沒有 bind 25 if (firstRegistration) { 26 pipeline.fireChannelActive(); 27 } else if (config().isAutoRead()) { 28 // This channel was registered before and autoRead() is set. This means we need to begin read 29 // again so that we process inbound data. 30 // 31 // See https://github.com/netty/netty/issues/4805 32 beginRead(); 33 } 34 } 35 } catch (Throwable t) { 36 // Close the channel directly to avoid FD leak. 37 closeForcibly(); 38 closeFuture.setClosed(); 39 safeSetFailure(promise, t); 40 } 41 }
doRegister()方法實際上就是 Java NIO 中將通道註冊到 Selector 上的操作:
1 selectionKey = javaChannel().register(eventLoop().selector, 0, this);//這裏感興趣的事件傳入的是 0
pipeline.invokeHandlerAddedIfNeeded() 和 pipeline.fireChannelRegistered() 則是用來添加 Handler 並觸發 Handler 別添加的事件的動作。
在 isActive()這個方法,由於當前是 NioServerSocketChannel,所以實際上是判斷當前通道是否成功綁定到一個地址,很顯然到目前為止,只是創建了通道並註冊到 Selector 上,還沒由綁定。
3. 綁定指定端口
在 initAndRegister()方法結束後,main 線程開始調用 doBind0()方法,該方法將綁定操作封裝成任務交給 Netty 線程去執行。最後,調用 DefaultPipeline 中的 HeadContext 的 bind()方法,然後通過 unsafe.bind(localAddress,promise)完成綁定:
1 @Override 2 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { 3 //... 4 //顯然這裏返回的是 false 5 boolean wasActive = isActive(); 6 try { 7 //綁定操作 8 doBind(localAddress); 9 } catch (Throwable t) { 10 safeSetFailure(promise, t); 11 closeIfClosed(); 12 return; 13 } 14 15 if (!wasActive && isActive()) { 16 invokeLater(new Runnable() { 17 @Override 18 public void run() { 19 //這裏才是觸發服務端 Channel 激活事件的地方 20 pipeline.fireChannelActive(); 21 } 22 }); 23 } 24 25 safeSetSuccess(promise); 26 }
這個過程,建議 Debug 跟一下代碼,比較清楚代碼是如何一步一步到 HeadContext 中來的。接下來分析一下 doBind()方法:
1 @Override 2 protected void doBind(SocketAddress localAddress) throws Exception { 3 if (PlatformDependent.javaVersion() >= 7) { 4 javaChannel().bind(localAddress, config.getBacklog()); 5 } else { 6 javaChannel().socket().bind(localAddress, config.getBacklog()); 7 } 8 }
最終是根據平臺及其 Java 版本來調用 JDK 中的綁定方法。在綁定完成後,會觸發通道激活事件,在 HeadContext 中經過時,發現它裏面有這麽一行代碼:
1 readIfIsAutoRead();
Debug 一下,發現這個方法最終會調用到 HeadContext 的 read()方法,該方法是調用了 unsafe.beginRead(),緊接著就到了 AbstractNioChannel 的 doBeginRead()方法:
1 @Override 2 protected void doBeginRead() throws Exception { 3 // Channel.read() or ChannelHandlerContext.read() was called 4 final SelectionKey selectionKey = this.selectionKey; 5 if (!selectionKey.isValid()) { 6 return; 7 } 8 9 readPending = true; 10 11 final int interestOps = selectionKey.interestOps(); 12 if ((interestOps & readInterestOp) == 0) {//說明對 OP_ACCEPT 不感興趣 13 selectionKey.interestOps(interestOps | readInterestOp);//通過 | 修改感興趣的事件 14 } 15 }
前面通過反射創建 NioServerSocketChannel 對象時,調用了父類也就是 AbstractNioChannel 的構造方法,將 readInterestOp 設置為 16 了,在 NIO 中就是 OP_ACCEPT。從此,該 NioServerSocketChannel 就可以接收客戶端連接了。
4. 總結
在 Netty 服務端啟動過程中,主線程僅僅是創建了 EventLoopGroup 和啟動引導對象,然後發起綁定操作。這個過程中的綁定,註冊等操作都是主線程封裝成任務交給 Netty 線程去執行的。
由於 Netty 代碼中抽象類和接口都比較多,所以某些地方調用的方法有很多種實現,不熟悉的時候可以通過 Debug 來確定。
Netty 服務端啟動過程