Netty-主從Reactor多執行緒模式的原始碼實現
Netty--主從Reactor多執行緒模式的原始碼實現
總覽
EventLoopGroup到底是什麼?
EventLoopGroup是一個儲存EventLoop的容器,同時他應該具備執行緒池的功能。
graph BT; EventLoopGroup --> EventExecutorGroup; EventExecutorGroup --> ScheduledExecutorService;由於EventLoopGroup間接繼承ScheduledExecutorService介面,因此其實現類應該具備執行緒池的功能。
看一下NioEventLoopGroup的核心屬性
// 預設的執行緒池大小 private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); // CPU核數 x 2 } // 儲存EventLoop的陣列 private final EventExecutor[] children;
構造方法
// 如果傳入的nThread為空,那麼使用預設的執行緒池大小(CPU核數 x 2) protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); } // 最終的構建方法 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { // ......省略 children = new EventExecutor[nThreads]; // 初始化EventExecutor陣列的大小 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); // 初始化EventLoop success = true; } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e); } finally { // ......省略 } } // ......省略 }
當建立NioEventLoopGroup例項後,就已經初始化好其EventExecutor陣列的大小以及其儲存的EventLoop。
graph TB; NioEventLoopGroup --> array[EventExecutor陣列]; array --> E1[EventLoop]; array --CPU核數 x 2--> E2[EventLoop]; array --> E3[EventLoop...];EventLoop又是什麼?
graph BT; EventLoop --> EventExecutor; NioEventLoop -->EventLoop是EventExecutor的子介面。
看一下NioEventLoop的核心屬性
private Selector selector;
private volatile Thread thread;
private final Executor executor;
private final Queue<Runnable> taskQueue;
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
每一個NioEventLoop都有一個Selector、Thread、Executor、Queue屬性,當建立EventLoop例項後其thread屬性仍為NULL,還沒有建立執行緒。
Channel
看一下Channel的核心屬性
private volatile EventLoop eventLoop; // 指向EventLoop的引用
每一個Channel都有一個指向EventLoop的引用,也就是說每一個Channel都會與一個EventLoop進行繫結,其eventLoop()方法返回此Channel繫結的EventLoop。
graph TB; EventLoop --> A[Channel]; EventLoop --> B[Channel]; EventLoop --> C[Channel...];Netty啟動伺服器後的核心流程
// 建立bossGroup和workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 繫結埠,啟動服務
serverBootstrap.bind(8888).sync();
AbstarctBootstrap的doBind()方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); // 初始化Channel並將Channel註冊到bossGroup中的EventLoop中的Selector
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception { // 當initAndRegister()方法完成後,由IO執行緒自動呼叫
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise); // 進行繫結操作
}
}
});
return promise;
}
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel(); // 建立NioServerSocketChannel
init(channel); // 初始化Channel
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// config()方法返回AbstractBootStrap
// AbstractBootStrap儲存bossGroup,ServerBootStrap儲存workerGroup
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
EventLoopGroup的register()方法會通過選擇器從EventLoopGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法
// EventLoopGroup的register()方法
public ChannelFuture register(Channel channel) {
return next().register(channel); // 呼叫EventLoop的register()方法
}
/**
* 通過選擇器從EventLoopGroup中取出一個EventLoop
*/
public EventExecutor next() {
return chooser.next();
}
SingleThreadEventLoop的register()方法
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise); // 呼叫Channel的register()方法,傳遞當前物件,即當前EventLoop例項
return promise;
}
AbstarctChannel的register()方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop; // 初始化Channel的EventLoop屬性
// 第一次呼叫時,EventLoop中的thread為空,因此inEventLoop()返回false
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() { // 呼叫EventLoop的execute()方法,並將註冊操作作為一個任務,放入到Runnable例項中
@Override
public void run() {
// register0()方法最終將會呼叫doRegister()方法,將Channel註冊到EventLoop中的Selector
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
SingleThreadEventExecutor的execute()方法
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task); // 將任務放入到當前EventLoop的任務佇列中
if (!inEventLoop) {
startThread(); // 開啟一個執行緒,最終呼叫doStartThread()方法
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
SingleThreadEventExecutor的doStartThread()方法
private void doStartThread() {
assert thread == null; // 當前EventLoop中的thread為空,輸出結果肯定為true
executor.execute(new Runnable() { // 向執行緒池中提交一個任務,到這裡才建立執行緒同時非同步執行
@Override
public void run() {
thread = Thread.currentThread(); // 初始化EventLoop中的thread屬性,即執行緒池中執行該任務的執行緒
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); // 核心方法(當前物件為NioEventLoop)
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ......省略
}
}
});
}
NioEventLoop的run()方法
protected void run() {
for (;;) { // 死迴圈
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys(); // 處理Channel的就緒事件,最終呼叫processSelectedKeysOptimized()方法
} finally {
runAllTasks(); // 執行儲存在任務佇列中的所有任務
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) { // 遍歷就緒的Channel對應的SelectionKey集合,如果Channel沒有事件就緒則集合為空
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
NioEventLoop的processSelectedKey()方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps(); // 就緒的事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 連線就緒
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect(); // 處理連線就緒
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 寫就緒
ch.unsafe().forceFlush(); // 處理寫就緒
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 接收和讀就緒
unsafe.read(); // 處理接收和讀就緒
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
當EventLoop中的執行緒處理完Channel的就緒事件後將會執行儲存在任務佇列中的所有任務,此時註冊任務將被執行
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); // 執行註冊操作
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); // ChannelFuture設定成成功狀態,同時isDone()方法將返回true
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t); // ChannelFuture設定成失敗狀態,同時isDone()方法將返回true
}
}
AbstractNioChannel的doRegister()方法
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 將Channel註冊到EventLoop中的Selector
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // 將Channel註冊到Selector當中,當Channel執行完相應操作後,再向Selector傳遞需要監聽此Channel的事件型別
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
回到一開始的AbstarctBootstrap的doBind()方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) { // 由於註冊任務是非同步執行的,此時任務還未被執行,因此isDone()方法將返回false
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception { // 當註冊任務被執行完畢後,由IO執行緒自動
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise); // 進行繫結操作
}
}
});
return promise;
}
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() { // 同樣的,獲取Channel繫結的EventLoop,呼叫EventLoop的execute()方法,並將繫結操作作為一個任務,放入到Runnable例項中
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop(); // 由於當前方法是被處理註冊任務的那個執行緒呼叫的,即EventLoop中的那個執行緒,因此inEventLoop()方法返回true
addTask(task); // 將任務放入到佇列中,等待被執行
if (!inEventLoop) { // 不會執行,到目前為止仍然只開啟了一個執行緒
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
這就是Netty啟動服務端後的核心流程
1.建立ServerSocketChannel並進行初始化。
2.呼叫bossGroup的register()方法,方法內部通過選擇器從bossGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法。
3.最終呼叫Channel的register()方法,方法中初始化Channel的eventLoop屬性,然後將Channel註冊到bossGroup中的EventLoop中的Selector作為一個任務,放入到Runnable例項中,然後呼叫EventLoop的execute(Runnable)方法。
4.execute()方法將任務放入到任務隊列當中,然後向執行緒池中提交一個任務,此時才建立一個執行緒,同時初始化EventLoop中的thread屬性。
5.任務中初始化EventLoop的thread屬性,然後呼叫NioEventLoop的run()方法,死迴圈去處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務。
6.當註冊任務被執行完畢後,該執行緒會回撥ChannelFutureListener中的operationComplete()方法,將繫結操作作為一個任務,然後呼叫EventLoop的execute(Runnable)方法。
7.重複第4步驟,將任務放入到任務佇列中,由於當前執行緒就是EventLoop中的thread,因此inEventLoop()方法返回true,不會向執行緒池中提交任務,任務等待被EventLoop中的執行緒執行。
Netty處理接收和讀就緒事件的核心流程
BossGroup中的EventLoop中的Thread正在死迴圈的處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務
// 建立一個連線
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
BossGroup中的EventLoop中的Selector監聽到ServerSocketChannel有接收就緒事件
// NioEventLoop的processSelectedKey()方法
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
public void read() {
assert eventLoop().inEventLoop(); // 肯定是true
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf); // 讀取Channel中的資料
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); // 最終會呼叫ServerBootStrap的register()方法
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
NioServerSocketChannel的doReadMessage()方法
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel()); // 接收連線,呼叫ServerSocketChannel的accept()方法
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch)); // 將接收到連線放入到buffer中
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
ServerBootstrap的register()方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 呼叫workerGroup的register()方法,方法內部通過選擇器從workerGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法,最終呼叫AbstractChannel的register()方法
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
AbstractChannel的register()方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop; // 初始化Channel的eventLoop屬性
// 如果取出的是新的EventLoop,那麼其thread屬性為空,當前執行緒總是bossGroup中的EventLoop中的thread,因此inEventLoop()返回false。
// 如果取出的是舊的EventLoop,那麼其thread屬性不為空,當前執行緒總是bossGroup中的EventLoop中的thread,因此inEventLoop()返回false。
if (eventLoop.inEventLoop()) { // 總是返回false,當前執行緒總是bossGroup中的EventLoop中的thread,肯定與workerGroup中的任意一個EventLoop中的thread都不相等。
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() { // 總是會執行該方法
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task); // 將任務放入到隊列當中
if (!inEventLoop) {
startThread(); // 不管是新的還是舊的EventLoop都會呼叫該方法
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) { // 如果取出的是舊的EventLoop,那麼其thread屬性本身就不為空,因此其state屬性就不等於ST_NOT_STARTED,因此不會開啟新的執行緒,註冊任務等待被EventLoop中的執行緒執行
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread(); // 取出的是新的EventLoop,其thread屬性為null,同時其state等於ST_NOT_STARTED,因此需要開啟執行緒,向執行緒池中提交一個任務,死迴圈去處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
這就是Netty處理接收和讀就緒事件的核心流程
1.客戶端建立SocketChannel並進行連線。
2.bossGroup中的EventLoop中的Selector監聽到ServerSocketChannel有接收就緒事件。
3.接收連線,最終呼叫workerGroup的register()方法,方法內部通過選擇器從workGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法。
4.最終呼叫Channel的register()方法,方法中初始化Channel的eventLoop屬性,然後將Channel註冊到bossGroup中的EventLoop中的Selector作為一個任務,放入到Runnable例項中,然後呼叫EventLoop的execute(Runnable)方法。
5.execute()方法將任務放入到任務佇列中,如果取出的是新的EventLoop,那麼其thread屬性為空,此時將會開啟執行緒,向執行緒池中提交一個任務,死迴圈去處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務,如果取出的是舊的EventLoop,那麼其thread屬性不為空,任務等待被EventLoop中的執行緒執行。
總結
1.當建立NioEventLoopGroup例項後,就已經初始化好其EventExecutor陣列的大小以及其儲存的EventLoop。
2.每一個NioEventLoop都有一個Selector、Thread、Executor、Queue屬性,當建立NioEventLoop例項後,其thread屬性仍為空。
3.每一個Channel都會與一個EventLoop進行繫結,其eventLoop()方法返回其繫結的EventLoop,同時該Channel會註冊到其繫結的EventLoop的Selector中。
4.EventLoopGroup的register()方法會通過選擇器從EventLoopGroup中取出一個EventLoop,然後呼叫EventLoop的register()方法。
5.EventLoop的execute()方法會將任務放入到任務隊列當中,如果inEventLoop()方法返回false同時其thread屬性為空,則建立一個執行緒,向執行緒池中提交一個任務(任務中初始化EventLoop中的thread屬性,然後死迴圈去處理Channel的就緒事件以及執行儲存在任務佇列中的所有任務