Netty源碼分析之NioEventLoop(二)—NioEventLoop的啟動
上篇文章中我們對Netty中NioEventLoop創建流程與源碼進行了跟蹤分析。本篇文章中我們接著分析NioEventLoop的啟動流程;
Netty中會在服務端啟動和新連接接入時通過chooser選擇器,分別為NioServerSocketChannel與NioSocketChannel選擇綁定一個NioEventLoop,接下來我們就分別從這兩個方面梳理NioEventLoop的啟動源碼
一、服務端啟動
首先我們結合下圖看下Netty服務啟動過程中,NioServerSocketChannel綁定的NioEventLoop啟動流程
bind()部分源碼我們在之前服務端啟動過程中進行過說明,我們進一步跟蹤進入doBind0()方法中可以看到channel.eventLoop().execute的執行,需要說明的是這裏其實啟動的NioServerSocketChannel綁定的 bossGroup,用來負責處理新連接接入的。
/** * read by jsf * * @param regFuture * @param channel * @param localAddress * @param promise */ private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {//該方法向 NioServerSocketChannel 的 eventLoop 提交了一個任務,當 future(其實就是 promise) 成功後執行 //NioServerSocketChannel 的 bind 方法,並添加一個關閉監聽器。我們主要關註 bind 方法。 // This method is invoked before channelRegistered() is triggered. Give user // handlers a chance to set up // the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
進入NioEventLoop父類SingleThreadEventExecutor中的execute方法,改方法通過inEventLoop()會首先判斷當前的線程是否是NioEventLoop本身綁定的線程,結合inEventLoop的代碼可以看到NioEventLoop本身線程還未初始化為空,這裏返回false,執行啟動線程操作,同時會任務放入任務隊列中。
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //首先判斷當前線程是否是該EventLoop綁定的線程 boolean inEventLoop = inEventLoop(); //把傳入的任務加入任務對立 addTask(task); if (!inEventLoop) {//如果不是同一條線程 startThread(); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
@Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
繼續跟蹤進入startThread()方法中
private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } } }
在 doStartThread()中主要實現了以下功能:
1、執行傳入的ThreadPerTaskExecutor的execute方法,創建一個新的線程,並與這個NioEventLoop對象綁定;
2、在開啟的線程中執行SingleThreadEventExecutor.this.run(),也就是NioEventLoop的run方法,開始NioEventLoop的執行操作;
private void doStartThread() { assert thread == null; //線程執行器通過線程工廠創建線程 executor.execute(new Runnable() { @Override public void run() { //開啟線程,並賦值 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { //執行NioEventLoop的run方法 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { if (logger.isErrorEnabled()) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates."); } } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { if (logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ‘)‘); } } terminationFuture.setSuccess(null); } } } } }); }
OK到這一步,基於服務端啟動綁定端口的NioServerSocketChannel,也就是服務端Channel綁定的NioEventLoop已經啟動。
二、新連接接入
首先我們結合下圖看下當有客戶端接入時,創建NioSocketChannel,然後綁定NioEventLoop並啟動的流程
服務端啟動時會在NioServerSocketChannel的任務鏈中添加ServerBootstrapAcceptor對象,這就是用來處理新新連接接入的
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 服務端NioServerSocketChannel的pipeline中添加ServerBootstrapAcceptor ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
在新連接接入事件觸發時,執行unsafe.read();
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop //新連接接入 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
unsafe.read()的具體實現為NioMessageUnsafe中的read(),在read()方法中主要實現了兩個功能:
1、創建客戶端Channel,也就是NioSocketChannel;
2、開始服務端NioServerSocketChannel的任務鏈傳遞,首先執行之前已經加入任務鏈的ServerBootstrapAcceptor中的channelRead
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //這裏創建客戶端連接,也就是NioSocketChannelChannel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; //在這裏開始NioServerSocketChannel的任務鏈傳遞,會首先執行ServerBootstrapAcceptor中的channelRead pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
接下來在ServerBootstrapAcceptor中的channelRead中會獲取到傳入的NioSocketChannel,針對NioSocketChannel主要會執行以下操作:
1、配置childHandler任務鏈;
2、配置childOptions;
3、為NioSocketChannel分配NioEventLoop
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //配置childHandler任務鏈 child.pipeline().addLast(childHandler); //配置childOptions setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //為新連接分配NioEventLoop,並啟動執行 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
看以看到EventLoopGroup中register具體實實現:
1、關於next(),我們之前講過是專門用來分配NioEventLoop;
2、register()主要負責了EventLoop的綁定和啟動;
@Override public ChannelFuture register(ChannelPromise promise) { return next().register(promise); }
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //與NioEventLoop綁定 AbstractChannel.this.eventLoop = eventLoop; //首先判斷線程是否一致,當前線程是NioServerSocketChannel的線程,與當前創建NioSocketChannel的eventLoop線程不一致 if (eventLoop.inEventLoop()) { register0(promise); } else { try { //在這裏NioEventLoop啟動 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
上面代碼中的 eventLoop.execute我們已經分析過,經過一系列的流程,最後會執行NioEventLoop的run方法開始輪詢感興趣的IO事件。
以上我們主要從服務啟動與客戶端連接兩個方面分析了NioEventLoop的啟動流程與源碼,其實也就對應NioServerSocketChannel與NioSocketChannel分別綁定的NioEventLoop,其中有錯誤和不足之處還請指正與海涵。
Netty源碼分析之NioEventLoop(二)—NioEventLoop的啟動