Netty Channel原始碼分析
前面,我們大致瞭解了Netty中的幾個核心元件。今天我們就來先來介紹Netty的網路通訊元件,用於執行網路I/O操作 —— Channel。
Netty版本:4.1.30
概述
資料在網路中總是以位元組的形式進行流通。我們在進行網路程式設計時選用何種傳輸方式編碼(OIO、NIO等)決定了這些位元組的傳輸方式。
在沒有Netty之前,為了提升系統的併發能力,從OIO切換到NIO時,需要對程式碼進行大量的重構,因為相應的Java NIO 與 IO API大不相同。而Netty在這些Java原生API的基礎上做了一層封裝,對使用者提供了高度抽象而又統一的API,從而讓傳輸方式的切換不在變得困難,只需要直接使用即可,而不需要對整個程式碼進行重構。
Netty Channel UML
netty channel族如下:
整個族群中,AbstractChannel 是最為關鍵的一個抽象類,從它繼承出了AbstractNioChannel、AbstractOioChannel、AbstractEpollChannel、LocalChannel、EmbeddedChannel等類,每個類代表了不同的協議以及相應的IO模型。除了 TCP 協議以外,Netty 還支援很多其他的連線協議,並且每種協議還有 NIO(非同步 IO) 和 OIO(Old-IO,即傳統的阻塞 IO) 版本的區別. 不同協議不同的阻塞型別的連線都有不同的 Channel 型別與之對應。下面是一些常用的 Channel 型別:
- NioSocketChannel:代表非同步的客戶端 TCP Socket 連線
- NioServerSocketChannel:非同步的伺服器端 TCP Socket 連線
- NioDatagramChannel:非同步的 UDP 連線
- NioSctpChannel:非同步的客戶端 Sctp 連線
- NioSctpServerChannel:非同步的 Sctp 伺服器端連線
- OioSocketChannel:同步的客戶端 TCP Socket 連線
- OioServerSocketChannel:同步的伺服器端 TCP Socket 連線
- OioDatagramChannel:同步的 UDP 連線
- OioSctpChannel:同步的 Sctp 伺服器端連線
- OioSctpServerChannel:同步的客戶端 TCP Socket 連線
Channel API
我們先來看下最頂層介面 channel 主要的API,常用的如下:
介面名 | 描述 |
---|---|
eventLoop() | Channel需要註冊到EventLoop的多路複用器上,用於處理I/O事件,通過eventLoop()方法可以獲取到Channel註冊的EventLoop。EventLoop本質上就是處理網路讀寫事件的Reactor執行緒。在Netty中,它不僅僅用來處理網路事件,也可以用來執行定時任務和使用者自定義NioTask等任務。 |
pipeline() | 返回channel分配的ChannelPipeline |
isActive() | 判斷channel是否啟用。啟用的意義取決於底層的傳輸型別。例如,一個Socket傳輸一旦連線到了遠端節點便是活動的,而一個Datagram傳輸一旦被開啟便是活動的 |
localAddress() | 返回本地的socket地址 |
remoteAddress() | 返回遠端的socket地址 |
flush() | 將之前已寫的資料沖刷到底層Channel上去 |
write(Object msg) | 請求將當前的msg通過ChannelPipeline寫入到目標Channel中。注意,write操作只是將訊息存入到訊息傳送環形陣列中,並沒有真正被髮送,只有呼叫flush操作才會被寫入到Channel中,傳送給對方。 |
writeAndFlush() | 等同於呼叫write()並接著呼叫flush() |
metadate() | 熟悉TCP協議的讀者可能知道,當建立Socket的時候需要指定TCP引數,例如接收和傳送的TCP緩衝區大小,TCP的超時時間。是否重用地址等。在Netty中,每個Channel對應一個物理連結,每個連線都有自己的TCP引數配置。所以,Channel會聚合一個ChannelMetadata用來對TCP引數提供元資料描述資訊,通過metadata()方法就可以獲取當前Channel的TCP引數配置。 |
read() | 從當前的Channel中讀取資料到第一個inbound緩衝區中,如果資料被成功讀取,觸發ChannelHandler.channelRead(ChannelHandlerContext,Object)事件。讀取操作API呼叫完成後,緊接著會觸發ChannelHander.channelReadComplete(ChannelHandlerContext)事件,這樣業務的ChannelHandler可以決定是否需要繼續讀取資料。如果已經有操作請求被掛起,則後續的讀操作會被忽略。 |
close(ChannelPromise promise) | 主動關閉當前連線,通過ChannelPromise設定操作結果並進行結果通知,無論操作是否成功,都可以通過ChannelPromise獲取操作結果。該操作會級聯觸發ChannelPipeline中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件。 |
parent() | 對於服務端Channel而言,它的父Channel為空;對於客戶端Channel,它的父Channel就是建立它的ServerSocketChannel。 |
id() | 返回ChannelId物件,ChannelId是Channel的唯一標識。 |
Channel建立
對Netty Channel API以及相關的類有了一個初步瞭解之後,接下來我們來詳細瞭解一下在Netty的啟動過程中Channel是如何建立的。服務端Channel的建立過程,主要分為四個步驟:1)Channel建立;2)Channel初始化;3)Channel註冊;4)Channel繫結。
我們以下面的程式碼為例進行解析:
// 建立兩個執行緒組,專門用於網路事件的處理,Reactor執行緒組
// 用來接收客戶端的連線,
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用來進行SocketChannel的網路讀寫
EventLoopGroup workGroup = new NioEventLoopGroup();
// 建立輔助啟動類ServerBootstrap,並設定相關配置:
ServerBootstrap b = new ServerBootstrap();
// 設定處理Accept事件和讀寫操作的事件迴圈組
b.group(bossGroup, workGroup)
// 配置Channel型別
.channel(NioServerSocketChannel.class)
// 配置監聽地址
.localAddress(new InetSocketAddress(port))
// 設定伺服器通道的選項,設定TCP屬性
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
// 設定建立連線後的客戶端通道的選項
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
// channel屬性,便於儲存使用者自定義資料
.attr(AttributeKey.newInstance("UserId"), "60293")
.handler(new LoggingHandler(LogLevel.INFO))
// 設定子處理器,主要是使用者的自定義處理器,用於處理IO網路事件
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
// 呼叫bind()方法繫結埠,sync()會阻塞等待處理請求。這是因為bind()方法是一個非同步過程,會立即返回一個ChannelFuture物件,呼叫sync()會等待執行完成
ChannelFuture f = b.bind().sync();
// 獲得Channel的closeFuture阻塞等待關閉,伺服器Channel關閉時closeFuture會完成
f.channel().closeFuture().sync();
複製程式碼
呼叫channel()介面設定 AbstractBootstrap 的成員變數 channelFactory,該變數顧名思義就是用於建立channel的工廠類。原始碼如下:
...
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
// 建立 channelFactory
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
...
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
...
複製程式碼
channelFactory 設定為 ReflectiveChannelFactory ,在我們這個例子中 clazz 為 NioServerSocketChannel ,我們可以看到其中有個 newChannel() 介面,通過反射的方式來呼叫,這個介面的呼叫處我們後面會介紹到。原始碼如下:
// Channel工廠類
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
// 通過反射來進行常見Channel例項
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
複製程式碼
接下來我們來看下 NioServerSocketChannel 的建構函式,主要就是:
- 生成ServerSocketChannel物件。NioServerSocketChannel建立時,首先使用SelectorProvider的openServerSocketChannel開啟伺服器套接字通道。SelectorProvider是Java的NIO提供的抽象類,是選擇器和可選擇通道的服務提供者。具體的實現類有SelectorProviderImpl,EPollSelectorProvide,PollSelectorProvider。選擇器的主要工作是根據作業系統型別和版本選擇合適的Provider:如果LInux核心版本>=2.6則,具體的SelectorProvider為EPollSelectorProvider,否則為預設的PollSelectorProvider。
- 設定 ServerSocketChannelConfig 成員變數。
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 呼叫JDK底層API生成 ServerSocketChannel 物件例項
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
}
}
private final ServerSocketChannelConfig config;
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 呼叫 AbstractNioChannel 構造器,建立 NioServerSocketChannel,設定SelectionKey為ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
// 建立ChannleConfig物件,主要是TCP引數配置類
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
複製程式碼
AbstractNioChannel 的構造器如下:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 呼叫 AbstractChannel 構造器
super(parent);
this.ch = ch;
// 從上一步過來,這裡設定為 SelectionKey.OP_ACCEPT
this.readInterestOp = readInterestOp;
try {
// 設定為非阻塞狀態
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
複製程式碼
在 AbstractChannel 構造器中,會設Channel關聯的三個核心物件:ChannelId、ChannelPipeline、Unsafe。
- 初始化ChannelId,ChannelId是一個全域性唯一的值;
- 建立 NioMessageUnsafe 例項,該類為Channel提供了用於完成網路通訊相關的底層操作,如connect(),read(),register(),bind(),close()等;
- 為Channel建立DefaultChannelPipeline,初始事件傳播管道。關於Pipeline的分析,請看 後文 的分析。
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 設定ChannelId
id = newId();
// 設定Unsafe
unsafe = newUnsafe();
// 設定Pipeline
pipeline = newChannelPipeline();
}
複製程式碼
從 NioServerSocketChannelConfig 的建構函式追溯下去,在 DefaultChannelConfig 會設定channel成員變數。
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
setRecvByteBufAllocator(allocator, channel.metadata());
// 繫結channel
this.channel = channel;
}
複製程式碼
以上就是channel建立的過程,總結一下:
- 通過 ReflectiveChannelFactory 工廠類,以反射的方式對channel進行建立;
- channel建立的過程中,會建立四個重要的物件:ChannelId、ChannelConfig、ChannelPipeline、Unsafe。
Channel初始化
主要分為以下兩步:
- 將啟動器(Bootstrap)設定的選項和屬性設定到NettyChannel上面
- 向Pipeline新增初始化Handler,供註冊後使用
我們從 AbstractBootstrap 的 bind() 介面進去,呼叫鏈:bind() —> doBind(localAddress) —> initAndRegister() —> init(Channel channel),我們看下 ServerBootstrap 中 init() 介面的實現:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 呼叫Channel工程類的newChannel()介面,建立channel,就是前面我們講的部分內容
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
....
}
複製程式碼
初始化Channel,我們來重點看下 init(channel) 介面:
void init(Channel channel) throws Exception {
// 獲取啟動器 啟動時配置的option引數,主要是TCP的一些屬性
final Map<ChannelOption<?>, Object> options = options0();
// 將獲得到 options 配置到 ChannelConfig 中去
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 獲取 ServerBootstrap 啟動時配置的 attr 引數
final Map<AttributeKey<?>, Object> attrs = attrs0();
// 配置 Channel attr,主要是設定使用者自定義的一些引數
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 獲取channel中的 pipeline,這個pipeline使我們前面在channel建立過程中設定的 pipeline
ChannelPipeline p = channel.pipeline();
// 將啟動器中配置的 childGroup 儲存到區域性變數 currentChildGroup
final EventLoopGroup currentChildGroup = childGroup;
// 將啟動器中配置的 childHandler 儲存到區域性變數 currentChildHandler
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// 儲存使用者設定的 childOptions 到區域性變數 currentChildOptions
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
// 儲存使用者設定的 childAttrs 到區域性變數 currentChildAttrs
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 獲取啟動器上配置的handler
ChannelHandler handler = config.handler();
if (handler != null) {
// 新增 handler 到 pipeline 中
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 用child相關的引數創建出一個新連線接入器ServerBootstrapAcceptor
// 通過 ServerBootstrapAcceptor 可以將一個新連線繫結到一個執行緒上去
// 每次有新的連線進來 ServerBootstrapAcceptor 都會用child相關的屬性對它們進行配置,並註冊到ChaildGroup上去
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
複製程式碼
對於新連線接入器 ServerBootstrapAcceptor 的分析 ,請檢視 後文
Channel註冊
在channel完成建立和初始化之後,接下來就需要將其註冊到事件輪循器Selector上去。我們回到 initAndRegister 介面上去:
final ChannelFuture initAndRegister() {
...
// 獲取 EventLoopGroup ,並呼叫它的 register 方法來註冊 channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
複製程式碼
最終會向下呼叫到 SingleThreadEventLoop 中的 register 介面:
如何呼叫到這裡,裡面的細節需要等到後面文章講到 MultithreadEventExecutorGroup 再詳細說明
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 呼叫unsafe的register介面
promise.channel().unsafe().register(this, promise);
return promise;
}
複製程式碼
程式碼跟蹤下去,直到 AbstractChannel 中的 AbstractUnsafe 這個類中的 register 介面。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 將該Channel與eventLoop 進行繫結,後續與該channel相關的IO操作都由eventLoop來處理
AbstractChannel.this.eventLoop = eventLoop;
// 初次註冊時 eventLoop.inEventLoop() 返回false
if (eventLoop.inEventLoop()) {
// 呼叫實際的註冊介面register0
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 呼叫實際的註冊介面register0
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
複製程式碼
register0介面主要分為以下三段邏輯:
-
doRegister();
-
pipeline.invokeHandlerAddedIfNeeded();
-
pipeline.fireChannelRegistered();
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 呼叫 doRegister() 介面
doRegister();
neverRegistered = false;
registered = true;
// 通過pipeline的傳播機制,觸發handlerAdded事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 通過pipeline的傳播機制,觸發channelRegistered事件
pipeline.fireChannelRegistered();
// 還沒有繫結,所以這裡的 isActive() 返回false.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
複製程式碼
我們來看 AbstractNioChannel 中的 doRegister()介面,最終呼叫的就是Java JDK底層的NIO API來註冊。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// eventLoop().unwrappedSelector():獲取selector,將在後面介紹 EventLoop 建立時會講到
// 將selector註冊到Java NIO Channel上
// ops 設定為 0,表示不關心任何事件
// att 設定為 channel自身,表示後面還會將channel取出來用作它用(後面文章會講到)
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
複製程式碼
Channel繫結
在完成建立、初始化以及註冊之後,接下來就是Channel繫結操作。
本小節涉及到的pipeline事件傳播機制,我們放到後面的文章中去講解。
從啟動器的bind()介面開始,往下呼叫 doBind() 方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化及註冊
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 呼叫 doBind0
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
....
}
}
複製程式碼
doBind 方法又會呼叫 doBind0() 方法,在doBind0()方法中會通過EventLoop去執行channel的bind()任務,關於EventLoop的execute介面的分析,請看後面的 文章 。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 呼叫channel.bind介面
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
複製程式碼
doBind0() 方法往下會條用到 pipeline.bind(localAddress, promise);
方法,通過pipeline的傳播機制,最終會呼叫到 AbstractChannel.AbstractUnsafe.bind() 方法,這個方法主要做兩件事情:
- 呼叫doBind():呼叫底層JDK API進行Channel的埠繫結。
- 呼叫pipeline.fireChannelActive():
關於Pipeline的傳播機制,請看 後文
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
....
// wasActive 在繫結成功前為 false
boolean wasActive = isActive();
try {
// 呼叫doBind()呼叫JDK底層API進行埠繫結
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// 完成繫結之後,isActive() 返回true
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 觸發channelActive事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
複製程式碼
我們這裡看服務端 NioServerSocketChannel 實現的 doBind方法,最終會呼叫JDK 底層 NIO Channel的bind方法:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
複製程式碼
呼叫 pipeline.fireChannelActive(),開始傳播active事件,pipeline首先就會呼叫HeadContext節點進行事件傳播,會呼叫到 DefaultChannelPipeline.HeadContext.channelActive() 方法:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 觸發heanlder 的 ChannelActive 方法
ctx.fireChannelActive();
// 呼叫介面readIfIsAutoRead
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
// 呼叫channel.read()
channel.read();
}
}
複製程式碼
channel.read() 方法往下會呼叫到 AbstractChannelHandlerContext.read() 方法:
@Override
public ChannelHandlerContext read() {
// 獲取下一個ChannelHandlerContext節點
final AbstractChannelHandlerContext next = findContextOutbound();
// 獲取EventExecutor
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 呼叫下一個節點的invokeRead介面
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this;
}
複製程式碼
通過pipeline的事件傳播機制,最終會呼叫到 AbstractChannel.AbstractUnsafe.beginRead() 方法:
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
// 呼叫 doBeginRead();
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
複製程式碼
我們看下 AbstractNioChannel 對doBeginRead介面的實現邏輯:
// 註冊一個OP_ACCEPT
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
// 獲取channel註冊是的設定的 selectionKey
final SelectionKey selectionKey = this.selectionKey;
// selectionKey無效則返回
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 前面講到channel在註冊的時候,這是 interestOps 設定的是 0
final int interestOps = selectionKey.interestOps();
// readInterestOp 在前面講到channel建立的時候,設定值為 SelectionKey.OP_ACCEPT
if ((interestOps & readInterestOp) == 0) {
// 最終 selectionKey 的興趣集就會設定為 SelectionKey.OP_ACCEPT
// 表示隨時可以接收新連線的接入
selectionKey.interestOps(interestOps | readInterestOp);
}
}
複製程式碼
總結
至此,我們就分析完了Channel的建立、初始化、註冊、繫結的流程。其中涉及到的EventLoopGroup和Pipeline事件傳播機制的知識點,我們放到後面的文章中去講解。