Netty原始碼分析 (三)----- 服務端啟動原始碼分析
本文接著前兩篇文章來講,主要講服務端類剩下的部分,我們還是來先看看服務端的程式碼
/** * Created by chenhao on 2019/9/4. */ public final class SimpleServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new SimpleServerInitializer()) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
在前面兩篇博文中從原始碼的角度分析瞭如下幾行程式碼主要做了哪些工作。
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new SimpleServerInitializer()) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true);
本篇博文將從原始碼的角度分析ChannelFuture f = b.bind(8888).sync()
的內部實現。這樣就完成了Netty伺服器端啟動過程的原始碼分析。
原始碼分析ChannelFuture f = b.bind(8888).sync()
AbstractBootstrap.java
public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); }
我們接著看過載的bind
public ChannelFuture bind(SocketAddress localAddress) { validate();//相關引數的檢查 if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress);//下面將分析 }
該函式主要看兩點:validate()和doBind(localAddress)
validate()方法
//函式功能:檢查相關引數是否設定了 @SuppressWarnings("unchecked") public B validate() { if (group == null) {//這裡的group指的是:b.group(bossGroup, workerGroup)程式碼中的bossGroup throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return (B) this; }
該方法主要檢查了兩個引數,一個是group,一個是channelFactory,在這裡可以想一想這兩個引數是在哪裡以及何時被賦值的?答案是在如下程式碼塊中被賦值的,其中是將bossGroup賦值給了group,將BootstrapChannelFactory賦值給了channelFactory.
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)
doBind(localAddress)方法
doBind方法的原始碼如下:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1 final Channel channel = regFuture.channel();//2 if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } return promise; }
doBind這個函式是我們要分析的重點,這個函式的主要工作有如下幾點:
1、通過initAndRegister()方法得到一個ChannelFuture的例項regFuture。
2、通過regFuture.cause()方法判斷是否在執行initAndRegister方法時產生來異常。如果產生來異常,則直接返回,如果沒有產生異常則進行第3步。
3、通過regFuture.isDone()來判斷initAndRegister方法是否執行完畢,如果執行完畢來返回true,然後呼叫doBind0進行socket繫結。如果沒有執行完畢則返回false進行第4步。
4、regFuture會新增一個ChannelFutureListener監聽,當initAndRegister執行完成時,呼叫operationComplete方法並執行doBind0進行socket繫結。
第3、4點想幹的事就是一個:呼叫doBind0方法進行socket繫結。
下面將分成4部分對每行程式碼具體做了哪些工作進行詳細分析。
initAndRegister()
該方法的具體程式碼如下:
final ChannelFuture initAndRegister() { //結論:這裡的channel為一個NioServerSocketChannel物件,具體分析見後面 final Channel channel = channelFactory().newChannel();//1 try { init(channel);//2 } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel);//3 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
通過函式名以及內部呼叫的函式可以猜測該函式幹了兩件事情:
1、初始化一個Channel,要想初始化,肯定要先得到一個Channel。
final Channel channel = channelFactory().newChannel();//1 init(channel);//2
2、將Channel進行註冊。
ChannelFuture regFuture = group().register(channel);//3
下面我們將分析這幾行程式碼內部幹來些什麼。
final Channel channel = channelFactory().newChannel();
在上一篇文章中(Netty原始碼分析 (二)----- ServerBootstrap)分析中,我們知道b.channel(NioServerSocketChannel.class)的功能為:設定父類屬性channelFactory 為: BootstrapChannelFactory類的物件。其中這裡BootstrapChannelFactory物件中包括一個clazz屬性為:NioServerSocketChannel.class
因此,final Channel channel = channelFactory().newChannel();就是呼叫的BootstrapChannelFactory類中的newChannel()方法,該方法的具體內容為:
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; BootstrapChannelFactory(Class<? extends T> clazz) { this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; } }
看到這個類,我們可以得到的結論:final Channel channel = channelFactory().newChannel();
這行程式碼的作用為通過反射產生來一個NioServerSocketChannel類的例項。
NioServerSocketChannel構造器
下面將看下NioServerSocketChannel類的建構函式做了哪些工作。
NioServerSocketChannel類的繼承體系結構如下:
其無參建構函式如下:
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
無參建構函式中SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()
。
函式newSocket的功能為:利用SelectorProvider產生一個SocketChannelImpl物件。
private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } public SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this); }
無參建構函式通過newSocket函式產生了一個SocketChannelImpl物件
然後呼叫瞭如下建構函式,我們繼續看
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } //父類AbstractNioMessageChannel的建構函式 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } //父類 AbstractNioChannel的建構函式 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT try { ch.configureBlocking(false);//設定當前的ServerSocketChannel為非阻塞的 } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } //父類AbstractChannel的建構函式 protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
new NioServerSocketChannel()產生一個例項物件時,呼叫上面這麼多建構函式主要乾了兩件事情:
1、產生來一個SocketChannelImpl類的例項,設定到ch屬性中,並設定為非阻塞的。
this.ch = ch;
ch.configureBlocking(false);
2、設定了config屬性
config = new NioServerSocketChannelConfig(this, javaChannel().socket()
3、設定SelectionKey.OP_ACCEPT事件
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
4、設定unsafe屬性
@Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); }
主要作用為:用來負責底層的connect、register、read和write等操作。
5、設定pipeline屬性
pipeline = new DefaultChannelPipeline(this);
每個Channel都有自己的pipeline,當有請求事件發生時,pipeline負責呼叫相應的hander進行處理。
這些屬性在後面都會用到,至於NioServerSocketChannel 物件中的unsafe、pipeline屬性的具體實現後面進行分析。
結論:final Channel channel = channelFactory().newChannel();這行程式碼的作用為通過反射產生來一個NioServerSocketChannel類的例項,其中這個NioServerSocketChannel類物件有這樣幾個屬性:SocketChannel、NioServerSocketChannelConfig 、SelectionKey.OP_ACCEPT事件、NioMessageUnsafe、DefaultChannelPipeline
init(channel)
init方法的具體程式碼如下:
@Override void init(Channel channel) throws Exception { //1、設定新接入channel的option final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options);//NioServerSocketChannelConfig } //2、設定新接入channel的attr final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //3、設定handler到pipeline上 ChannelPipeline p = channel.pipeline(); if (handler() != null) {//這裡的handler()返回的就是第二部分.handler(new SimpleServerHandler())所設定的SimpleServerHandler p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //p.addLast()向serverChannel的流水線處理器中加入了一個ServerBootstrapAcceptor,從名字上就可以看出來,這是一個接入器,專門接受新請求,把新的請求扔給某個事件迴圈器 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
該函式的功能為:
1、設定channel的options
如果沒有設定,則options為空,該屬性在ServerBootstrap類中的定義如下
Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
options可能如下:
public <T> boolean setOption(ChannelOption<T> option, T value) { validate(option, value); if (option == CONNECT_TIMEOUT_MILLIS) { setConnectTimeoutMillis((Integer) value); } else if (option == MAX_MESSAGES_PER_READ) { setMaxMessagesPerRead((Integer) value); } else if (option == WRITE_SPIN_COUNT) { setWriteSpinCount((Integer) value); } else if (option == ALLOCATOR) { setAllocator((ByteBufAllocator) value); } else if (option == RCVBUF_ALLOCATOR) { setRecvByteBufAllocator((RecvByteBufAllocator) value); } else if (option == AUTO_READ) { setAutoRead((Boolean) value); } else if (option == AUTO_CLOSE) { setAutoClose((Boolean) value); } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) { setWriteBufferHighWaterMark((Integer) value); } else if (option == WRITE_BUFFER_LOW_WATER_MARK) { setWriteBufferLowWaterMark((Integer) value); } else if (option == MESSAGE_SIZE_ESTIMATOR) { setMessageSizeEstimator((MessageSizeEstimator) value); } else { return false; } return true; }
2、設定channel的attrs
如果沒有設定,則attrs為空,該屬性在ServerBootstrap類中的定義如下
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
3、設定handler到channel的pipeline上
其中,這裡的handler為:在博文(Netty原始碼分析 (二)----- ServerBootstrap)中分析的通過b.handler(new SimpleServerHandler())
所設定的SimpleServerHandler物件
4、在pipeline上新增來一個ChannelInitializer物件,其中重寫來initChannel方法。該方法通過p.addLast()向serverChannel的流水線處理器中加入了一個 ServerBootstrapAcceptor,
從名字上就可以看出來,這是一個接入器,專門接受新請求,把新的請求扔給某個事件迴圈器
看到這裡,我們發現其實init只是初始化了一些基本的配置和屬性,以及在pipeline上加入了一個接入器,用來專門接受新連線,並沒有啟動服務.
group().register(channel)
回到 initAndRegister 方法中,繼續看 config().group().register(channel) 這行程式碼,config 方法返回了 ServerBootstrapConfig,這個 ServerBootstrapConfig 呼叫了 group 方法,實際上就是 bossGroup。bossGroup 呼叫了 register 方法。
前面的分析我們知道group為:NioEvenLoopGroup,其繼承MultithreadEventLoopGroup,該類中的register方法如下:
@Override public ChannelFuture register(Channel channel) { return next().register(channel);//呼叫了NioEvenLoop物件中的register方法,NioEventLoop extends SingleThreadEventLoop }
next()方法的程式碼如下,其功能為選擇下一個NioEventLoop物件。
@Override public EventExecutor next() { return chooser.next();//呼叫MultithreadEventExecutorGroup中的next方法 }
根據執行緒個數nThreads是否為2的冪次方來選擇chooser,其中這兩個chooser為: PowerOfTwoEventExecutorChooser、GenericEventExecutorChooser,這兩個chooser功能都是一樣,只是求餘的方式不一樣。
next()方法返回的是一個NioEvenLoop物件
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1];//利用2的N次方法的特點,使用&求餘更快。 } } private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } }
結論:由於NioEventLoopGroup中維護著多個NioEventLoop,next方法回撥用chooser策略找到下一個NioEventLoop,並執行該物件的register方法進行註冊。
由於NioEventLoop extends SingleThreadEventLoop,NioEventLoop沒有重寫該方法,因此看 SingleThreadEventLoop類中的register方法
@Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }
在本博文第1部分的NioServerSocketChannel例項化中設定來unsafe屬性,具體是呼叫如下的方法來設定的,因此這裡的channel.unsafe()
就是NioMessageUnsafe例項。
@Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); }
channel.unsafe().register(this, promise)這行程式碼呼叫的是AbstractUnsafe類中的register方法,具體程式碼如下:
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } //判斷該channel是否已經被註冊到EventLoop中 if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //1 將eventLoop設定在NioServerSocketChannel上 AbstractChannel.this.eventLoop = eventLoop; //判斷當前執行緒是否為該EventLoop中擁有的執行緒,如果是,則直接註冊,如果不是,則新增一個任務到該執行緒中 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { //重點 @Override public void run() { register0(promise);//分析 } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
上面的重點是register0(promise)方法。基本邏輯為:
1、通過呼叫eventLoop.inEventLoop()方法判斷當前執行緒是否為該EventLoop中擁有的執行緒,如果是,則直接註冊,如果不是,說明該EventLoop在等待並沒有執行權,則進行第二步。
AbstractEventExecutor.java
@Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
SingleThreadEventExecutor.java
@Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
2、既然該EventLoop中的執行緒此時沒有執行權,但是我們可以提交一個任務到該執行緒中,等該EventLoop的執行緒有執行權的時候就自然而然的會執行此任務,而該任務負責呼叫register0方法,這樣也就達到了呼叫register0方法的目的。
下面看register0這個方法,具體程式碼如下:
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } doRegister(); registered = true; safeSetSuccess(promise); //執行完,控制檯輸出:channelRegistered pipeline.fireChannelRegistered(); if (isActive()) { //分析 pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
在上面的程式碼中,是通過呼叫doRegister()方法完成NioServerSocketChannel的註冊,該方法的具體程式碼如下:
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } protected SelectableChannel javaChannel() { return ch; }
在本博文的第1部分的NioServerSocketChannel的例項化分析中,我們知道這裡的javaChannel()方法返回的ch為例項化NioServerSocketChannel時產生的一個SocketChannelImpl類的例項,並設定為非阻塞的,具體見本博文的第1部分。
selectionKey = javaChannel().register(eventLoop().selector, 0, this);就完成了ServerSocketChannel註冊到Selector中。
回顧下,這裡的eventLoop().selector是什麼?答案是:KQueueSelectorImpl物件。
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); } private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } //...省略了一部分程式碼 return selector; }
ServerSocketChannel註冊完之後,接著執行pipeline.fireChannelRegistered方法。
public final ChannelPipeline fireChannelRegistered() { AbstractChannelHandlerContext.invokeChannelRegistered(this.head); return this; }
我們看到invokeChannelRegistered(this.head)傳的引數是head,這個head我們再下一篇文章中講,繼續往下看
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRegistered(); } }); } }
看next.invokeChannelRegistered();
private void invokeChannelRegistered() { if (this.invokeHandler()) { try { ((ChannelInboundHandler)this.handler()).channelRegistered(this); } catch (Throwable var2) { this.notifyHandlerException(var2); } } else { this.fireChannelRegistered(); } }
接著看看this.handler(),實際上就是head的handler()
public ChannelHandler handler() { return this; }
返回的是this,那接著看 head中的channelRegistered(this)
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); }
繼續看ctx.fireChannelRegistered();
public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(this.findContextInbound()); return this; }
我們看看this.findContextInbound()
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while(!ctx.inbound); return ctx; }
我們看到 ctx = ctx.next; 實際上是從head開始找,找到第一個 inbound 的hander
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRegistered(); } }); } }
最後執行next.invokeChannelRegistered();
pipeline中維護了handler連結串列,還記得之前.handler(new SimpleServerHandler())初始化的handler在本博文的第1.2部分的分析中介紹了此handler被新增到此pipeline中了,通過遍歷連結串列,執行InBound型別handler的channelRegistered方法
因此執行到這裡,我們的控制檯就回輸出:channelRegistered,這行資訊。
到這裡,我們就將doBind方法final ChannelFuture regFuture = initAndRegister();給分析完了,得到的結論如下:
1、通過反射產生了一個NioServerSocketChannle物件。
2、完成了初始化
3、將NioServerSocketChannel進行了註冊。
接下來我們分析doBind方法的剩餘部分程式碼主要做了什麼,
原始碼如下:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1 final Channel channel = regFuture.channel();//2 if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } return promise; }
doBind0(regFuture, channel, localAddress, promise);
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
該函式主要是提交了一個Runnable任務到NioEventLoop執行緒中來進行處理。,這裡先看一下NioEventLoop類的execute方法
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop();//判斷當前執行緒是否為該NioEventLoop所關聯的執行緒,如果是,則新增任務到任務佇列中,如果不是,則先啟動執行緒,然後新增任務到任務佇列中去 if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); //如果 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
當提交的任務被執行緒執行後,則會執行channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)這行程式碼,這行程式碼完成的功能為:實現channel與埠的繫結。
具體如下:
AbstractChannel.java @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
在該方法中直接呼叫了pipeline的bind方法,這裡的pipeline時DefaultChannelPipeline的例項。
DefaultChannelPipeline.java @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
在上面方法中直接呼叫了TailContext例項tail的bind方法,tail在下一篇博文中有詳細的介紹。繼續看tail例項的bind方法
AbstractChannelHandlerContext.java @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { //...省略有效性檢查 final AbstractChannelHandlerContext next = findContextOutbound();// EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
此上面bind函式中的這行程式碼:final AbstractChannelHandlerContext next = findContextOutbound();所完成的任務就是在pipeline所持有的以AbstractChannelHandlerContext為節點的雙向連結串列中從尾節點tail開始向前尋找第一個outbound=true的handler節點。
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
在 DefaultChannelPipeline 的構造器中, 會例項化兩個物件: head 和 tail, 並形成了雙向連結串列的頭和尾。 head 是 HeadContext 的例項, 它實現了 ChannelOutboundHandler 介面和ChannelInboundHandler 介面, 並且它的 outbound 欄位為 true.而tail 是 TailContext 的例項,它實現了ChannelInboundHandler 介面,並且其outbound 欄位為 false,inbound 欄位為true。 基於此在如上的bind函式中呼叫了 findContextOutbound方法 找到的 AbstractChannelHandlerContext 物件其實就是 head.
繼續看,在pipelie的雙向連結串列中找到第一個outbound=true的AbstractChannelHandlerContext節點head後,然後呼叫此節點的invokeConnect方法,該方法的程式碼如下:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
HeadContext類中的handler()方法程式碼如下:
@Override public ChannelHandler handler() { return this; }
該方法返回的是其本身,這是因為HeadContext由於其繼承AbstractChannelHandlerContext以及實現了ChannelHandler介面使其具有Context和Handler雙重特性。
繼續看,看HeadContext類中的bind方法,程式碼如下:
@Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
unsafe這個欄位是在HeadContext建構函式中被初始化的,如下:
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); }
而此建構函式中的pipeline.channel().unsafe()這行程式碼返回的就是在本博文前面研究NioServerSocketChannel這個類的建構函式中所初始化的一個例項,如下:
unsafe = newUnsafe();//newUnsafe()方法返回的是NioMessageUnsafe物件。
接下來看NioMessageUnsafe類中的bind方法(準確來說:該方法在AbstractUnsafe中),該類bind具體方法程式碼如下:
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { //...省略了部分程式碼 boolean wasActive = isActive(); try { doBind(localAddress);//核心程式碼 } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
上面的核心程式碼就是:doBind(localAddress);需要注意的是,此doBind方法是在NioServerSocketChannel類中的doBind方法,不是其他類中的。
NioServerSocketChannel類中的doBind方法程式碼如下:
@Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); }
上面方法中javaChannel()方法返回的是NioServerSocketChannel例項初始化時所產生的Java NIO ServerSocketChannel例項(更具體點為ServerSocketChannelImple例項)。 等價於語句serverSocketChannel.socket().bind(localAddress)完成了指定埠的繫結,這樣就開始監聽此埠。繫結埠成功後,是這裡呼叫了我們自定義handler的channelActive方法,在繫結之前,isActive()方法返回false,繫結之後返回true。
@Override public boolean isActive() { return javaChannel().socket().isBound(); }
這樣,就進入瞭如下的if條件的程式碼塊中
if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } private void invokeLater(Runnable task) { try { //省略了部分程式碼 eventLoop().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } }
進而開始執行 pipeline.fireChannelActive();這行程式碼 ,這行程式碼的具體呼叫鏈如下所示:
DefaultChannelPipeline.java
@Override public ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { channel.read(); } return this; } @Override public ChannelHandlerContext fireChannelActive() { final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelActive(); } }); } return this; } private void invokeChannelActive() { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } }
總結
最後,我們來做下總結,netty啟動一個服務所經過的流程
1.設定啟動類引數,最重要的就是設定channel
2.建立server對應的channel,建立各大元件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
3.init 初始化這個 NioServerSocketChannel,設定一些attr,option,以及設定子channel的attr,option,給server的channel新增新channel接入器,並觸發addHandler事件
4.config().group().register(channel) 通過 ServerBootstrap 的 bossGroup 根據group長度取模得到NioEventLoop ,將 NioServerSocketChannel 註冊到 NioEventLoop 中的 selector 上,然後觸發 channelRegistered事件
5.呼叫到jdk底層做埠繫結,並觸發active事件