netty中的發動機--EventLoop及其實現類NioEventLoop的原始碼分析
EventLoop
在之前介紹Bootstrap的初始化以及啟動過程時,我們多次接觸了NioEventLoopGroup這個類,關於這個類的理解,還需要了解netty的執行緒模型。NioEventLoopGroup可以理解為一組執行緒,這些執行緒每一個都可以獨立地處理多個channel產生的io事件。
NioEventLoopGroup初始化
我們看其中一個引數比較多的構造方法,其他一些引數較少的構造方法使用了一些預設值,使用的預設引數如下:
- SelectorProvider型別,用於建立socket通道,udp通道,建立Selector物件等,預設值是SelectorProvider.provider(),大部分情況下使用預設值就行,這個方法最終建立的是一個WindowsSelectorProvider物件
- SelectStrategyFactory,Select策略類的工廠類,它的預設值是DefaultSelectStrategyFactory.INSTANCE,就是一個SelectStrategyFactory物件本身,而SelectStrategyFactory工廠產生的是DefaultSelectStrategy策略類。
- RejectedExecutionHandler,拒絕任務的策略類,決定在任務佇列已滿時採取什麼樣的策略,類似於jdk執行緒池的RejectedExecutionHandler的作用
接下來,我們看一下其中的一個常用的構造方法,
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
可見,當前類中並沒有什麼初始化邏輯,直接呼叫了父類的構造方法,所以我們接著看父類MultithreadEventLoopGroup的構造方法:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
同樣,並未做任務處理,直接呼叫父類構造方法,所以我們接著看MultithreadEventExecutorGroup構造方法,初始化邏輯的實現在這個類中,
MultithreadEventExecutorGroup構造方法
通過上一小結的分析,我們知道NioEventLoopGroup的構造方法的主要邏輯的實現是在MultithreadEventExecutorGroup類中,並且在呼叫構造方法的過程中加上了一個引數的預設值,即EventExecutorChooserFactory型別引數的預設值DefaultEventExecutorChooserFactory.INSTANCE,這個類以輪詢(roundrobin)的方式從多個執行緒中依次選出執行緒用於註冊channel。
總結一下這段程式碼的主要步驟:
- 首先是一些變數的非空檢查和合法性檢查
- 然後根據傳入的執行緒數量,建立若干個子執行器,每個執行器對應一個執行緒
- 最後以子執行器陣列為引數,使用選擇器工廠類建立一個選擇器
最後給每個子執行器新增一個監聽器,以監聽子執行器的終止,做一些簿記工作,使得在所有子執行器全部終止後將當前的執行器組終止
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 首先是變數的非空檢查以及合法性判斷,
// nThreads在MultithreadEventLoopGroup的構造方法中已經經過一些預設值處理,
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}// 這裡一般都會使用預設值, // ThreadPerTaskExecutor的作用即字面意思,一個任務一個執行緒 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 子執行器的陣列,一個子執行器對應一個執行緒 children = new EventExecutor[nThreads]; // 根據傳入的執行緒數量建立多個自執行器 // 注意,這裡子執行器建立好後並不會立即執行起來 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 如果建立子執行器不成功,那麼需要將已經建立好的子執行器也全部銷燬 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } // 等待所以子執行器停止後在退出 for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 建立一個子執行器的選擇器,選擇器的作用是從子執行器中選出一個 // 預設使用roundRobin的方式 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; // 給每個子執行器新增監聽器,在子執行器終止的時候做一些工作 // 每有一個子執行器終止時就將terminatedChildren變數加一 // 當所有子執行器全部終止時,當前這個執行器組就終止了 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 包裝一個不可變的集合 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
NioEventLoopGroup.newChild
上面的方法中呼叫了newChild方法來建立一個子執行器,而這個方法是一個抽象方法,我們看NioEventLoopGroup類的實現:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
可見僅僅是簡單地建立了一個NioEventLoop物件。
小結
到這裡,我們就把NioEventLoopGroup的初始化過程分析完了。我們不禁思考,既然NioEventLoopGroup是一個執行器組,說白了就是一組執行緒,那這些執行緒是什麼時候跑起來的呢?如果讀者還有印象,應該能記得我們在分析Bootstrap建立連線過程時,channel初始化之後需要註冊到EventLoopGroup中,其實是註冊到其中的一個EventLoop上,註冊邏輯最終是在AbstractChannel.AbstractUnsafe.register方法中實現的,其中有一段程式碼:
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
首先呼叫eventLoop.inEventLoop()判斷執行器的執行緒與當前執行緒是否是同一個,如果是則直接執行註冊的程式碼,如果不是就呼叫eventLoop.execute將註冊邏輯封裝成一個任務放到執行器的任務佇列中,接下里我們就以這個方法為切入點,探究一下子執行器執行緒的啟動過程。
AbstractEventExecutor.inEventLoop
首先,讓我們來看一下這個方法,這個方法的作用是判斷當前執行緒與執行器的執行緒是否同一個執行緒。
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
SingleThreadEventExecutor.inEventLoop
程式碼很簡單,就不多說了。
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
SingleThreadEventExecutor.execute
方法很簡單,核心邏輯在startThread方法中,
public void execute(Runnable task) {
// 非空檢查
if (task == null) {
throw new NullPointerException("task");
}
// 執行到這裡一般都是外部呼叫者,
boolean inEventLoop = inEventLoop();
// 向任務佇列中新增一個任務
addTask(task);
// 如果當前執行緒不是執行器的執行緒,那麼需要檢查執行器執行緒是否已經執行,
// 如果還沒在執行,就需要啟動執行緒
if (!inEventLoop) {
startThread();
// 檢查執行緒是否被關閉
if (isShutdown()) {
boolean reject = false;
try {
// 將剛剛新增的任務移除
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
// addTaskWakesUp不知道這個變數意義是什麼,NioEventLoop傳進來的是false
// 向任務佇列中新增一個空任務,這樣就能夠喚醒阻塞的執行器執行緒
// 有些情況下執行器執行緒會阻塞在taskQueue上,
// 所以向阻塞佇列中新增一個元素能夠喚醒哪些因為佇列空而被阻塞的執行緒
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
SingleThreadEventExecutor.startThread
這個方法的主要作用是維護內部的狀態量state,使用cas指令併發情況下對狀態量的修改是執行緒安全的,並且對於狀態量的判斷保證啟動邏輯只被執行一次
private void startThread() {
// 狀態量的維護
if (state == ST_NOT_STARTED) {
// 這裡使用了jdk中的原子更新器AtomicIntegerFieldUpdater類,
// 使用cpu的cas指令保證併發情況下能夠安全地維護狀態量
// 保證只有一個執行緒能夠執行啟動邏輯,保證啟動邏輯只被執行一次
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 實際啟動執行緒的邏輯
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
SingleThreadEventExecutor.doStartThread
這個方法我就不貼程式碼了,說一下它的主要作用:
- 使用內部的Executor物件(一般是一個ThreadPerTaskExecutor)啟動一個執行緒,並執行任務
- 維護執行器的執行狀態,主要是通過內部的狀態量和cas指令來保證執行緒安全;此外維護內部的一些簿記量,例如執行緒本身的引用,執行緒啟動時間等
- 執行緒結束時做一些收尾和清理工作,例如將剩餘的任務跑完,執行關閉鉤子,關閉底層的selector(這個是具體的子類的清理邏輯),同時更新狀態量
具體的業務邏輯仍然是在子類中實現的,也就是SingleThreadEventExecutor.run()方法的具體實現。
NioEventLoop.run
我們仍然以NioEventLoop為例,看一下它實現的run方法。還大概講一下它的主要邏輯:
- 首選這個方法是一個迴圈,不斷地通過呼叫jdk底層的selector接收io事件,並對不同的io事件做處理,同時也會處理任務佇列中的任務,以及定時排程或延遲排程的任務
- 呼叫jdk的api, selector接收io事件
- 處理各種型別的io事件
- 處理任務
這裡,我就不貼程式碼了,其中比較重要的是對一些併發情況的考慮和處理,如selector的喚醒時機。接下來,主要看一下對於各種io事件的處理,至於任務佇列以及排程佇列中任務的處理比較簡單,就不展開了。
NioEventLoop.processSelectedKeysOptimized
這個方法會遍歷所有接受到的io事件對應的selectionKey,然後依次處理。
private void processSelectedKeysOptimized() {
// 遍歷所有的io事件的SelectionKey
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 處理事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 如果需要重新select,那麼把後面的selectionKey全部置0,然後再次呼叫selectNow方法
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
NioEventLoop.processSelectedKey
這個方法首先對SelectionKey無效的情況做了處理,分為兩種情況:channel本身無效了;channel仍然是正常的,只不過是被從當前的selector上登出了,可能在其他的selector中仍然是正常執行的
- 對於第一種情況,需要關閉channel,即關閉底層的連線
- 對於第二種情況則不需要做任何處理。
接下來,我們著重分析一下對於四種事件的處理邏輯。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 如果selectionKey是無效的,那麼說明相應的channel是無效的,此時需要關閉這個channel
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
// 只關閉註冊在當前EventLoop上的channel,
// 理論上來說,一個channel是可以註冊到多個Eventloop上的,
// SelectionKey無效可能是因為channel從當前EventLoop上登出了,
// 但是channel本身依然是正常的,並且註冊在其他的EventLoop中
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
// 到這裡說明channel已經無效了,關閉它
unsafe.close(unsafe.voidPromise());
return;
}
// 下面處理正常情況
try {
// 準備好的io事件
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// 處理connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// 處理write事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 處理read和accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
connect事件處理
從程式碼中可以看出,connect事件的處理時通過呼叫NioUnsafe.finishConnect完成的,我們看一下AbstractNioUnsafe.finishConnect的實現:
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
try {
// 是否已經處於連線成功的狀態
boolean wasActive = isActive();
// 抽象方法,有子類實現
doFinishConnect();
// 處理future物件,將其標記為成功
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
可以看出,主要是通過呼叫doFinishConnect實現完成連線的邏輯,具體到子類中,NioSocketChannel.doFinishConnect的實現是:
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
write事件處理
對於的write事件的處理時通過呼叫NioUnsafe.forceFlush方法完成,最終的實現在AbstractChannel.AbstractUnsafe.flush0中:
大體上看,這個方法的邏輯比較簡單,但是實際上最複雜也是最核心的寫入邏輯在子類實現的doWrite方法中。由於本篇的重點在於把NioEventLoop的主幹邏輯梳理一下,所以這裡不再繼續展開,後面會單獨來分析這一塊的原始碼,這裡涉及到netty中對緩衝區的封裝,其中涉及到一些比較複雜的邏輯。
protected void flush0() {
// 如果正在寫資料,直接返回
if (inFlush0) {
// Avoid re-entrance
return;
}
// 輸出的緩衝區
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
// 將緩衝區的資料寫入到channel中
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t), false);
}
}
} finally {
inFlush0 = false;
}
}
read事件和accept事件處理
乍看會比較奇怪,為什麼這兩個事件要放到一起處理呢,他們明明是不同的事件。這裡主要還是考慮到編碼的統一,因為read事件只有NioSocketChannel才會有,而accept事件只有NioServerSocketChannel才會有,所以這裡通過抽象方法,讓不同的子類去實現各自的邏輯,是的程式碼結構上更統一。我們這裡看一下NioScketChannel的實現,而對於NioServerSocketChannel的實現我會在後續分析netty服務端的啟動過程時在具體講到,即ServerBootstrap的啟動過程。
NioByteUnsafe.read
總結一下這個方法的主要邏輯:
- 首先會獲取緩衝分配器和相應的處理器RecvByteBufAllocator.Handle物件
- 迴圈讀取資料,每次分配一個一定大小(大小可配置)的緩衝,將channel中待讀取的資料讀取到緩衝中
- 以裝載有資料的緩衝為訊息體,向channel的處理流水線(即pipeline)中觸發一個讀取的事件,讓讀取到的資料在流水線中傳播,被各個處理器處理
- 重複此過程,知道channel中沒有可供讀取的資料
- 最後向pipeline中觸發一個讀取完成的事件
最後還要根據最後一次讀取到的資料量決定是否關閉通道,如果最後一次讀取到的資料量小於0,說明對端已經關閉了輸出,所以這裡需要將輸入關閉,即通道處於半關閉狀態。
public final void read() { final ChannelConfig config = config(); // 如果通道已經關閉,那麼就不需要再讀取資料,直接返回 if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); // 緩衝分配器 final ByteBufAllocator allocator = config.getAllocator(); // 緩衝分配的處理器,處理緩衝分配,讀取計數等 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 分配一個緩衝 byteBuf = allocHandle.allocate(allocator); // 將通道的資料讀取到緩衝中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 如果沒有讀取到資料,說明通道中沒有待讀取的資料了, if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. // 因為沒讀取到資料,所以應該釋放緩衝 byteBuf.release(); byteBuf = null; // 如果讀取到的資料量是負數,說明通道已經關閉了 close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } // 更新Handle內部的簿記量 allocHandle.incMessagesRead(1); readPending = false; // 向channel的處理器流水線中觸發一個事件, // 讓取到的資料能夠被流水線上的各個ChannelHandler處理 pipeline.fireChannelRead(byteBuf); byteBuf = null; // 這裡根據如下條件判斷是否繼續讀: // 上一次讀取到的資料量大於0,並且讀取到的資料量等於分配的緩衝的最大容量, // 此時說明通道中還有待讀取的資料 } while (allocHandle.continueReading()); // 讀取完成 allocHandle.readComplete(); // 觸發一個讀取完成的事件 pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 // 這裡isAutoRead預設是true, 所以正常情況下會繼續監聽read事件 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
總結
本篇主要分析了EventLoop的事件監聽以及處理邏輯,此外處理處理io事件,也會處理新增進來的任務和定時排程任務和延遲排程任務。EventLoop就像是整個框架的發動機或者說是心臟,它通過jdk api進而簡介地呼叫系統呼叫,不斷地監聽各種io事件,同時對不同的io事件分門別類採用不同的處理方式,對於read事件則會將網路io資料讀取到緩衝中,並將讀取到的資料傳遞給使用者的處理器進行鏈式處理。Channelpipeline就像一個流水線一樣,對觸發的的各種事件進行處理。
遺留問題
- NioSocketChannel.doWrite方法的寫入邏輯的,待進一步分析
- ChannelPipeline的詳細分析,各種事件是怎麼在處理器之間傳播的,設計模式,程式碼結構等
- 緩衝分配器和緩衝處理器的分析,它們是怎麼對記憶體進行管理的,這也是netty高效能的原因之一。