Netty原始碼之EventLoop建立,啟動,執行
在Netty的執行執行緒中是沒有用鎖來保證同步的,並且它的執行效率非常高,那麼,本文就先看一下Netty是如何設計EventLoop的,是如何執行工作的:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
從這行程式碼切入,先來看一下Netty的執行緒池的設計:
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
執行緒池的構造方法最終通過這個構造方法呼叫了父類的構造方法,這裡面的引數有,執行緒的數量,執行器,selectorProvider, selectStrategyFactory, 和拒絕策略:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
父類的構造方法首先會想我們傳入的執行緒數量進行處理,如果我們傳入的執行緒數量是0,那麼就給他們賦值一個預設的執行緒的數目,這個數目是cpu核數的二倍,然後繼續呼叫父類的構造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //如果執行緒數量小於零,那麼丟擲異常 if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } //如果執行緒任務的執行器為空,那麼就先建立一個 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //建立一個EventLoop的陣列 children = new EventExecutor[nThreads]; //建立每個執行緒 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //建立執行緒的具體實現 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { //如果一旦有一個執行緒建立失敗,那麼就要關閉所有已經建立的執行緒 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } //建立一個執行緒選擇器 chooser = chooserFactory.newChooser(children); //建立一個事件監聽,執行緒成功建立後出發 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
在這個建構函式裡做了如下幾件事: 1、判斷傳入的執行緒數目是否合法 2、是否建立了任務執行器,沒有就先建立一個 3、建立執行緒 4、如果有執行緒建立失敗,那麼就將他們全部關閉 5、建立一個事件,監聽執行緒是否建立完畢 接下來,我們看一下任務執行器excutor是如何建立的
new ThreadPerTaskExecutor(newDefaultThreadFactory());
首先它向構造方法中傳入了一個預設的執行緒工廠作為引數:
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
getClass是一個本地方法,得到的是當前執行的類的class物件,在這就是EventLoopGroup:
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
之後它又呼叫了這個構造方法,並且對我們傳入的執行緒池的class物件做了處理,返回了執行緒池的名字:
public static String toPoolName(Class<?> poolType) {
//判斷class物件是否為空
if (poolType == null) {
throw new NullPointerException("poolType");
}
//得到class物件的名字
String poolName = StringUtil.simpleClassName(poolType);
//根據名字的長短建立名字
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
//這部就是返回了一個nioEventLoopGroup-poolId
if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
} else {
return poolName;
}
}
}
這個方法就是建立了一個執行緒池的名字,畢竟一個執行緒池會有一個任務執行器,所以要用名字對他們進行區分:
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
if (poolName == null) {
throw new NullPointerException("poolName");
}
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
最終呼叫的構造方法將我們傳入的資訊進行了儲存,回到最初建立任務執行器的那個構造方法:
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
把剛才建立的執行緒工廠進行儲存,接下來看一下eventLoop是如何被建立的:
children[i] = newChild(executor, args);
回到這行程式碼,進入newChild方法:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
向我們返回了一個NioEventLoop物件,繼續看它的構造方法:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
//呼叫父類的構造方法
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
//儲存屬性
provider = selectorProvider;
//通過provider建立一個選擇器,之後的客戶端channel都是要註冊到這上面的
selector = openSelector();
selectStrategy = strategy;
}
這個方法對我們傳入的引數進行了儲存,並且繼續呼叫父類的構造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
我們發現,原來一個eventLoop也是一個執行器,在這個構造方法面建立了一個任務佇列,用來放當前eventLoop需要執行的任務:
chooser = chooserFactory.newChooser(children);
接下來看一下執行緒選擇器的建立,現在又出來冒出來了一個chooserFactory,這個是從哪裡來的呢?
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
我們來到構造方法,發現他是從DefaultEventExecutorChooserFactory這個工廠裡獲取的單例,我們走進去看一下chooser是怎麼被建立的:
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//根據能不能被優化選擇建立不同的類
if (isPowerOfTwo(executors.length)) {
//執行緒數目是2的倍數
return new PowerOfTowEventExecutorChooser(executors);
} else {
//執行緒數目不是2的倍數
return new GenericEventExecutorChooser(executors);
}
}
Netty連最簡單的輪詢查詢都做了優化,那麼我們看一下優化和沒優化有什麼區別:
//PowerOfTowEventExecutorChooser
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
//GenericEventExecutorChooser
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
其實只是對next方法做了優化,如果執行緒數目是2的倍數,那麼就可以用位運算來提高效率。 到這在建立執行緒池的構造方法就執行結束了,那麼執行緒是建立好了,什麼時候啟動的呢?不知道大家還記不記得下面這個方法:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 讓需要繫結埠的channel繫結的eventLoop來執行繫結的任務
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
在進行繫結的時候就已經開始讓執行緒去執行任務了,可是這個時候執行緒還沒有啟動,我們看一下Netty是如何操作的:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//看下一下外部執行緒是不是和當前eventLoop繫結的執行緒是同一個,如果是就立即執行
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
//如果不是可能執行緒還沒有啟動
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
這個方法首先判斷外部執行緒是不是和當前eventLoop繫結的執行緒是同一個,如果是就立即放到任務佇列裡面,如果不是那就是執行緒還沒有啟動,啟動執行緒,再新增到任務佇列裡面:
private void startThread() {
//判斷執行緒是否真的沒有啟動
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
//通過cas操作將當前eventLoop的狀態修改成激動狀態
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
//啟動eventLoop
doStartThread();
}
}
}
這個方法做了三件事 1、判斷eventLoop是否未啟動 2、通過CAS操作修改eventLoop的狀態 3、啟動eventLoop 我們仔細看一下eventLoop是如何啟動的:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
//得到當前執行緒
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//使當前eventLoop啟動
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
...
}
}
});
}
這個方法將啟動eventLoop封裝成了一個task交給執行緒池的執行器去執行,這個task的主要工作就是將eventLoop啟動,我們先看一下執行緒池的執行器是如何執行任務的:
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
它呼叫了之前建立的threadFactory的newThread方法,接著又呼叫了start(),我們大致也能猜出來newThread返回了一個執行緒:
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon()) {
if (!daemon) {
t.setDaemon(false);
}
} else {
if (daemon) {
t.setDaemon(true);
}
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
真正建立執行緒的是這個方法:
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
原來,eventLoop的執行緒是一個FastThreadLocalThread,但是這個執行緒類是繼承自Thread,只不過是對他做了優化,之後有時間我們看一下這個執行緒類是如何做優化的。OK,eventLoop裡面工作的執行緒就建立好了,接下來就看他是如何被啟動的:
SingleThreadEventExecutor.this.run();
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
//輪詢獲得io事件
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//執行IO事件
processSelectedKeys();
} finally {
// 執行任務佇列的任務
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
這個方法裡面有一個死迴圈,先是輪詢獲取io事件,然後根據執行io事件的時間和執行任務佇列的時間是否是1:1,否則來決定如何處理這兩種任務的銜接,我們先看一下如何輪詢獲得io事件的:
private void select(boolean oldWakenUp) throws IOException {
//得到當前eventLoop的選擇器
Selector selector = this.selector;
try {
//記錄空輪詢的次數
int selectCnt = 0;
//得到開始輪詢的時間
long currentTimeNanos = System.nanoTime();
//獲得輪詢的截至時間
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
//如果已經到截至時間了,進行一次非阻塞的輪詢,返回
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果任務佇列裡面有任務並且成功的將select喚醒,進行一次非阻塞select返回
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
//設定過期時間進行一次阻塞輪詢
int selectedKeys = selector.select(timeoutMillis);
//空輪詢次數+1
selectCnt ++;
//如果有io事件,或者執行緒設定的狀態為喚醒狀態,或者當前狀態處於喚醒或者任務佇列裡面有任務,或者定時任務佇列裡面有任務,則退出
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 如果已經超過設定的輪詢時間了,重置空輪詢次數
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
//如果在截至時間內空輪詢的次數超過512次,就要重建一個selector,為了防止cpu100%的jdkbug
rebuildSelector();
selector = this.selector;
// 進行一次非阻塞輪詢
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
這個方法的主要做的工作在註釋裡都說明了,我們著重來看一下是如何來重建selector的:
public void rebuildSelector() {
//如果當前外部執行緒不是當前eventLoop繫結的外部執行緒,那麼把這項工作封裝成一個task放到任務佇列裡面
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
//儲存舊的selector
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
//建立一個新的selector
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// 將原本註冊到舊的selector上的channel註冊到新的selector上
int nChannels = 0;
for (;;) {
try {
//遍歷oldSelector上所有的key
for (SelectionKey key: oldSelector.keys()) {
//獲得當前遍歷道德channel
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
//獲得當前channel感興趣的事
int interestOps = key.interestOps();
key.cancel();
//註冊到新的selector上
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
就是先建立了一個新的selector,然後通過迴圈把所有的註冊到舊的selector上的channel註冊到新的selector上。 接下來我們看一下Netty是如何處理io事件的:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
//獲得當前的attachment,也就是channel
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// 這個位置處理完了要置為空
selectedKeys[i] = null;
//獲得channel
final Object a = k.attachment();
//真正執行io事件
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
//再次輪詢io事件
selectAgain();
// 重新過的selectedKey
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//獲得unsafe類
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
//獲得當前channel繫結的eventLoop
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
//獲得事件
int readyOps = k.readyOps();
// 根據不同的事件進入不同的分支進行處理
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
處理完io事件時候,就要處理任務佇列裡面的方法:
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
//聚類任務佇列和定時任務佇列
fetchedAll = fetchFromScheduledTaskQueue();
//執行任務佇列的方法
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
這個方法主要做了兩件事: 1、將定時任務和普通任務合併 2、執行任務
private boolean fetchFromScheduledTaskQueue() {
//獲得執行任務的截至時間
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
//根據截至時間從任務佇列中一個一個的取出在截至時間到來之前要執行的任務
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
//將任務放到普通任務佇列中
if (!taskQueue.offer(scheduledTask)) {
// 如果放入失敗,那麼就要將它重新返回定是佇列裡面
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
這個方法首先會得到一個截至時間,然後根據這個截至時間到定時任務佇列裡面一個一個的拿到在截至時間之內要被執行的任務放到普通任務佇列裡面。
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
//獲得任務佇列裡面的一個任務
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
//開始執行
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
這個方法通過迴圈一個一個的執行。 這樣eventLoop的執行流程就介紹完了,還有一個定時任務佇列之前沒提到過,我們一起來看一下
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
我們發現了有這樣一個方法,裡面返回定時任務佇列,再返回之前進行非空判斷如果為空就建立一個定時任務佇列,怪不得我們之前我們沒提到過,原來是用到的時候才會建立。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
這是一個公共方法,用來想定時任務佇列裡面新增一個定時任務
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
如果當前的外部執行緒是當前eventLoop繫結的執行緒,那麼就直接新增進去,如果不是,把它封裝成一個普通的task放到任務佇列裡面等待序列執行。 好了這節我們就介紹結束了,在下一篇文章中我們將介紹客戶端channel加入連線的處理。