Netty服務端的啟動原始碼分析
ServerBootstrap的構造:
1 public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { 2 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class); 3 private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap(); 4 private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap(); 5 private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); 6 private volatile EventLoopGroup childGroup; 7 private volatile ChannelHandler childHandler; 8 9 public ServerBootstrap() { 10 } 11 ...... 12 }
隱式地執行了父類的無參構造:
1 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { 2 volatile EventLoopGroup group; 3 private volatile ChannelFactory<? extends C> channelFactory; 4 private volatile SocketAddress localAddress; 5 private final Map<ChannelOption<?>, Object> options = new LinkedHashMap(); 6 private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap(); 7 private volatile ChannelHandler handler; 8 9 AbstractBootstrap() { 10 } 11 ...... 12 }
只是初始化了幾個容器成員
在ServerBootstrap建立後,需要呼叫group方法,繫結EventLoopGroup,有關EventLoopGroup的建立在我之前部落格中寫過:Netty中NioEventLoopGroup的建立原始碼分析
ServerBootstrap的group方法:
1 public ServerBootstrap group(EventLoopGroup group) { 2 return this.group(group, group); 3 } 4 5 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { 6 super.group(parentGroup); 7 if (childGroup == null) { 8 throw new NullPointerException("childGroup"); 9 } else if (this.childGroup != null) { 10 throw new IllegalStateException("childGroup set already"); 11 } else { 12 this.childGroup = childGroup; 13 return this; 14 } 15 }
首先呼叫父類的group方法繫結parentGroup:
1 public B group(EventLoopGroup group) { 2 if (group == null) { 3 throw new NullPointerException("group"); 4 } else if (this.group != null) { 5 throw new IllegalStateException("group set already"); 6 } else { 7 this.group = group; 8 return this.self(); 9 } 10 } 11 12 private B self() { 13 return this; 14 }
將傳入的parentGroup繫結給AbstractBootstrap的group成員,將childGroup繫結給ServerBootstrap的childGroup成員。
group的繫結僅僅是交給了成員儲存。
再來看看ServerBootstrap的channel方法,,是在AbstractBootstrap中實現的:
1 public B channel(Class<? extends C> channelClass) { 2 if (channelClass == null) { 3 throw new NullPointerException("channelClass"); 4 } else { 5 return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass))); 6 } 7 }
使用channelClass構建了一個ReflectiveChannelFactory物件:
1 public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { 2 private final Class<? extends T> clazz; 3 4 public ReflectiveChannelFactory(Class<? extends T> clazz) { 5 if (clazz == null) { 6 throw new NullPointerException("clazz"); 7 } else { 8 this.clazz = clazz; 9 } 10 } 11 12 public T newChannel() { 13 try { 14 return (Channel)this.clazz.getConstructor().newInstance(); 15 } catch (Throwable var2) { 16 throw new ChannelException("Unable to create Channel from class " + this.clazz, var2); 17 } 18 } 19 20 public String toString() { 21 return StringUtil.simpleClassName(this.clazz) + ".class"; 22 } 23 }
可以看到ReflectiveChannelFactory的作用就是通過反射機制,產生clazz的例項(這裡以NioServerSocketChannel為例)。
在建立完ReflectiveChannelFactory物件後, 呼叫channelFactory方法:
1 public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) { 2 return this.channelFactory((ChannelFactory)channelFactory); 3 } 4 5 public B channelFactory(ChannelFactory<? extends C> channelFactory) { 6 if (channelFactory == null) { 7 throw new NullPointerException("channelFactory"); 8 } else if (this.channelFactory != null) { 9 throw new IllegalStateException("channelFactory set already"); 10 } else { 11 this.channelFactory = channelFactory; 12 return this.self(); 13 } 14 }
將剛才建立的ReflectiveChannelFactory物件交給channelFactory成員,用於後續服務端NioServerSocketChannel的建立。
再來看ServerBootstrap的childHandler方法:
1 public ServerBootstrap childHandler(ChannelHandler childHandler) { 2 if (childHandler == null) { 3 throw new NullPointerException("childHandler"); 4 } else { 5 this.childHandler = childHandler; 6 return this; 7 } 8 }
還是交給了childHandler成員儲存,可以看到上述這一系列的操作,都是為了填充ServerBootstrap,而ServerBootstrap真正的啟動是在bind時:
ServerBootstrap的bind方法,在AbstractBootstrap中實現:
1 public ChannelFuture bind(int inetPort) { 2 return this.bind(new InetSocketAddress(inetPort)); 3 } 4 5 public ChannelFuture bind(String inetHost, int inetPort) { 6 return this.bind(SocketUtils.socketAddress(inetHost, inetPort)); 7 } 8 9 public ChannelFuture bind(InetAddress inetHost, int inetPort) { 10 return this.bind(new InetSocketAddress(inetHost, inetPort)); 11 } 12 13 public ChannelFuture bind(SocketAddress localAddress) { 14 this.validate(); 15 if (localAddress == null) { 16 throw new NullPointerException("localAddress"); 17 } else { 18 return this.doBind(localAddress); 19 } 20 }
可以看到首先呼叫了ServerBootstrap的validate方法,:
1 public ServerBootstrap validate() { 2 super.validate(); 3 if (this.childHandler == null) { 4 throw new IllegalStateException("childHandler not set"); 5 } else { 6 if (this.childGroup == null) { 7 logger.warn("childGroup is not set. Using parentGroup instead."); 8 this.childGroup = this.config.group(); 9 } 10 11 return this; 12 } 13 }
先呼叫了AbstractBootstrap的validate方法:
1 public B validate() { 2 if (this.group == null) { 3 throw new IllegalStateException("group not set"); 4 } else if (this.channelFactory == null) { 5 throw new IllegalStateException("channel or channelFactory not set"); 6 } else { 7 return this.self(); 8 } 9 }
這個方法就是用來檢查是否綁定了group和channel以及childHandler,所以在執行bind方法前,無論如何都要執行group、channel和childHandler方法。
實際的bind交給了doBind來完成:
1 private ChannelFuture doBind(final SocketAddress localAddress) { 2 final ChannelFuture regFuture = this.initAndRegister(); 3 final Channel channel = regFuture.channel(); 4 if (regFuture.cause() != null) { 5 return regFuture; 6 } else if (regFuture.isDone()) { 7 ChannelPromise promise = channel.newPromise(); 8 doBind0(regFuture, channel, localAddress, promise); 9 return promise; 10 } else { 11 final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel); 12 regFuture.addListener(new ChannelFutureListener() { 13 public void operationComplete(ChannelFuture future) throws Exception { 14 Throwable cause = future.cause(); 15 if (cause != null) { 16 promise.setFailure(cause); 17 } else { 18 promise.registered(); 19 AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise); 20 } 21 } 22 }); 23 return promise; 24 } 25 }
首先呼叫initAndRegister,完成ServerSocketChannel的建立以及註冊:
1 final ChannelFuture initAndRegister() { 2 Channel channel = null; 3 4 try { 5 channel = this.channelFactory.newChannel(); 6 this.init(channel); 7 } catch (Throwable var3) { 8 if (channel != null) { 9 channel.unsafe().closeForcibly(); 10 return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3); 11 } 12 13 return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3); 14 } 15 16 ChannelFuture regFuture = this.config().group().register(channel); 17 if (regFuture.cause() != null) { 18 if (channel.isRegistered()) { 19 channel.close(); 20 } else { 21 channel.unsafe().closeForcibly(); 22 } 23 } 24 25 return regFuture; 26 }
首先呼叫channelFactory的newChannel通過反射機制構建Channel例項,也就是NioServerSocketChannel,
NioServerSocketChannel的無參構造:
1 public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel { 2 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); 3 4 public NioServerSocketChannel() { 5 this(newSocket(DEFAULT_SELECTOR_PROVIDER)); 6 } 7 ...... 8 }
SelectorProvider 是JDK的,關於SelectorProvider在我之前的部落格中有介紹:【Java】NIO中Selector的建立原始碼分析
在Windows系統下預設產生WindowsSelectorProvider,即DEFAULT_SELECTOR_PROVIDER,再來看看newSocket方法:
1 private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) { 2 try { 3 return provider.openServerSocketChannel(); 4 } catch (IOException var2) { 5 throw new ChannelException("Failed to open a server socket.", var2); 6 } 7 }
使用WindowsSelectorProvider建立了一個ServerSocketChannelImpl,其實看到這裡就明白了,NioServerSocketChannel是為了封裝JDK的ServerSocketChannel
接著呼叫另一個過載的構造:
1 public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) { 2 super((Channel)null, channel, 16); 3 this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket()); 4 }
首先呼叫父類的三參構造,其中16對應的是JDK中SelectionKey的ACCEPT狀態:
1 public static final int OP_ACCEPT = 1 << 4;
其父類的構造處於一條繼承鏈上:
AbstractNioMessageChannel:
1 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { 2 super(parent, ch, readInterestOp); 3 }
AbstractNioChannel:
1 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { 2 super(parent); 3 this.ch = ch; 4 this.readInterestOp = readInterestOp; 5 6 try { 7 ch.configureBlocking(false); 8 } catch (IOException var7) { 9 try { 10 ch.close(); 11 } catch (IOException var6) { 12 if (logger.isWarnEnabled()) { 13 logger.warn("Failed to close a partially initialized socket.", var6); 14 } 15 } 16 17 throw new ChannelException("Failed to enter non-blocking mode.", var7); 18 } 19 }
AbstractChannel:
1 private final ChannelId id; 2 private final Channel parent; 3 private final Unsafe unsafe; 4 private final DefaultChannelPipeline pipeline; 5 6 protected AbstractChannel(Channel parent) { 7 this.parent = parent; 8 this.id = this.newId(); 9 this.unsafe = this.newUnsafe(); 10 this.pipeline = this.newChannelPipeline(); 11 }
在AbstractChannel中使用newUnsafe和newChannelPipeline分別建立了一個Unsafe和一個DefaultChannelPipeline物件,
在前面的部落格介紹NioEventLoopGroup時候,在NioEventLoop的run方法中,每次輪詢完呼叫processSelectedKeys方法時,都是通過這個unsafe根據SelectedKey來完成資料的讀或寫,unsafe是處理基礎的資料讀寫
(unsafe在NioServerSocketChannel建立時,產生NioMessageUnsafe例項,在NioSocketChannel建立時產生NioSocketChannelUnsafe例項)
而pipeline的實現是一條雙向責任鏈,負責處理unsafe提供的資料,進而進行使用者的業務邏輯 (Netty中的ChannelPipeline原始碼分析)
在AbstractNioChannel中呼叫configureBlocking方法給JDK的ServerSocketChannel設定為非阻塞模式,且讓readInterestOp成員賦值為16用於未來註冊ACCEPT事件。
在呼叫完繼承鏈後回到NioServerSocketChannel構造,呼叫了javaChannel方法:
1 protected java.nio.channels.ServerSocketChannel javaChannel() { 2 return (java.nio.channels.ServerSocketChannel)super.javaChannel(); 3 }
其實這個javaChannel就是剛才出傳入到AbstractNioChannel中的ch成員:
1 protected SelectableChannel javaChannel() { 2 return this.ch; 3 }
也就是剛才建立的JDK的ServerSocketChannelImpl,用其socket方法,得到一個ServerSocket物件,然後產生了一個NioServerSocketChannelConfig物件,用於封裝相關資訊。
NioServerSocketChannel構建完畢,回到initAndRegister方法,使用剛建立的NioServerSocketChannel呼叫init方法,這個方法是在ServerBootstrap中實現的:
1 void init(Channel channel) throws Exception { 2 Map<ChannelOption<?>, Object> options = this.options0(); 3 synchronized(options) { 4 setChannelOptions(channel, options, logger); 5 } 6 7 Map<AttributeKey<?>, Object> attrs = this.attrs0(); 8 synchronized(attrs) { 9 Iterator var5 = attrs.entrySet().iterator(); 10 11 while(true) { 12 if (!var5.hasNext()) { 13 break; 14 } 15 16 Entry<AttributeKey<?>, Object> e = (Entry)var5.next(); 17 AttributeKey<Object> key = (AttributeKey)e.getKey(); 18 channel.attr(key).set(e.getValue()); 19 } 20 } 21 22 ChannelPipeline p = channel.pipeline(); 23 final EventLoopGroup currentChildGroup = this.childGroup; 24 final ChannelHandler currentChildHandler = this.childHandler; 25 Map var9 = this.childOptions; 26 final Entry[] currentChildOptions; 27 synchronized(this.childOptions) { 28 currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(0)); 29 } 30 31 var9 = this.childAttrs; 32 final Entry[] currentChildAttrs; 33 synchronized(this.childAttrs) { 34 currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(0)); 35 } 36 37 p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() { 38 public void initChannel(final Channel ch) throws Exception { 39 final ChannelPipeline pipeline = ch.pipeline(); 40 ChannelHandler handler = ServerBootstrap.this.config.handler(); 41 if (handler != null) { 42 pipeline.addLast(new ChannelHandler[]{handler}); 43 } 44 45 ch.eventLoop().execute(new Runnable() { 46 public void run() { 47 pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)}); 48 } 49 }); 50 } 51 }}); 52 }
首先對attrs和options這兩個成員進行了填充屬性配置,這不是重點,然後獲取剛才建立的NioServerSocketChannel的責任鏈pipeline,通過addLast將ChannelInitializer加入責任鏈,在ChannelInitializer中重寫了initChannel方法,首先根據handler是否是null(這個handler是ServerBootstrap呼叫handler方法新增的,和childHandler方法不一樣),若是handler不是null,將handler加入責任鏈,無論如何,都會非同步將一個ServerBootstrapAcceptor物件加入責任鏈(後面會說為什麼是非同步)
這個ChannelInitializer的initChannel方法的執行需要等到後面註冊時才會被呼叫,在後面pipeline處理channelRegistered請求時,此initChannel方法才會被執行 (Netty中的ChannelPipeline原始碼分析)
ChannelInitializer的channelRegistered方法:
1 public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { 2 if (initChannel(ctx)) { 3 ctx.pipeline().fireChannelRegistered(); 4 } else { 5 ctx.fireChannelRegistered(); 6 } 7 }
首先呼叫initChannel方法(和上面的initChannel不是一個):
1 private boolean initChannel(ChannelHandlerContext ctx) throws Exception { 2 if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { 3 try { 4 initChannel((C) ctx.channel()); 5 } catch (Throwable cause) { 6 exceptionCaught(ctx, cause); 7 } finally { 8 remove(ctx); 9 } 10 return true; 11 } 12 return false; 13 }
可以看到,這個ChannelInitializer只會在pipeline中初始化一次,僅用於Channel的註冊,在完成註冊後,會呼叫remove方法將其從pipeline中移除:
remove方法:
1 private void remove(ChannelHandlerContext ctx) { 2 try { 3 ChannelPipeline pipeline = ctx.pipeline(); 4 if (pipeline.context(this) != null) { 5 pipeline.remove(this); 6 } 7 } finally { 8 initMap.remove(ctx); 9 } 10 }
在移除前,就會回撥用剛才覆蓋的initChannel方法,異步向pipeline添加了ServerBootstrapAcceptor,用於後續的NioServerSocketChannel偵聽到客戶端連線後,完成在服務端的NioSocketChannel的註冊。
回到initAndRegister,在對NioServerSocketChannel初始化完畢,接下來就是註冊邏輯:
1 ChannelFuture regFuture = this.config().group().register(channel);
首先呼叫config().group(),這個就得到了一開始在ServerBootstrap的group方法傳入的parentGroup,呼叫parentGroup的register方法,parentGroup是NioEventLoopGroup,這個方法是在子類MultithreadEventLoopGroup中實現的:
1 public ChannelFuture register(Channel channel) { 2 return this.next().register(channel); 3 }
首先呼叫next方法:
1 public EventLoop next() { 2 return (EventLoop)super.next(); 3 }
實際上呼叫父類MultithreadEventExecutorGroup的next方法:
1 public EventExecutor next() { 2 return this.chooser.next(); 3 }
關於chooser在我之前部落格:Netty中NioEventLoopGroup的建立原始碼分析 介紹過,在NioEventLoopGroup建立時,預設會根據cpu個數建立二倍個NioEventLoop,而chooser就負責通過取模,每次選擇一個NioEventLoop使用
所以在MultithreadEventLoopGroup的register方法實際呼叫了NioEventLoop的register方法:
NioEventLoop的register方法在子類SingleThreadEventLoop中實現:
1 public ChannelFuture register(Channel channel) { 2 return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this))); 3 } 4 5 public ChannelFuture register(ChannelPromise promise) { 6 ObjectUtil.checkNotNull(promise, "promise"); 7 promise.channel().unsafe().register(this, promise); 8 return promise; 9 }
先把channel包裝成ChannelPromise,預設是DefaultChannelPromise (Netty中的ChannelFuture和ChannelPromise),用於處理非同步操作
呼叫過載方法,而在過載方法裡,可以看到,實際上的register操作交給了channel的unsafe來實現:
unsafe的register方法在AbstractUnsafe中實現:
1 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 2 if (eventLoop == null) { 3 throw new NullPointerException("eventLoop"); 4 } else if (AbstractChannel.this.isRegistered()) { 5 promise.setFailure(new IllegalStateException("registered to an event loop already")); 6 } else if (!AbstractChannel.this.isCompatible(eventLoop)) { 7 promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); 8 } else { 9 AbstractChannel.this.eventLoop = eventLoop; 10 if (eventLoop.inEventLoop()) { 11 this.register0(promise); 12 } else { 13 try { 14 eventLoop.execute(new Runnable() { 15 public void run() { 16 AbstractUnsafe.this.register0(promise); 17 } 18 }); 19 } catch (Throwable var4) { 20 AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4); 21 this.closeForcibly(); 22 AbstractChannel.this.closeFuture.setClosed(); 23 this.safeSetFailure(promise, var4); 24 } 25 } 26 27 } 28 }
前面的判斷做了一些檢查就不細說了,直接看到else塊
首先給當前Channel綁定了eventLoop,即通過剛才chooser選擇的eventLoop,該Channel也就是NioServerSocketChannel
由於Unsafe的操作是在輪詢執行緒中非同步執行的,所裡,這裡需要判斷inEventLoop是否處於輪詢中
在之前介紹NioEventLoopGroup的時候說過,NioEventLoop在沒有呼叫doStartThread方法時並沒有啟動輪詢的,所以inEventLoop判斷不成立
那麼就呼叫eventLoop的execute方法,實際上的註冊方法可以看到呼叫了AbstractUnsafe的register0方法,而將這個方法封裝為Runnable交給eventLoop作為一個task去非同步執行
先來看eventLoop的execute方法實現,是在NioEventLoop的子類SingleThreadEventExecutor中實現的:
1 public void execute(Runnable task) { 2 if (task == null) { 3 throw new NullPointerException("task"); 4 } else { 5 boolean inEventLoop = this.inEventLoop(); 6 this.addTask(task); 7 if (!inEventLoop) { 8 this.startThread(); 9 if (this.isShutdown() && this.removeTask(task)) { 10 reject(); 11 } 12 } 13 14 if (!this.addTaskWakesUp && this.wakesUpForTask(task)) { 15 this.wakeup(inEventLoop); 16 } 17 18 } 19 }
這裡首先將task,即剛才的註冊事件放入阻塞任務佇列中,然後呼叫startThread方法:
1 private void startThread() { 2 if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) { 3 try { 4 this.doStartThread(); 5 } catch (Throwable var2) { 6 STATE_UPDATER.set(this, 1); 7 PlatformDependent.throwException(var2); 8 } 9 } 10 11 }
NioEventLoop此時還沒有輪詢,所以狀態是1,對應ST_NOT_STARTED,此時利用CAS操作,將狀態修改為2,即ST_STARTED ,標誌著NioEventLoop要啟動輪詢了,果然,接下來就呼叫了doStartThread開啟輪詢執行緒:
1 private void doStartThread() { 2 assert this.thread == null; 3 4 this.executor.execute(new Runnable() { 5 public void run() { 6 SingleThreadEventExecutor.this.thread = Thread.currentThread(); 7 if (SingleThreadEventExecutor.this.interrupted) { 8 SingleThreadEventExecutor.this.thread.interrupt(); 9 } 10 11 boolean success = false; 12 SingleThreadEventExecutor.this.updateLastExecutionTime(); 13 boolean var112 = false; 14 15 int oldState; 16 label1907: { 17 try { 18 var112 = true; 19 SingleThreadEventExecutor.this.run(); 20 success = true; 21 var112 = false; 22 break label1907; 23 } catch (Throwable var119) { 24 SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119); 25 var112 = false; 26 } finally { 27 if (var112) { 28 int oldStatex; 29 do { 30 oldStatex = SingleThreadEventExecutor.this.state; 31 } while(oldStatex < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldStatex, 3)); 32 33 if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) { 34 SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates."); 35 } 36 37 try { 38 while(!SingleThreadEventExecutor.this.confirmShutdown()) { 39 ; 40 } 41 } finally { 42 try { 43 SingleThreadEventExecutor.this.cleanup(); 44 } finally { 45 SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5); 46 SingleThreadEventExecutor.this.threadLock.release(); 47 if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) { 48 SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')'); 49 } 50 51 SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null); 52 } 53 } 54 55 } 56 } 57 58 do { 59 oldState = SingleThreadEventExecutor.this.state; 60 } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3)); 61 62 if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) { 63 SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates."); 64 } 65 66 try { 67 while(!SingleThreadEventExecutor.this.confirmShutdown()) { 68 ; 69 } 70 71 return; 72 } finally { 73 try { 74 SingleThreadEventExecutor.this.cleanup(); 75 } finally { 76 SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5); 77 SingleThreadEventExecutor.this.threadLock.release(); 78 if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) { 79 SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')'); 80 } 81 82 SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null); 83 } 84 } 85 } 86 87 do { 88 oldState = SingleThreadEventExecutor.this.state; 89 } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3)); 90 91 if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) { 92 SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates."); 93 } 94 95 try { 96 while(!SingleThreadEventExecutor.this.confirmShutdown()) { 97 ; 98 } 99 } finally { 100 try { 101 SingleThreadEventExecutor.this.cleanup(); 102 } finally { 103 SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5); 104 SingleThreadEventExecutor.this.threadLock.release(); 105 if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) { 106 SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')'); 107 } 108 109 SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null); 110 } 111 } 112 113 } 114 }); 115 }
關於doStartThread方法,我在 Netty中NioEventLoopGroup的建立原始碼分析 中已經說的很細了,這裡就不再一步一步分析了
因為此時還沒真正意義上的啟動輪詢,所以thread等於null成立的,然後呼叫executor的execute方法,這裡的executor是一個執行緒池,在之前說過的,所以裡面的run方法是處於一個執行緒裡面的,然後給thread成員賦值為當前執行緒,表明正式進入了輪詢。
而SingleThreadEventExecutor.this.run()才是真正的輪詢邏輯,這在之前也說過,這個run的實現是在父類NioEventLoop中:
1 protected void run() { 2 while(true) { 3 while(true) { 4 try { 5 switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) { 6 case -2: 7 continue; 8 case -1: 9 this.select(this.wakenUp.getAndSet(false)); 10 if (this.wakenUp.get()) { 11 this.selector.wakeup(); 12 } 13 default: 14 this.cancelledKeys = 0; 15 this.needsToSelectAgain = false; 16 int ioRatio = this.ioRatio; 17 if (ioRatio == 100) { 18 try { 19 this.processSelectedKeys(); 20 } finally { 21 this.runAllTasks(); 22 } 23 } else { 24 long ioStartTime = System.nanoTime(); 25 boolean var13 = false; 26 27 try { 28 var13 = true; 29 this.processSelectedKeys(); 30 var13 = false; 31 } finally { 32 if (var13) { 33 long ioTime = System.nanoTime() - ioStartTime; 34 this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); 35 } 36 } 37 38 long ioTime = System.nanoTime() - ioStartTime; 39 this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); 40 } 41 } 42 } catch (Throwable var21) { 43 handleLoopException(var21); 44 } 45 46 try { 47 if (this.isShuttingDown()) { 48 this.closeAll(); 49 if (this.confirmShutdown()) { 50 return; 51 } 52 } 53 } catch (Throwable var18) { 54 handleLoopException(var18); 55 } 56 } 57 } 58 }
首先由於task已經有一個了,就是剛才的註冊事件,所以選擇策略calculateStrategy最終呼叫selectNow(也是之前說過的):
1 private final IntSupplier selectNowSupplier = new IntSupplier() { 2 public int get() throws Exception { 3 return NioEventLoop.this.selectNow(); 4 } 5 }; 6 7 int selectNow() throws IOException { 8 int var1; 9 try { 10 var1 = this.selector.selectNow(); 11 } finally { 12 if (this.wakenUp.get()) { 13 this.selector.wakeup(); 14 } 15 16 } 17 18 return var1; 19 }
使用JDK原生Selector進行selectNow,由於此時沒有任何Channel的註冊,所以selectNow會立刻返回0,此時就進入default邏輯,由於沒有任何註冊,processSelectedKeys方法也做不了什麼,所以在這一次的輪詢實質上只進行了runAllTasks方法,此方法會執行阻塞佇列中的task的run方法(還是在之前部落格中介紹過),由於輪詢是線上程池中的一個執行緒中執行的,所以task的執行是一個非同步操作。(在執行完task,將task移除阻塞對立,執行緒繼續輪詢)
這時就可以回到AbstractChannel的register方法中了,由上面可以知道task實際上非同步執行了:
1 AbstractUnsafe.this.register0(promise);
register0方法:
1 private void register0(ChannelPromise promise) { 2 try { 3 if (!promise.setUncancellable() || !this.ensureOpen(promise)) { 4 return; 5 } 6 7 boolean firstRegistration = this.neverRegistered; 8 AbstractChannel.this.doRegister(); 9 this.neverRegistered = false; 10 AbstractChannel.this.registered = true; 11 AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded(); 12 this.safeSetSuccess(promise); 13 AbstractChannel.this.pipeline.fireChannelRegistered(); 14 if (AbstractChannel.this.isActive()) { 15 if (firstRegistration) { 16 AbstractChannel.this.pipeline.fireChannelActive(); 17 } else if (AbstractChannel.this.config().isAutoRead()) { 18 this.beginRead(); 19 } 20 } 21 } catch (Throwable var3) { 22 this.closeForcibly(); 23 AbstractChannel.this.closeFuture.setClosed(); 24 this.safeSetFailure(promise, var3); 25 } 26 27 }
可以看到實際上的註冊邏輯又交給了AbstractChannel的doRegister,而這個方法在AbstractNioChannel中實現:
1 protected void doRegister() throws Exception { 2 boolean selected = false; 3 4 while(true) { 5 try { 6 this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this); 7 return; 8 } catch (CancelledKeyException var3) { 9 if (selected) { 10 throw var3; 11 } 12 13 this.eventLoop().selectNow(); 14 selected = true; 15 } 16 } 17 }
javaChannel就是之前產生的JDK的ServerSocketChannel,unwrappedSelector在之前說過,就是未經修改的JDK原生Selector,這個Selector和eventLoop是一對一繫結的,可以看到呼叫JDK原生的註冊方法,完成了對ServerSocketChannel的註冊,但是註冊的是一個0狀態(預設值),而傳入的this,即AbstractNioChannel物件作為了一個附件,用於以後processSelectedKeys方法從SelectionKey中得到對應的Netty的Channel(還是之前部落格說過)
關於預設值,是由於AbstractNioChannel不僅用於NioServerSocketChannel的註冊,還用於NioSocketChannel的註冊,只有都使用預設值註冊才不會產生異常 【Java】NIO中Channel的註冊原始碼分析 ,並且,在以後processSelectedKeys方法會對0狀態判斷,再使用unsafe進行相應的邏輯處理。
在完成JDK的註冊後,呼叫pipeline的invokeHandlerAddedIfNeeded方法(Netty中的ChannelPipeline原始碼分析),處理ChannelHandler的handlerAdded的回撥,即呼叫使用者新增的ChannelHandler的handlerAdded方法。
呼叫safeSetSuccess,標誌非同步操作完成:
1 protected final void safeSetSuccess(ChannelPromise promise) { 2 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { 3 logger.warn("Failed to mark a promise as success because it is done already: {}", promise); 4 } 5 }
關於非同步操作我在之前的部落格中說的很清楚了:Netty中的ChannelFuture和ChannelPromise
接著呼叫pipeline的fireChannelRegistered方法,也就是在責任鏈上呼叫channelRegistered方法,這時,就會呼叫之在ServerBootstrap中向pipeline新增的ChannelInitializer的channelRegistered,進而回調initChannel方法,完成對ServerBootstrapAcceptor的新增。
回到register0方法,在處理完pipeline的責任鏈後,根據當前AbstractChannel即NioServerSocketChannel的isActive:
1 public boolean isActive() { 2 return this.javaChannel().socket().isBound(); 3 }
獲得NioServerSocketChannel繫結的JDK的ServerSocketChannel,進而獲取ServerSocket,判斷isBound:
1 public boolean isBound() { 2 // Before 1.3 ServerSockets were always bound during creation 3 return bound || oldImpl; 4 }
這裡實際上就是判斷ServerSocket是否呼叫了bind方法,前面說過register0方法是一個非同步操作,在多執行緒環境下不能保證執行順序,若是此時已經完成了ServerSocket的bind,根據firstRegistration判斷是否需要pipeline傳遞channelActive請求,首先會執行pipeline的head即HeadContext的channelActive方法:
1 @Override 2 public void channelActive(ChannelHandlerContext ctx) throws Exception { 3 ctx.fireChannelActive(); 4 5 readIfIsAutoRead(); 6 }
在HeadContext通過ChannelHandlerContext 傳遞完channelActive請求後,會呼叫readIfIsAutoRead方法:
1 private void readIfIsAutoRead() { 2 if (channel.config().isAutoRead()) { 3 channel.read(); 4 } 5 }
此時呼叫AbstractChannel的read方法:
1 public Channel read() { 2 pipeline.read(); 3 return this; 4 }
最終在請求鏈由HeadContext執行read方法:
1 public void read(ChannelHandlerContext ctx) { 2 unsafe.beginRead(); 3 }
終於可以看到此時呼叫unsafe的beginRead方法:
1 public final void beginRead() { 2 assertEventLoop(); 3 4 if (!isActive()) { 5 return; 6 } 7 8 try { 9 doBeginRead(); 10 } catch (final Exception e) { 11 invokeLater(new Runnable() { 12 @Override 13 public void run() { 14 pipeline.fireExceptionCaught(e); 15 } 16 }); 17 close(voidPromise()); 18 } 19 }
最終執行了doBeginRead方法,由AbstractNioChannel實現:
1 protected void doBeginRead() throws Exception { 2 final SelectionKey selectionKey = this.selectionKey; 3 if (!selectionKey.isValid()) { 4 return; 5 } 6 7 readPending = true; 8 9 final int interestOps = selectionKey.interestOps(); 10 if ((interestOps & readInterestOp) == 0) { 11 selectionKey.interestOps(interestOps | readInterestOp); 12 } 13 }
這裡,就完成了向Selector註冊readInterestOp事件,從前面來看就是ACCEPT事件
回到AbstractBootstrap的doBind方法,在initAndRegister邏輯結束後,由上面可以知道,實際上的register註冊邏輯是一個非同步操作,在register0中完成
根據ChannelFuture來判斷非同步操作是否完成,如果isDone,則表明非同步操作先完成,即完成了safeSetSuccess方法,
然後呼叫newPromise方法:
1 public ChannelPromise newPromise() { 2 return pipeline.newPromise(); 3 }
給channel的pipeline繫結非同步操作ChannelPromise
然後呼叫doBind0方法完成ServerSocket的繫結,若是register0這個非同步操作還沒完成,就需要給ChannelFuture產生一個非同步操作的偵聽ChannelFutureListener物件,等到register0方法呼叫safeSetSuccess時,在promise的trySuccess中會回撥ChannelFutureListener的operationComplete方法,進而呼叫doBind0方法
doBind0方法:
1 private static void doBind0( 2 final ChannelFuture regFuture, final Channel channel, 3 final SocketAddress localAddress, final ChannelPromise promise) { 4 channel.eventLoop().execute(new Runnable() { 5 @Override 6 public void run() { 7 if (regFuture.isSuccess()) { 8 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); 9 } else { 10 promise.setFailure(regFuture.cause()); 11 } 12 } 13 }); 14 }
向輪詢執行緒提交了一個任務,非同步處理bind,可以看到,只有在regFuture非同步操作成功結束後,呼叫channel的bind方法:
1 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { 2 return pipeline.bind(localAddress, promise); 3 }
實際上的bind又交給pipeline,去完成,pipeline中就會交給責任鏈去完成,最終會交給HeadContext完成:
1 public void bind( 2 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) 3 throws Exception { 4 unsafe.bind(localAddress, promise); 5 }
可以看到,繞了一大圈,交給了unsafe完成:
1 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { 2 assertEventLoop(); 3 4 if (!promise.setUncancellable() || !ensureOpen(promise)) { 5 return; 6 } 7 8 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && 9 localAddress instanceof InetSocketAddress && 10 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && 11 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { 12 logger.warn( 13 "A non-root user can't receive a broadcast packet if the socket " + 14 "is not bound to a wildcard address; binding to a non-wildcard " + 15 "address (" + localAddress + ") anyway as requested."); 16 } 17 18 boolean wasActive = isActive(); 19 try { 20 doBind(localAddress); 21 } catch (Throwable t) { 22 safeSetFailure(promise, t); 23 closeIfClosed(); 24 return; 25 } 26 27 if (!wasActive && isActive()) { 28 invokeLater(new Runnable() { 29 @Override 30 public void run() { 31 pipeline.fireChannelActive(); 32 } 33 }); 34 } 35 36 safeSetSuccess(promise); 37 }
然而,真正的bind還是回調了doBind方法,最終是由NioServerSocketChannel來實現:
1 @Override 2 protected void doBind(SocketAddress localAddress) throws Exception { 3 if (PlatformDependent.javaVersion() >= 7) { 4 javaChannel().bind(localAddress, config.getBacklog()); 5 } else { 6 javaChannel().socket().bind(localAddress, config.getBacklog()); 7 } 8 }
在這裡終於完成了對JDK的ServerSocketChannel的bind操作
在上面的
1 if (!wasActive && isActive()) { 2 invokeLater(new Runnable() { 3 @Override 4 public void run() { 5 pipeline.fireChannelActive(); 6 } 7 }); 8 }
這個判斷,就是確保在register0中isActive時,還沒完成繫結,也就沒有beginRead操作來向Selector註冊ACCEPT事件,那麼就在這裡進行註冊,進而讓ServerSocket去偵聽客戶端的連線
在服務端ACCEPT到客戶端的連線後,在NioEventLoop輪詢中,就會呼叫processSelectedKeys處理ACCEPT的事件就緒,然後交給unsafe的read去處理 Netty中NioEventLoopGroup的建立原始碼分析
在服務端,由NioMessageUnsafe實現:
1 public void read() { 2 assert eventLoop().inEventLoop(); 3 final ChannelConfig config = config(); 4 final ChannelPipeline pipeline = pipeline(); 5 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); 6 allocHandle.reset(config); 7 8 boolean closed = false; 9 Throwable exception = null; 10 try { 11 try { 12 do { 13 int localRead = doReadMessages(readBuf); 14 if (localRead == 0) { 15 break; 16 } 17 if (localRead < 0) { 18 closed = true; 19 break; 20 } 21 22 allocHandle.incMessagesRead(localRead); 23 } while (allocHandle.continueReading()); 24 } catch (Throwable t) { 25 exception = t; 26 } 27 28 int size = readBuf.size(); 29 for (int i = 0; i < size; i ++) { 30 readPending = false; 31 pipeline.fireChannelRead(readBuf.get(i)); 32 } 33 readBuf.clear(); 34 allocHandle.readComplete(); 35 pipeline.fireChannelReadComplete(); 36 37 if (exception != null) { 38 closed = closeOnReadError(exception); 39 40 pipeline.fireExceptionCaught(exception); 41 } 42 43 if (closed) { 44 inputShutdown = true; 45 if (isOpen()) { 46 close(voidPromise()); 47 } 48 } 49 } finally { 50 if (!readPending && !config.isAutoRead()) { 51 removeReadOp(); 52 } 53 } 54 } 55 }
核心在doReadMessages方法,由NioServerSocketChannel實現:
1 protected int doReadMessages(List<Object> buf) throws Exception { 2 SocketChannel ch = SocketUtils.accept(javaChannel()); 3 4 try { 5 if (ch != null) { 6 buf.add(new NioSocketChannel(this, ch)); 7 return 1; 8 } 9 } catch (Throwable t) { 10 logger.warn("Failed to create a new channel from an accepted socket.", t); 11 12 try { 13 ch.close(); 14 } catch (Throwable t2) { 15 logger.warn("Failed to close a socket.", t2); 16 } 17 } 18 19 return 0; 20 }
SocketUtils的accept方法其實就是用來呼叫JDK中ServerSocketChannel原生的accept方法,來得到一個JDK的SocketChannel物件,然後通過這個SocketChannel物件,將其包裝成NioSocketChannel物件新增在buf這個List中
由此可以看到doReadMessages用來偵聽所有就緒的連線,包裝成NioSocketChannel將其放在List中
然後遍歷這個List,呼叫 NioServerSocketChannel的pipeline的fireChannelRead方法,傳遞channelRead請求,、
在前面向pipeline中添加了ServerBootstrapAcceptor這個ChannelHandler,此時,它也會響應這個請求,回撥channelRead方法:
1 public void channelRead(ChannelHandlerContext ctx, Object msg) { 2 final Channel child = (Channel) msg; 3 4 child.pipeline().addLast(childHandler); 5 6 setChannelOptions(child, childOptions, logger); 7 8 for (Entry<AttributeKey<?>, Object> e: childAttrs) { 9 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); 10 } 11 12 try { 13 childGroup.register(child).addListener(new ChannelFutureListener() { 14 @Override 15 public void operationComplete(ChannelFuture future) throws Exception { 16 if (!future.isSuccess()) { 17 forceClose(child, future.cause()); 18 } 19 } 20 }); 21 } catch (Throwable t) { 22 forceClose(child, t); 23 } 24 }
msg就是偵聽到的NioSocketChannel物件,給該物件的pipeline新增childHandler,也就是我們在ServerBootstrap中通過childHandler方法新增的
然後通過register方法完成對NioSocketChannel的註冊(和NioServerSocketChannel註冊邏輯一樣)
至此Netty服務端的啟動結