Netty NioEventLoop原始碼解讀
NioEventLoop中維護了一個執行緒,執行緒啟動時會呼叫NioEventLoop的run方法,執行I/O任務和非I/O任務:I/O任務:即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法觸發。非IO任務:新增到taskQueue中的任務,如register0、bind0等任務,由runAllTasks方法觸發。兩種任務的執行時間比由變數ioRatio控制,預設為50,則表示允許非IO任務執行的時間與IO任務的執行時間相等。以下是NioEventLoop的構造方法。
//這是NioEventLoop的構造方法,NioEventLoopGroup 是管理NioEventLoop的集合,executor則為任務執行池,SelectorProvider 用於構造Selector物件,RejectedExecutionHandler用於在executor滿了執行的邏輯 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
在NioEventLoop類中,最重要的便是run方法,以下是run方法的原始碼:
@Override protected void run() { //這裡是個死迴圈,裡面的run方法邏輯會一直執行 for (;;) { try { //根據是佇列裡是否有任務執行不同的策略 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //select內部具體的方法看下面的原始碼 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //這裡是I/O與executor裡任務執行的比率,如果設為100則表示I/O的傳先級最高 final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { //處理網路I/O事件processSelectedKeys可以看下面的原始碼 processSelectedKeys(); } finally { //最終不是會執行任務佇列裡的任務的 runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { //傳入一個執行了io的時間,根據io執行時間算出任務能夠執行多長時間 final long ioTime = System.nanoTime() - ioStartTime; // runAllTasks看下面的原始碼 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
通過上面的run方法分析,內部有比較重要的三個方法, 我們下面來一一分析,其中比較重要的是select方法。裡面最重要的邏輯便是判斷執行selector的selectNow方法還是執行select方法。其中selectNow方法是非阻塞的,則select方法是阻塞的。同時這個方法也解決了JVM裡Selector空迴圈的bug,解決思想是發現執行了512次select還是沒有I/O事件,剛通過新建一個新的selector。具體程式碼如下(關鍵程式碼有相應的註釋):
//這裡處理的是select邏輯 private void select(boolean oldWakenUp) throws IOException { //這個Selector就 是 java.nio裡的selector啦 Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); //selectDeadLineNanos得到的是最早任務的的一個執行時間 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //如果最早需要執行的任務時間在0.5秒內,則執行selector的sellectNow方法,selectNow方法是非阻塞的 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } //再次確認有任務則執行selectNow方法 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } //如果沒有佇列裡沒有任務需要執行,則通過select阻塞方法,得到selectedKeys int selectedKeys = selector.select(timeoutMillis); selectCnt ++; //selectedKeys不為空,則說明有相應的I/O事件,跳出迴圈,的run方法裡會處理具體的I/O事件 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } long time = System.nanoTime(); //如果最早任務的執行時間還沒有相應的I/O事件,則把selectCnt置為1,重新開始內部的for迴圈 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { //下面的邏輯用於處理java selector空輪詢的bug logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } // Harmless exception - log anyway } }
另一個比較重要的方法是processSelectedKeys方法,裡面會處理具體的I/O事件,根據SelectionKey的readyops屬性處理不同的I/O事件,具體程式碼分析如下:
//根據selectedKeys 不同,採用不同的方法處理
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
//呼叫selector.selectedKeys()方法得到selectedKeys
processSelectedKeysPlain(selector.selectedKeys());
}
}
//這裡處理的是沒有優化過的SelectedKeys
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
//迴圈處理SelectionKey
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
//具體的I/O事件在processSelectedKey方法裡
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
//根據 readyOps 判斷I/O事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//處理connect事件,
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//處理write事件
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//處理read與accept事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
還有一個比較重要的方法便是runAllTasks,這個方法傳入能夠處理task的時間,在超過這個時間後會退出執行,以使執行緒能夠執行I/O邏輯,程式碼邏輯如下:
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
//從佇列裡取出執需要執行的task
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
//這個方法求出執行的最後時間
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
//執行任務
safeExecute(task);
runTasks ++;
//只有執行了64個任務才比較當前時間與deadline的大小,如果超過了則直接退出,這樣做的原因是取得系統的nanoTime也是個相對耗時的操作
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
//繼續從佇列裡拿出未執行的task
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
Netty是接收使用者連線後是如何處理的
下面來看看netty接收client端的請求後都做了那些事情,我們從上面的processSelectedKey方法裡我們知道在處理SelectionKey.OP_ACCEPT事件的時候會呼叫
unsafe.read()方法,程式碼截圖如下:
image.png
下面我們通過unsafe來一步步追蹤netty是如何處理接收到的連線的,這裡的unsafe物件是NioMessageUnsafe 物件,程式碼如下:
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//這裡接收讀到的buf物件
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
// config()方法得到的是關於channel的配置資訊
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//這裡會呼叫父類的unsafe方法
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//doReadMessages方法呼叫的是channel裡的方法,在這裡會呼叫NioServerSocketChannel類裡的方法。通過readBuf會返回新建的SocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
//這裡對增加到readBuf裡的物件進行處理通過pipeline物件,最終會呼叫到ServerBootstrapAcceptor類裡的channelRead方法
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
下面來看看NioServerSocketChannel類裡是如何處理doReadMessages的,主要的邏輯只是呼叫了java NIO的accept,然後返回SocketChannel,將返回的物件包裝成Netty內部使用的物件。具體程式碼如下:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//這裡會呼叫java NIO裡channel的accept方法,返回的也是java NIO裡的SocketChannel物件
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
//新建NioSocketChannel物件,傳的的parent就是當前NioServerSocketChannel, SocketChannel 就是剛剛呼叫accpt方法返回的物件。
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
ServerBootstrapAcceptor是一個ChannelInboundHandler類,裡面的核心方法如下圖所示:邏輯在圖裡都有相應的標註:
image.png