java執行緒池的實現原理(netty)
部落格已經好久都沒有寫了,感覺自己變慵懶了。。。這本來也是應該早就應該要寫的。。。
在前面讀netty原始碼的時候就可以看到netty本身就自己實現了一個執行緒池,而且也自己實現了future,並且實現的功能更加的強大。。。future還可以新增listener,這個剛開始自己覺得最為神奇。。當看完了它是怎麼實現的之後覺得設計還是挺漂亮的。。。
要自己實現java的執行緒池,那麼有兩個介面是需要熟悉的。。。
最為上層的是executor介面,其中就定義了一個方法:
看註釋就知道,該方法要實現的功能很簡單,就是用來執行提交的runnalbe任務。。/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the <tt>Executor</tt> implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution. * @throws NullPointerException if command is null */ void execute(Runnable command);
接下來是executorservice介面,它擴充套件了executor介面,增加了關閉,submit,invokeAll方法。。使得功能更強一些。。
至於說AbstractExecutorService,它是一抽象類,它實現了ExecutorService中的方法,如果我們繼承這個類來實現自己的執行緒池的話,那麼需要實現留下的execute方法。。。這裡我們就拿其中它的一個方法實現來看看吧:
這裡呼叫了newTaskFor方法,將runnable包裝為RunnableFuture的型別,也就是java自定義的future型別。。最後在呼叫execute方法來執行這個task,然後將futuretask按照future返回。。。public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
好了接下來我們來看看netty執行緒池的實現吧:
(1)我們可以將executor理解為單個執行任務的處理器,看做單個執行緒
(2)將executorgroup看成是一個executor的集合,也就是把它看成執行緒池。。
這裡我們就先來看看DefaultEventExecutor的實現吧,先來看看它型別的整合體系:
這次我們從後向前看吧,先來看看DefaultEventExecutor的定義:
final class DefaultEventExecutor extends SingleThreadEventExecutor {
DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
}
//這個方法將會在生成的執行緒當中被呼叫,用於不斷的來處理已經
@Override
protected void run() {
for (;;) { //一個迴圈,不斷的將task取出來,然後執行就可以了
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
}
其實它的實現相對還是很簡單的,定義了一個run方法,它將會在生成的執行緒中呼叫,它完成的功能就是不斷的從佇列中取出任務然後執行。。。
接下來我們來看看一個比較重要的SingleThreadEventExecutor的定義吧:
首先它定義了兩個兩個任務佇列:
private final Queue<Runnable> taskQueue; //當前executor的任務佇列
//這個佇列主要是用於處理帶有時間延遲的任務,可以將其理解為定時任務
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>(); //帶優先權的任務佇列
前面的佇列預設採用的是LinkedBlockingQueue,這是執行緒安全的佇列,而還有一個優先權佇列,這個是用來實現延遲任務的,也就是定時任務。。。可以類比nginx或者libevent的定時的實現。。說白了都是同一個道理,通過時間來作為key,將已經超時的節點選出來。。
這裡說一個題外話:java的PriorityQueue是採用堆實現的,小根堆實現的。。有意思吧。。不過這個小根堆的儲存炒採用的是陣列。。。
我們再來看看在該型別裡面定義的執行緒吧,也就是最終的用於執行任務的執行緒:
thread = threadFactory.newThread(new Runnable() { //建立執行緒
@Override //執行緒的執行函式
public void run() {
CURRENT_EVENT_LOOP.set(SingleThreadEventExecutor.this); //儲存當前執行緒的本地物件
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); //開始當前executor的執行函式,run方法延後到了後面的類中實現
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
if (state < ST_SHUTTING_DOWN) {
state = ST_SHUTTING_DOWN;
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
synchronized (stateLock) {
state = ST_TERMINATED;
}
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
}
}
}
}
});
程式碼比較簡單,就不細說了。。
另外在該型別中還定義了一些基本的操作方法,例如takeTask,runAllTask等方法。
最後它還實現了最為重要的execute方法:
//用於執行一個task,說白了就是把這個task放到任務隊列當中去,如果當前executor中定義的執行緒並沒有啟動的話,那麼要啟動它
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
由這部分程式碼,我們可以看出,對於task的處理都是放入任務隊列當中去,然後再在前面提到的run方法中一個一個的處理。。。
接下來我們來看看AbstractEventExecutor型別的一些基本定義吧:
我們就來看看它的submit方法吧:
//下面無非就是一些提交和排程任務,會呼叫newTaskFor方法將任務轉換為futuretask,這裡result為void
@Override
public Future<?> submit(Runnable task) {
return (Future<?>) super.submit(task);
}
其實是呼叫的AbstractExecutorService的submit方法,在這裡將會將傳進來的runnable轉化為callable介面,然後再將其轉化為futuretask,這裡有一點需要注意的是,轉化為futuretask的方法被netty重寫了,用於生成自己的定義的futuretask,在前面就已經說了netty自己實現了自己的future。。。
我們來看重寫的方法吧:
@Override
//重寫的abstractexecutorservice的方法,用於將提交的任務封裝為futuretask,這裡封裝成promisetask
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new PromiseTask<T>(this, runnable, value);
}
再來看看在AbstractExecutorServicesubmit方法吧: public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
這裡將會呼叫已經過載的newTaskFor方法來穿傳進來的task轉化為netty自己定義的futuretask,也就是promiseTask,然後再呼叫前面我們提到過的execute方法,它將會把當前的task放入到任務隊列當中去,最後再由最終executor的run方法來挨個挨個的處理。。。至此,對於任務的提交和執行的線路就已經比較的清晰了。。(netty自定義的future的部分以後再說吧)
好了。。接下來我們來看看group的實現吧,也就是池子是怎麼搞的。。。
這裡就選DefaultEventExecutorGroup來分析吧,還是先來看看它的繼承體系:
這裡我們還是從後向前看吧,來看看DefaultEventExecutorGroup的定義:
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
public DefaultEventExecutorGroup(int nThreads) {
this(nThreads, null);
}
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new DefaultEventExecutor(this, threadFactory);
}
}
定義比較簡單吧,不過這裡有定義一個比較重要的方法,newChild,用於生成一個一個的executor,這裡的executor也就是在前面說的DefaultEventExecutor。
接下來來看看MultithreadEventExecutorGroup的定義吧:
首先它有兩個比較重要的屬性:
private final EventExecutor[] children;
private final AtomicInteger childIndex = new AtomicInteger();
前面的children為一個數組,用於儲存當前這個group所有的executor,而另外一個執行緒安全的integer會用於簡單的負載均衡(真的很簡單)
來看看它的構造方法:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//呼叫子類的newChild方法,用於生成一個一個的executor
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
}
其實沒有做太多的事情,無非就是建立executor,並將它們儲存到陣列當中去。。。
另外還有一個比較重要的方法:
@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)]; //獲取當前index的值,然後將其+1,比較簡單的執行緒間負載均衡
}
這裡就知道這個所謂的負載均衡有多麼簡單了吧。。。哈哈。。。
這裡我們再來看看AbstractEventExecutorGroup吧,我們就看其中一個方法就可以了:
@Override //用於提交任務
public Future<?> submit(Runnable task) {
return next().submit(task);
}
到這裡我們就將整個任務的提交過程弄得比較的清晰了。。。
。。。好了。。好像其實這篇文章也沒有什麼新的內容。。。
但是也知道了,其實要實現一個執行緒池也不是什麼難事,無非是實現一下統一的藉口,然後自己在定義一下自己的執行緒機制就好了。。。完全可以模仿netty的實現方式來實現一個自己的執行緒池。。。