【Netty原始碼解析】NioEventLoop
阿新 • • 發佈:2019-01-10
上一篇部落格【Netty原始碼學習】EventLoopGroup中我們介紹了EventLoopGroup,實際說來EventLoopGroup是EventLoop的一個集合,EventLoop是一個單執行緒的執行緒池,其介面和類實現關係如下:
接下來我們主要介紹實現類NioEventLoop中實現的操作,通過NioEventLoop的繼承關係圖我們可以看到,其就是一個單執行緒的執行緒池。
首先我們看NioEventLoop的建構函式
在建構函式中我們可以看到,其主要操作有獲得NIO操作的Selector, selector = openSelector();
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; selector = openSelector(); selectStrategy = strategy; }
selector = openSelector();具體的操作其實就是獲取一個Selector
NioEventLoop中還有的操作就是註冊Channel到Selector中,具體實現如下:private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEYSET_OPTIMIZATION) { return selector; } final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (ClassNotFoundException e) { return e; } catch (SecurityException e) { return e; } } }); if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) { if (maybeSelectorImplClass instanceof Exception) { Exception e = (Exception) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", selector, e); } return selector; } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, selectedKeySet); publicSelectedKeysField.set(selector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", selector, e); } else { selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", selector); } return selector; }
在建構函式中呼叫了父類的建構函式,其父類的建構函式及實現更多的是和任務佇列相關的,public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) { if (ch == null) { throw new NullPointerException("ch"); } if (interestOps == 0) { throw new IllegalArgumentException("interestOps must be non-zero."); } if ((interestOps & ~ch.validOps()) != 0) { throw new IllegalArgumentException( "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')'); } if (task == null) { throw new NullPointerException("task"); } if (isShutdown()) { throw new IllegalStateException("event loop shut down"); } try { ch.register(selector, interestOps, task); } catch (Exception e) { throw new EventLoopException("failed to register a channel", e); } }
父類重寫了執行緒池操作的類
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
在NioEventLoop中的run方法中我們可以看到任務佇列的執行
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
通過上面的分析,我們基本上了解了eventloop和excutor,以及執行緒之間的關係了,其實netty就是實現了自己的一個執行緒池,來執行執行緒任務。大概的流程就是在eventloop的selector中註冊channel,然後channel的事件處理都以執行緒任務的形式執行,並且先放入任務佇列中(因為它重寫了executor方法,在SingleThreadEventExecutor中我們可以看到重寫的executor函式的實現),然後執行緒不斷的從任務佇列裡面取任務執行。