Netty(五)服務端啟動過程原始碼分析——好文摘抄
下面先來一段 Netty 服務端的程式碼:
public class NettyServer { public void bind(int port){ // 建立EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); //建立BOSS執行緒組 用於服務端接受客戶端的連線 EventLoopGroup workerGroup = new NioEventLoopGroup(); //建立WORK執行緒組 用於進行SocketChannel的網路讀寫 try { // 建立ServerBootStrap例項 // ServerBootstrap 用於啟動NIO服務端的輔助啟動類,目的是降低服務端的開發複雜度 ServerBootstrap b = new ServerBootstrap(); // 繫結Reactor執行緒池 b.group(bossGroup, workerGroup) // 設定並繫結服務端Channel // 指定所使用的NIO傳輸的Channel .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingServerHandler()) .childHandler(new ChannelInitializer(){ @Override protected void initChannel(Channel ch) throws Exception { //do something } }); // 繫結埠,同步等待成功 ChannelFuture future = b.bind(port).sync(); // 等待服務端監聽埠關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 優雅地關閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class LoggingServerHandler extends ChannelInboundHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("loggin-channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("loggin-channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("loggin-handlerAdded"); } } public static void main(String[] args){ new NettyServer().bind(8899); } }
上面程式碼為 Netty 伺服器端的完整程式碼,在整個服務端程式碼中會涉及如下幾個核心類。
ServerBootstrap
ServerBootstrap 為 Netty 服務端的啟動輔助類,它提供了一系列的方法用於設定服務端啟動相關的引數。
Channel
Channel 為 Netty 網路操作抽象類,它定義了一組功能,其提供的 API 大大降低了直接使用 Socket 類的複雜性。當然它也不僅僅只是包括了網路 IO 操作的基本功能,還包括一些與 Netty 框架相關的功能,包括獲取該 Channel 的 EventLoop 等等。
EventLoopGroup
EventLoopGroup 為 Netty 的 Reactor 執行緒池,它實際上就是 EventLoop 的容器,而 EventLoop 為 Netty 的核心抽象類,它的主要職責是處理所有註冊到本執行緒多路複用器 Selector 上的 Channel。
ChannelHandler
ChannelHandler 作為 Netty 的主要元件,它主要負責 I/O 事件或者 I/O 操作進行攔截和處理,它可以選擇性地攔截和處理自己感覺興趣的事件,也可以透傳和終止事件的傳遞。
ChannelPipeline
ChannelPipeline 是 ChannelHandler 鏈的容器,它負責 ChannelHandler 的管理和事件攔截與排程。每當新建一個 Channel 都會分配一個新的 ChannelPepeline,同時這種關聯是永久性的。
以上是簡要介紹,詳細介紹請參考(【死磕Netty】-----Netty的核心元件及其設計
)
服務端建立流程
Netty 服務端建立的時序圖,如下(摘自《Netty權威指南(第二版)》)
主要步驟為:
- 建立 ServerBootstrap 例項
- 設定並繫結 Reactor 執行緒池
- 設定並繫結服務端 Channel
- 建立並初始化 ChannelPipeline
- 新增並設定 ChannelHandler
- 繫結並啟動監聽埠
服務端原始碼分析
1、建立兩個EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bossGroup 為 BOSS 執行緒組,用於服務端接受客戶端的連線, workerGroup 為 worker 執行緒組,用於進行 SocketChannel 的網路讀寫。當然也可以建立一個並共享。
2、建立ServerBootstrap例項
ServerBootstrap b = new ServerBootstrap();
ServerBootStrap為Netty服務端的啟動引導類,用於幫助使用者快速配置、啟動服務端服務。提供的方法如下:
方法名稱 | 方法描述 |
---|---|
group |
設定 ServerBootstrap 要用的 EventLoopGroup |
channel |
設定將要被例項化的 ServerChannel 類 |
option |
例項化的 ServerChannel 的配置項 |
childHandler |
設定並新增 ChannelHandler |
bind |
繫結 ServerChannel |
ServerBootStrap底層採用裝飾者模式。
關於 ServerBootStrap 我們後續做詳細分析。
3、設定並繫結Reactor執行緒池
呼叫 group()
方法,為 ServerBootstrap 例項設定並繫結 Reactor 執行緒池。
b.group(bossGroup, workerGroup)
EventLoopGroup 為 Netty 執行緒池,它實際上就是 EventLoop 的陣列容器。EventLoop 的職責是處理所有註冊到本執行緒多路複用器 Selector 上的 Channel,Selector 的輪詢操作由繫結的 EventLoop 執行緒 run 方法驅動,在一個迴圈體內迴圈執行。通俗點講就是一個死迴圈,不斷的檢測 I/O 事件、處理 I/O 事件。
這裡設定了兩個group,這個其實有點兒像我們工作一樣。需要兩型別的工人,一個老闆(bossGroup),一個工人(workerGroup),老闆負責從外面接活,工人則負責死命幹活(尼瑪,和我上家公司一模一樣)。所以這裡 bossGroup 的作用就是不斷地接收新的連線,接收之後就丟給 workerGroup 來處理,workerGroup 負責幹活就行(負責客戶端連線的 IO 操作)。
原始碼如下:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup); // 繫結boosGroup
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup; // 繫結workerGroup
return this;
}
其中父 EventLoopGroup 傳遞到父類的建構函式中:
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}
4、設定並繫結服務端Channel
繫結執行緒池後,則需要設定 channel 型別,服務端用的是 NioServerSocketChannel 。
.channel(NioServerSocketChannel.class)
呼叫 ServerBootstrap.channel
方法用於設定服務端使用的 Channel,傳遞一個 NioServerSocketChannel Class物件,Netty通過工廠類,利用反射建立NioServerSocketChannel 物件,如下:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
channelFactory()
用於設定 Channel 工廠的:
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
這裡傳遞的是 ReflectiveChannelFactory,其原始碼如下:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
//需要建立 channel 的時候,該方法將被呼叫
@Override
public T newChannel() {
try {
// 反射建立對應 channel
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
確定服務端的 Channel(NioServerSocketChannel)後,呼叫 option()
方法設定 Channel 引數,作為服務端,主要是設定TCP的backlog引數,如下:
.option(ChannelOption.SO_BACKLOG, 1024)
option()
原始碼如下:
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
五、新增並設定ChannelHandler
設定完 Channel 引數後,使用者可以為啟動輔助類和其父類分別指定 Handler。
.handler(new LoggingServerHandler())
.childHandler(new ChannelInitializer(){
//省略程式碼
})
這兩個 Handler 不一樣,前者(handler()
)設定的 Handler 是服務端 NioServerSocketChannel的,後者(childHandler()
)設定的 Handler 是屬於每一個新建的 NioSocketChannel 的。跟蹤原始碼會發現兩種所處的類不一樣,handler 位於 AbstractBootstrap 中,childHandler 位於 ServerBootstrap 中,如下:
// AbstractBootstrap
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return (B) this;
}
// ServerBootstrap
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
ServerBootstrap 中的 Handler 是 NioServerSocketChannel 使用的,所有連線該監聽埠的客戶端都會執行它,父類 AbstractBootstrap 中的 Handler 是一個工廠類,它為每一個新接入的客戶端都建立一個新的 Handler。如下圖(《Netty權威指南(第二版)》):
六、繫結埠,啟動服務
服務端最後一步,繫結埠並啟動服務,如下:
ChannelFuture future = b.bind(port).sync();
呼叫 ServerBootstrap 的 bind()
方法進行埠繫結:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
首先呼叫 validate()
方法進行引數校驗,然後呼叫 doBind()
方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化並註冊一個Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 註冊成功
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 呼叫doBind0繫結
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
該方法涉及內容較多,我們分解來看,如下:
- 首先通過
initAndRegister()
得到一個 ChannelFuture 物件 regFuture; - 根據得到的 regFuture 物件判斷該物件是否丟擲異常 (
regFuture.cause()
),如果是,直接返回; - 根據
regFuture.isDone()
判斷initAndRegister()
是否執行完畢,如果執行完成,則呼叫doBind0
; - 若
initAndRegister()
沒有執行完畢,則向 regFuture 物件新增一個 ChannelFutureListener 監聽,當initAndRegister()
執行完畢後會呼叫operationComplete()
,在operationComplete()
中依然會判斷 ChannelFuture 是否丟擲異常,如果沒有則呼叫doBind0
進行繫結。
按照上面的步驟我們一步一步來剖析 doBind()
方法。
initAndRegister()
執行 initAndRegister()
會得到一個 ChannelFuture 物件 regFuture,程式碼如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 新建一個Channel
channel = channelFactory.newChannel();
// 初始化Channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
}
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// /向EventLoopGroup中註冊一個channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
首先呼叫 newChannel()
新建一個Channel,這裡是NioServerSocketChannel,還記前面 4、設定並繫結服務端Channel(.channel(NioServerSocketChannel.class)
)中 設定的Channel工廠類麼?在這裡派上用處了。在上面提到了通過反射的機制我們可以得到一個 NioServerSocketChannel 類的例項。那麼 NioServerSocketChannel 到底是一個什麼東西呢?如下圖:
上圖是 NioServerSocketChannel 的繼承體系結構圖, NioServerSocketChannel 在建構函式中會依靠父類來完成一項一項的初始化工作。先看 NioServerSocketChannel 建構函式。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
newSocket()
方法較為簡單,它是利用 SelectorProvider.openServerSocketChannel()
,產生一個 ServerSocketChannel 物件。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
該建構函式首先是呼叫父類的構造方法,然後設定 config屬性。父類構造方法如下:
// AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
// AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
// AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
通過 super()
,一層一層往上,直到 AbstractChannel。我們從最上層解析。
- AbstractChannel 設定了 unsafe (
unsafe = newUnsafe()
)和 pipeline(pipeline = newChannelPipeline()
); - AbstractNioChannel 將當前 ServerSocketChannel 設定成了非阻塞(
ch.configureBlocking(false);
),同時設定SelectionKey.OP_ACCEPT事件(this.readInterestOp = readInterestOp;
readInterestOp 值由 NioServerSocketChannel 中傳遞); - NioServerSocketChannel 設定 config屬性(
config = new NioServerSocketChannelConfig(this, javaChannel().socket())
)。
所以
channel = channelFactory.newChannel()
通過反射機制產生了 NioServerSocketChannel 類例項。同時該例項設定了NioMessageUnsafe、DefaultChannelPipeline、非阻塞、SelectionKey.OP_ACCEPT事件 和 NioServerSocketChannelConfig 屬性。
看完了 channelFactory.newChannel();
,我們再看 init()
。
void init(Channel channel) throws Exception {
// 設定配置的option引數
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 獲取繫結的pipeline
ChannelPipeline p = channel.pipeline();
// 準備child用到的4個part
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// 為NioServerSocketChannel的pipeline新增一個初始化Handler,
// 當NioServerSocketChannel在EventLoop註冊成功時,該handler的init方法將被呼叫
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
//如果使用者配置過Handler
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 為NioServerSocketChannel的pipeline新增ServerBootstrapAcceptor處理器
// 該Handler主要用來將新建立的NioSocketChannel註冊到EventLoopGroup中
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
其實整個過程可以分為三個步驟:
- 設定 Channel 的 option 和 attr;
- 獲取繫結的 pipeline,然後為 NioServerSocketChanne l繫結的 pipeline 新增 Handler;
- 將用於服務端註冊的 Handler ServerBootstrapAcceptor 新增到 ChannelPipeline 中。ServerBootstrapAcceptor 為一個接入器,專門接受新請求,把新的請求扔給某個事件迴圈器。
至此初始化部分已經結束,我們再看註冊部分,
// /向EventLoopGroup中註冊一個channel
ChannelFuture regFuture = config().group().register(channel);
註冊方法的呼叫位於 initAndRegister()
方法中。注意這裡的 group()
返回的是前面的 boss NioEvenLoopGroup,它繼承 MultithreadEventLoopGroup,呼叫的 register()
,也是 MultithreadEventLoopGroup 中的。如下:
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
呼叫 next()
方法從 EventLoopGroup 中獲取下一個 EventLoop,呼叫 register()
方法註冊:
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
將Channel和EventLoop封裝成一個DefaultChannelPromise物件,然後呼叫register()方法。DefaultChannelPromis為ChannelPromise的預設實現,而ChannelPromisee繼承Future,具備非同步執行結構,繫結Channel,所以又具備了監聽的能力,故而ChannelPromis是Netty非同步執行的核心介面。
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
首先獲取 channel 的 unsafe 物件,該 unsafe 物件就是在之前設定過得。然後呼叫 register()
方法,如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// 必須要保證註冊是由該EventLoop發起的
if (eventLoop.inEventLoop()) {
register0(promise); // 註冊
} else {
// 如果不是單獨封裝成一個task非同步執行
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
過程如下:
- 首先通過
isRegistered()
判斷該 Channel 是否已經註冊到 EventLoop 中; - 通過
eventLoop.inEventLoop()
來判斷當前執行緒是否為該 EventLoop 自身發起的,如果是,則呼叫register0()
直接註冊; - 如果不是,說明該 EventLoop 中的執行緒此時沒有執行權,則需要新建一個執行緒,單獨封裝一個 Task,而該 Task 的主要任務則是執行
register0()
。
無論當前 EventLoop 的執行緒是否擁有執行權,最終都會要執行 register0()
,如下:
private void register0(ChannelPromise promise) {
try {
// 確保 Channel 處於 open
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 真正的註冊動作
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); //設定註冊結果為成功
pipeline.fireChannelRegistered();
if (isActive()) {
//如果是首次註冊,發起 pipeline 的 fireChannelActive
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
如果 Channel 處於 open 狀態,則呼叫 doRegister()
方法完成註冊,然後將註冊結果設定為成功。最後判斷如果是首次註冊且處於啟用狀態,則發起 pipeline 的 fireChannelActive()
。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 註冊到NIOEventLoop的Selector上
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
這裡註冊時 ops 設定的是 0,也就是說 ServerSocketChannel 僅僅只是表示了註冊成功,還不能監聽任何網路操作,這樣做的目的是(摘自《Netty權威指南(第二版)》):
- 註冊方式是多型的,它既可以被 NIOServerSocketChannel 用來監聽客戶端的連線接入,也可以註冊 SocketChannel 用來監聽網路讀或者寫操作。
- 通過
SelectionKey.interestOps(int ops)
方法可以方便地修改監聽操作位。所以,此處註冊需要獲取 SelectionKey 並給 AbstractNIOChannel 的成員變數 selectionKey 賦值。
由於這裡 ops 設定為 0,所以還不能監聽讀寫事件。呼叫 doRegister()
後,然後呼叫pipeline.invokeHandlerAddedIfNeeded();
,這個時候控制檯會出現 loggin-handlerAdded
,內部如何呼叫,我們在剖析 pipeline 時再做詳細分析。然後將註冊結果設定為成功(safeSetSuccess(promise)
)。呼叫 pipeline.fireChannelRegistered();
這個時候控制檯會列印 loggin-channelRegistered
。這裡簡單分析下該方法。
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
pipeline 維護著 handle 連結串列,事件會在 NioServerSocketChannel 的 pipeline 中傳播。最終都會呼叫 next.invokeChannelRegistered()
,如下:
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
在 invokeChannelRegistered()
會呼叫我們在前面設定的 handler (還記得簽名的 handler(new LoggingServerHandler()
)麼)的 channelRegistered()
,這個時候控制檯應該會列印 loggin-channelRegistered
。
到這裡initAndRegister() (final ChannelFuture regFuture = initAndRegister();)
就分析完畢了,該方法主要做如下三件事:
- 通過反射產生了一個 NioServerSocketChannle 物件;
- 呼叫
init(channel)
完成初始化工作; - 將NioServerSocketChannel進行了註冊。
initAndRegister()
篇幅較長,分析完畢了,我們再返回到doBind(final SocketAddress localAddress)
。在 doBind(final SocketAddress localAddress)
中如果 initAndRegister()
執行完成,則 regFuture.isDone()
則為 true,執行 doBind0()
。如果沒有執行完成,則會註冊一個監聽 ChannelFutureListener,當 initAndRegister()
完成後,會呼叫該監聽的 operationComplete()
方法,最終目的還是執行 doBind0()
。故而我們下面分析 doBind0()
到底做了些什麼。原始碼如下:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
doBind0()
較為簡單,首先new 一個執行緒 task,然後將該任務提交到 NioEventLoop 中進行處理,我們先看 execute()
。
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
呼叫 inEventLoop()
判斷當前執行緒是否為該 NioEventLoop 所關聯的執行緒,如果是,則呼叫 addTask()
將任務 task 新增到佇列中,如果不是,則先啟動執行緒,在呼叫 addTask()
將任務 task 新增到佇列中。addTask()
如下:
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
offerTask()
新增到佇列中:
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
task 新增到任務佇列 taskQueue成功後,執行任務會呼叫如下方法:
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel 首先呼叫 bind()
完成 channel 與埠的繫結,如下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
tail 在 DefaultChannelPipeline 中定義:final AbstractChannelHandlerContext tail;
有 tail 就會有 head ,在 DefaultChannelPipeline 中維護這一個 AbstractChannelHandlerContext 節點的雙向連結串列,該連結串列是實現 Pipeline 機制的關鍵,更多詳情會在 ChannelPipeline 中做詳細說明。bind()
最終會呼叫 DefaultChannelPipeline 的 bind()
方法。如下:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (!validatePromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
首先對 localAddress 、 promise 進行校驗,符合規範則呼叫 findContextOutbound()
,該方法用於在 pipeline 中獲取 AbstractChannelHandlerContext 雙向連結串列中的一個節點,如下:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
從該方法可以看出,所獲取的節點是從 tail 開始遍歷,獲取第一個節點屬性 outbound 為 true 的節點。其實該節點是 AbstractChannelHandlerContext 雙向連結串列的 head 節點。獲取該節點後,呼叫 invokeBind()
,如下:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
handler()
返回的是 HeadContext 物件,然後呼叫其bind()
,如下:
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
unsafe 定義在 HeadContext 中,在建構函式中初始化(unsafe = pipeline.channel().unsafe();
),呼叫 bind()
如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
// 最核心方法
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
內部呼叫 doBind()
,該方法為繫結中最核心的方法,位於 NioServerSocketChannel 中,如下:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
javaChannel()
返回的是 NioServerSocketChannel 例項初始化時所產生的 Java NIO ServerSocketChannel 例項(ServerSocketChannelImple例項),然後呼叫其 bind()
,如下:
public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
Object var3 = this.lock;
synchronized(this.lock) {
if(!this.isOpen()) {
throw new ClosedChannelException();
} else if(this.isBound()) {
throw new AlreadyBoundException();
} else {
InetSocketAddress var4 = var1 == null?new InetSocketAddress(0):Net.checkAddress(var1);
SecurityManager var5 = System.getSecurityManager();
if(var5 != null) {
var5.checkListen(var4.getPort());
}
NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
Net.bind(this.fd, var4.getAddress(), var4.getPort());
Net.listen(this.fd, var2 < 1?50:var2);
Object var6 = this.stateLock;
synchronized(this.stateLock) {
this.localAddress = Net.localAddress(this.fd);
}
return this;
}
}
}
該方法屬於 Java NIO 層次的,該方法涉及到服務端埠的繫結,埠的監聽,這些內容在後續的 Channel 時做詳細介紹。
到這裡就真正完成了服務端埠的繫結。