Netty(一):server啟動流程解析
netty作為一個被廣泛應用的通訊框架,有必要我們多瞭解一點。
實際上netty的幾個重要的技術亮點:
1. reactor的執行緒模型;
2. 安全有效的nio非阻塞io模型應用;
3. pipeline流水線式的靈活處理過程;
4. channelHandler的靈活實現;
5. 提供許多開箱即用的處理器和編解碼器;
我們可以從這些點去深入理解其過人之處。
1. 一個NettyServer的demo
要想深入理解某個框架,一般還是要以demo作為一個抓手點的。以下,我們可以看到一個簡單的nettyServer的建立過程,即netty的quick start樣例吧。
@Slf4j public class NettyServerHelloApplication { /** * 一個server的樣例 */ public static void main(String[] args) throws Exception { // 1. 建立對應的EventLoop執行緒池備用, 分bossGroup和workerGroup EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(4); try { // 2. 建立netty對應的入口核心類 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); // 3. 設定server的各項引數,以及應用處理器 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) // 設定tcp協議的請求等待佇列 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 3.2. 最重要的,將各channelHandler繫結到netty的上下文中(暫且這麼說吧) ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast("encoder", new MessageEncoder()); p.addLast("decoder", new MessageDecoder()); p.addLast(new EchoServerHandler()); } }); // 4. 繫結tcp埠開啟服務端監聽, sync() 保證執行完成所有任務 ChannelFuture f = b.bind(ServerConstant.PORT).sync(); // 5. 等待關閉訊號,讓業務執行緒去服務業務了 f.channel().closeFuture().sync(); } finally { // 6. 收到關閉訊號後,優雅關閉server的執行緒池,保護應用 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
以上,就是一個簡版的nettyServer的整個框架了,這也基本上整個nettyServer的程式設計正規化了。主要即分為這麼幾步:
1. 建立對應的EventLoop執行緒池備用, 分bossGroup和workerGroup;
2. 建立netty對應的入口核心類 ServerBootstrap;
3. 設定server的各項引數,以及應用處理器(必備的channelHandler業務接入過程);
4. 繫結tcp埠開啟服務端監聽;
5. 等待關閉訊號,讓業務執行緒去服務業務了;
6. 收到關閉訊號後,優雅關閉server的執行緒池,保護應用;
事實上,如果我們直接基於jdk提供的ServerSocketChannel是否也差不了多少呢?是的,至少表面看起來是的,但我們要處理許多的異常情況,且可能面對變化繁多的業務型別。又該如何呢?
畢竟一個框架的成功,絕非偶然。下面我們就這幾個過程來看看netty都是如何處理的吧!
2. EventLoop 的建立
EventLoop 直譯為事件迴圈,但在這裡我們也可以理解為一個執行緒池,因為所有的事件都是提交給其處理的。那麼,它倒底是個什麼樣的迴圈呢?
首先來看下其類繼承情況:
從類圖可以看出,EventLoop也是一個executor或者說執行緒池的實現,它們也許有相通之處。
// 呼叫方式如下 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(4); // io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.ThreadFactory) /** * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, SelectorProvider.provider()); } public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } // io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...) protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { // 預設執行緒是 cpu * 2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } // io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...) /** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } // io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...) /** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 建立一個執行器,該執行器每提交一個任務,就建立一個執行緒來執行,即並沒有佇列的概念 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 使用一個數組來儲存整個可用的執行緒池 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 為每個child建立一個執行緒執行, 該方法由子類實現 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { // 如果建立失敗,則把已經建立好的執行緒池關閉掉 // 不過值得注意的是,當某個執行緒池建立失敗後,並沒有立即停止後續建立工作,即無 return 操作,這是為啥? // 實際上,發生異常時,Exeception 已經被丟擲,此處無需關注 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 建立選擇器,猜測是做負載均衡時使用 // 此處的chooser預設是 DefaultEventExecutorChooserFactory chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } // io.netty.channel.nio.NioEventLoopGroup#newChild @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { // 注意此處的引數型別是由外部進行保證的,在此直接做強轉操作 return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } // io.netty.channel.nio.NioEventLoop#NioEventLoop NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { // 此構造器會做很多事,比如建立佇列,開啟nio selector... super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } // io.netty.util.concurrent.DefaultEventExecutorChooserFactory#newChooser @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { // 如: 1,2,4,8... 都會建立 PowerOfTwoEventExecutorChooser if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } // io.netty.util.concurrent.DefaultPromise#addListener @Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }
以上,就是 NioEventLoopGroup 的建立過程了. 本質上其就是一個個的單獨的執行緒組成的陣列列表, 等待被呼叫.
3. ServerBootstrap 的建立
ServerBootstrap是Netty的一個服務端核心入口類, 它可以很快速的建立一個穩定的netty服務.
ServerBootstrap 的類圖如下:
還是非常純粹的啊!其中有意思是的, ServerBootstrap繼承自 AbstractBootstrap, 而這個 AbstractBootstrap 是一個自依賴的抽象類: AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> , 這樣,即父類可以直接返回子類的資訊了。
其預設構造方法為空,所以所以引數都使用預設值, 因為還有後續的引數設定過程,接下來,我們看看其一些關鍵引數的設定:
// 1. channel的設定 // io.netty.bootstrap.AbstractBootstrap#channel /** * The {@link Class} which is used to create {@link Channel} instances from. * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your * {@link Channel} implementation has no no-args constructor. */ public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } // 預設使用構造器反射的方式建立 channel return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); } // io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory<? extends C>) /** * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} * is not working for you because of some more complex needs. If your {@link Channel} implementation * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for * simplify your code. */ @SuppressWarnings({ "unchecked", "deprecation" }) public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) { return channelFactory((ChannelFactory<C>) channelFactory); } // io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>) /** * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead. */ @Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); } @SuppressWarnings("unchecked") private B self() { return (B) this; } // 2. option 引數選項設定, 它會承包各種特殊配置的設定, 是一個通用配置項設定的入口 /** * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}. */ public <T> B option(ChannelOption<T> option, T value) { if (option == null) { throw new NullPointerException("option"); } // options 是一個 new LinkedHashMap<ChannelOption<?>, Object>(), 即非執行緒安全的容器, 所以設定值時要求使用 synchronized 保證執行緒安全 // value 為null時代表要將該選項設定刪除, 如果key相同,後面的配置將會覆蓋前面的配置 if (value == null) { synchronized (options) { options.remove(option); } } else { synchronized (options) { options.put(option, value); } } return self(); } // 3. childHandler 新增channelHandler, 這是一個最重要的一個方法, 它會影響到後面的業務處理統籌 // 呼叫該方法僅將 channelHandler的上下文加入進來, 實際還未進行真正的新增操作 .childHandler(new ChannelInitializer<SocketChannel>() { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) // 設定tcp協議的請求等待佇列 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast("encoder", new MessageEncoder()); p.addLast("decoder", new MessageDecoder()); p.addLast(new EchoServerHandler()); } }); /** * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s. */ public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } // 僅將 channelHandler 繫結到netty的上下文中 this.childHandler = childHandler; return this; } // 4. bossGroup, workGroup 如何被分配 ? /** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */ public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { // parentGroup 是給acceptor使用的, 主要用於對socket連線的接入,所以一般一個執行緒也夠了 super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } // childGroup 主要用於接入後的socket的事件的處理,一般要求數量較多,視業務屬性決定 this.childGroup = childGroup; return this; }
bind 繫結tcp埠,這個是真正觸發server初始化的一步,工作量比較大,我們另開一段講解。
4. nettyServer 的初始化
前面所有工作都是在準備, 都並未體現在外部, 而 bind 則會是開啟一個對外服務, 對外可見, 真正啟動server.
// io.netty.bootstrap.AbstractBootstrap#bind(int) /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } // io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress) /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(SocketAddress localAddress) { // 先驗證各種引數是否設定完整, 如執行緒池是否設定, channelHandler 是否設定... validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } // 繫結tcp埠 return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { // 1. 建立一些channel使用, 與eventloop繫結, 統一管理嘛 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); // 2. 註冊成功之後, 開始實際的 bind() 操作, 實際就是呼叫 channel.bind() // doBind0() 是一個非同步的操作,所以使用的一個 promise 作為結果驅動 doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
所以,從整體來說,bind()過程分兩大步走:1. 初始化channel,與nio關聯; 2. 落實channel和本地埠的繫結工作; 我們來細看下:
4.1 初始化channel
初始化channel, 並註冊到 selector上, 這個操作實際上非常重要。
// 以下我們先看下執行框架 // io.netty.bootstrap.AbstractBootstrap#initAndRegister final ChannelFuture initAndRegister() { Channel channel = null; try { // 即根據前面設定的channel 使用反射建立一個例項出來 // 即此處將會例項化出一個 ServerSocketChannel 出來 // 所以如果你想用jdk的nio實現,則設定channel時使用 NioServerSocketChannel.class即可, 而你想使用其他更優化的實現時比如EpollServerSocketChannel時,改變一下即可 // 而此處的 channelFactory 就是一個反射的實現 ReflectiveChannelFactory, 它會呼叫如上channel的無參構造方法例項化 // 重點工作就需要在這個無參構造器中完成,我們接下來看看 channel = channelFactory.newChannel(); // 初始化channel的一些公共引數, 相當於做一些屬性的繼承, 因為後續它將不再依賴 ServerBootstrap, 它需要有獨立自主能力 init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // 註冊建立好的 channel 到eventLoop中 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; } // 1. 先看看 NioServerSocketChannel 的構造過程 // io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel() /** * Create a new instance */ public NioServerSocketChannel() { // newSocket 簡單說就是建立一個本地socket, api呼叫: SelectorProvider.provider().openServerSocketChannel() // 但此時本 socket 並未和任何埠繫結 this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } /** * Create a new instance using the given {@link ServerSocketChannel}. */ public NioServerSocketChannel(ServerSocketChannel channel) { // 註冊 OP_ACCEPT 事件 super(null, channel, SelectionKey.OP_ACCEPT); // 此處的 javaChannel() 實際就是 channel, 這樣呼叫只是為統一吧 // 建立一個新的 socket 傳入 NioServerSocketChannelConfig 中 // 主要用於一些 RecvByteBufAllocator 的設定,及channel的儲存 config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } // io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel /** * Create a new instance * * @param parent the parent {@link Channel} by which this instance was created. May be {@code null} * @param ch the underlying {@link SelectableChannel} on which it operates * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel} */ protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { // 先讓父類初始化必要的上下文 super(parent); // 保留 channel 資訊,並設定非阻塞標識 this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } // io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel) /** * Creates a new instance. * * @param parent * the parent of this channel. {@code null} if there's no parent. */ protected AbstractChannel(Channel parent) { // 初始化上下文 this.parent = parent; // DefaultChannelId id = newId(); // NioMessageUnsafe unsafe = newUnsafe(); // new DefaultChannelPipeline(this); // 比較重要,將會初始化 head, tail 節點 pipeline = newChannelPipeline(); } // io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); // 初始化 head, tail tail = new TailContext(this); head = new HeadContext(this); // 構成雙向連結串列 head.next = tail; tail.prev = head; } // 2. 初始化channel, 有個最重要的動作是將 Acceptor 接入到 pipeline 中 // io.netty.bootstrap.ServerBootstrap#init @Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); // 根據前面的設定, 將各種屬性copy過來, 放到 config 欄位中 // 同樣, 因為 options 和 attrs 都不是執行緒安全的, 所以都要上鎖操作 synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } // 此處的pipeline, 就是在 NioServerSocketChannel 中初始化好head,tail的pipeline ChannelPipeline p = channel.pipeline(); // childGroup 實際就是外部的 workGroup final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } // 這個就比較重要了, 關聯 ServerBootstrapAcceptor // 主動新增一個 initializer, 它將作為第一個被呼叫的 channelInitializer 存在 // 而 channelInitializer 只會被呼叫一次 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { // 新增 Acceptor 到 pipeline 中, 形成一個 head -> ServerBootstrapAcceptor -> tail 的pipeline pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); // 此操作過後,當前pipeline中,就只有此一handler }
。。。
4.2 handler的新增過程
addLast() 看起來只是一個新增元素的過程, 總體來說就是一個雙向連結串列的新增, 但也蠻有意思的, 有興趣可以戳開詳情看看.
// io.netty.channel.ChannelHandler @Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } // io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...) @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } // 支援同時新增多個 handler for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; } // io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler) @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { // 重複性檢查 @Shareable 引數使用 checkMultiplicity(handler); // 生成一個新的上下文, filterName()將會生成一個唯一的名稱, 如 ServerBootstrap$1#0 newCtx = newContext(group, filterName(name, handler), handler); // 將當前ctx新增到連結串列中 addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); // 未註冊情況下, 不會進行下一步了 callHandlerCallbackLater(newCtx, true); return this; } // 而已註冊情況下, 則會使用 executor 提交callHandlerAdded0, 即呼叫 pipeline 的頭節點 EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; } private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); } private void addLast0(AbstractChannelHandlerContext newCtx) { // 一個雙向連結串列儲存上下文 AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; } // 新增ctx到佇列尾部 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); PendingHandlerCallback pending = pendingHandlerCallbackHead; if (pending == null) { pendingHandlerCallbackHead = task; } else { // Find the tail of the linked-list. while (pending.next != null) { pending = pending.next; } pending.next = task; } } // 對每一次新增 handler, 則都會產生一個事件, 通知現有的handler, handlerAdded() private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates // any pipeline events ctx.handler() will miss them because the state will not allow it. ctx.setAddComplete(); ctx.handler().handlerAdded(ctx); } catch (Throwable t) { boolean removed = false; try { remove0(ctx); try { ctx.handler().handlerRemoved(ctx); } finally { ctx.setRemoved(); } removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { logger.warn("Failed to remove a handler: " + ctx.name(), t2); } } if (removed) { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t)); } else { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t)); } } }檢視 handler 的新增過程
4.3 註冊channel,繫結eventloop執行緒
經過前面兩步, channel已經建立好和初始化好了, 但還沒有看到 eventLoop 的影子. 實際上eventloop和channel間就差一個註冊了.
也就是前面看到的 ChannelFuture regFuture = config().group().register(channel); 此處的group 即是 bossGroup.
// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel) @Override public ChannelFuture register(Channel channel) { // next() 相當於是一個負載均衡器, 會選擇出一個合適的 eventloop 出來, 預設是round-robin return next().register(channel); } // io.netty.channel.MultithreadEventLoopGroup#next @Override public EventLoop next() { return (EventLoop) super.next(); } // io.netty.util.concurrent.MultithreadEventExecutorGroup#next @Override public EventExecutor next() { // 使用前面建立的 PowerOfTwoEventExecutorChooser 進行呼叫 // 預設實現為輪詢 return chooser.next(); } // io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#next @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } // io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel) @Override public ChannelFuture register(Channel channel) { // 使用 DefaultChannelPromise 封裝channel, 再註冊到 eventloop 中 return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); // NioMessageUnsafe promise.channel().unsafe().register(this, promise); return promise; } // io.netty.channel.AbstractChannel.AbstractUnsafe#register @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; // inEventLoop() 判斷當前執行緒是否在 eventLoop 中 // 判斷方式為直接比較 eventloop 執行緒也當前執行緒是否是同一個即可 Thread.currentThread() == this.thread; // 核心註冊方法 register0() if (eventLoop.inEventLoop()) { register0(promise); } else { // 不在 eventLoop 中, 則非同步提交任務給 eventloop 處理 try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } // register0() 做真正的註冊 // io.netty.channel.AbstractChannel.AbstractUnsafe#register0 private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 具體的註冊邏輯由子類實現, NioServerSocketChannel doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. // 幾個擴充套件點: fireHandlerAdded() -> fireChannelRegistered() -> fireChannelActive() // part1: fireChannelAdded(), 它將會回撥上面的 ServerBootstrapAcceptor 的新增 channelInitializer pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); // part2: fireChannelRegistered() pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } // io.netty.channel.nio.AbstractNioChannel#doRegister @Override protected void doRegister() throws Exception { boolean selected = false; // 進行註冊即是 JDK 的 ServerSocketChannel.register() 過程 // 即 netty 與 socket 建立了關係連線, ops=0, 代表監聽所有讀事件 for (;;) { try { // 一直註冊直到成功 // 此處 ops=0, 即不關注任何事件哦, 那麼前面的 OP_ACCEPT 和這裡又是什麼關係呢? selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
。。。
4.4 ServerBootstrapAcceptor 速覽
前面我們看到, 在做 register() 完了之後, netty 會觸發一個invokeHandlerAddedIfNeeded, 從而呼叫fireHandlerAdded. 此時將會觸發 handlerAdded() 從而首次呼叫 ChannelInitializer.initChannel(), 從而將 ServerBootstrapAcceptor 新增到pipeline進來. ServerBootstrapAcceptor 獨立做的事情不多,更多是交給父類處理。
ServerBootstrapAcceptor( final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; // Task which is scheduled to re-enable auto-read. // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may // not be able to load the class because of the file limit it already reached. // // See https://github.com/netty/netty/issues/1328 // enableAutoReadTask = new Runnable() { @Override public void run() { channel.config().setAutoRead(true); } }; } // ServerBootstrapAcceptor 大部分情況下都是普通的 InboundHandler, 除了 channelRead() 時 // io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { // 它會向 childGroup 中提交channel過去, 從而使用 childGroup 產生作用 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
。。。
4.5 埠的繫結 doBind0
經過前面的channel的建立,初始化, Acceptor 的新增到handlerAdded(), 整個pipeline已經work起來了. 然後netty會回撥之前新增好的 listeners, 其中一個便是 doBind0();
// 回顧下: ... // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); ... // io.netty.bootstrap.AbstractBootstrap#doBind0 private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. // 這還是一個非同步過程 channel.eventLoop().execute(new Runnable() { @Override public void run() { // channel.bind(), channel 與 埠繫結 if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } // io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { // bind() 被當作一個普通的出站事件, 在pipeline中被傳遞 return pipeline.bind(localAddress, promise); } // io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) @Override public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { // 從tail開始傳遞 return tail.bind(localAddress, promise); } // io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; } // 同樣是一個pipeline式呼叫, bind() 是一個出站事件, 所以查詢 outbound // 最終會調到 DefaultChannelPipeline 中 // netty的pipeline機制就體現在這裡, 它會一直查詢可用的handler, 然後執行它, 直到結束 final AbstractChannelHandlerContext next = findContextOutbound(); // 獲取其繫結的 executor EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; } // ------------------------------------------------------------------------- // 出入站handler的查詢實現, 非常簡單, 卻很有效 (該方法在 AbstractChannelHandlerContext 中實現,被所有handler通用) // io.netty.channel.AbstractChannelHandlerContext#findContextInbound private AbstractChannelHandlerContext findContextInbound() { // 以當前節點作為起點開始查詢, 取第一個入站handler返回, 沒有則說明 pipeline 已結束 AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; } // io.netty.channel.AbstractChannelHandlerContext#findContextOutbound private AbstractChannelHandlerContext findContextOutbound() { // 以當前節點作為起點開始查詢, 取第一個出站handler返回, 沒有則說明 pipeline 已結束 AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; } // ------------------------------------------------------------------------- // io.netty.channel.AbstractChannelHandlerContext#invokeBind private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } } // 最終傳遞到 HeadContext 中進行處理 // io.netty.channel.DefaultChannelPipeline.HeadContext#bind @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { // unsafe 處理bind() 操作 unsafe.bind(localAddress, promise); } // io.netty.channel.AbstractChannel.AbstractUnsafe#bind @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { // 這裡會呼叫 jdk 的ServerSocketChannel介面, 實現真正的埠繫結 // 至此, 服務對外可見 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // 判斷是否是首次建立 channel, 如果是, 則呼叫 fireChannelActive() 傳播channelActive事件 if (!wasActive && isActive()) { // 這將會被稍後執行 invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } // 觸發一些通知什麼的, 結束了 safeSetSuccess(promise); } // 最終的bind(), 是通過 jdk 底層的 serverSocketChannel 開啟socket監聽 // io.netty.channel.socket.nio.NioServerSocketChannel#doBind @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { // 呼叫 serverSocketChannel bind() 方法,開啟socket監聽 javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
至此, bind 工作總算是完成了.我們來總結下它的主要工作:
1. 初始化一個channel, 根據設定裡來, 我們使用 NioServerSocketChannel;
2. 過繼現有的配置項給到channel;
3. 將channel與eventloop繫結做註冊, 新增 ServerBootstrapAcceptor 到 pipeline 中;
4. 繫結完成後, 通知現有的handler, 觸發系列事件: fireHandlerAdded() -> fireChannelRegistered() -> fireChannelActive();
5. 而bind()則作為一個出站事件, 被處理, 最終呼叫 jdk的ServerSocketChannel.register() 完成埠的開啟;
不過有一點需要注意, 在這個過程中, 只有 bossGroup 起作用, 所有的 workGroup 都還在待命中. 我們目前看到的 pipeline 是這樣的: head -> Acceptor -> tail;
講了這麼多, 有一種繞了一大圈的感覺有木有, 如果你自己直接使用nio寫, 估計10行程式碼都不要就搞定了. 尷尬!
5. netty eventloop 主迴圈
evenloop是netty的重要概念, 但在前面我們並未細講這玩意如何起作用(僅看過其建立過程而已), 不過這並不意味著它還沒起作用, 而我們暫時忽略了它. 每次要執行任務時, 總是會呼叫 eventloop().execute(...), 實際上這就是 eventloop的入口:
// io.netty.util.concurrent.SingleThreadEventExecutor#execute @Override public void execute(Runnable task) { // execute 線上程池中, 是一個非同步任務的提交方法, eventloop中同樣也一樣 // 但是大部分情況下只是新增佇列, 因為 eventloop 是單執行緒的 if (task == null) { throw new NullPointerException("task"); } // 向eventLoop佇列中新增task boolean inEventLoop = inEventLoop(); addTask(task); // 如果自身就是執行在 eventloop 環境中, 新增完task後則不再做更多的事 if (!inEventLoop) { // 如果不是在eventLoop執行緒中,則都會嘗試建立新執行緒執行, 但實際會重新檢測執行緒是否建立 startThread(); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } // io.netty.util.concurrent.SingleThreadEventExecutor#addTask /** * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown * before. */ protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } // taskQueue = MpscUnsafeUnboundedArrayQueue, 基於Unsafe 和 cas 實現的執行緒安全的佇列 if (!offerTask(task)) { // 新增失敗,則走拒絕策略 reject(task); } } // startThread, 看起來是開啟執行緒的意思, 卻又不太一樣 private void startThread() { // 所以實際上只會建立一次執行緒 if (state == ST_NOT_STARTED) { // 搶到鎖的執行緒才能呼叫start()方法 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); } } } } // 開啟eventLoop的執行緒 // io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread private void doStartThread() { assert thread == null; // 它並不是簡單的thread.start() executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { // 核心方法,由 SingleThreadEventExecutor.run() 實現 // 當然是由具體的executor具體實現了, 此文為 NioEventLoop.run() SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { // 執行緒池關閉,優雅停機 ... } } }); }
核心: 事件迴圈主框架, 既然是事件迴圈,則其必然是不會退出的。
// io.netty.channel.nio.NioEventLoop#run @Override protected void run() { // 一個死迴圈檢測任務, 這就 eventloop 的大殺器哦 for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; // 有任務時執行任務, 否則阻塞等待網路事件, 或被喚醒 case SelectStrategy.SELECT: // select.select(), 帶超時限制 select(wakenUp.getAndSet(false)); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } // fall through default: } cancelledKeys = 0; needsToSelectAgain = false; // ioRatio 為io操作的佔比, 和執行任務相比, 預設為 50:50 final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { // step1. 執行io操作 processSelectedKeys(); } finally { // Ensure we always run tasks. // step2. 執行task任務 runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; // 執行任