Fork-Join 原理深入分析(二)
??本文是將 Fork-Join 復雜且較為龐大的框架分成5個小點來分析 Fork-Join 框架的實現原理,一個個點地理解透 Fork-Join 的核心原理。
1. Frok-Join 框架的核心類的結構分析
??Fork-Join 框架有三個核心類:ForkJoinPool,ForkJoinWorkerThread,ForkJoinTask。下面將分析這三個類的數據結構,初步了解三個類的核心成員。
ForkJoinPool
//繼承了 AbstractExecutorService 類
public class ForkJoinPool extends AbstractExecutorService{
//任務隊列數組,存儲了所有任務隊列,包括 內部隊列 和 外部隊列
volatile WorkQueue[] workQueues; // main registry
//一個靜態常量,ForkJoinPool 提供的內部公用的線程池
static final ForkJoinPool common;
//默認的線程工廠類
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
}
ForkJoinWorkerThread
//繼承了 Thread 類
public class ForkJoinWorkerThread extends Thread {
//線程工作的線程池,即此線程所屬的線程池
final ForkJoinPool pool;
// 線程的內部隊列
final ForkJoinPool.WorkQueue workQueue;
//.....
}
2. ForkJoinPool 中線程的創建
2.1 默認的線程工廠類
ForkJoinPool 中的線程是由默認的線程工廠類 defaultForkJoinWorkerThreadFactory
創建的
//默認的工廠類
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
defaultForkJoinWorkerThreadFactory
創建線程的方法 newThread()
,其實就是傳入當前的線程池,直接創建。
/**
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
2.2 ForkJoinWorkerThread 的構造方法
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
//線程工作的線程池,即創建這個線程的線程池
this.pool = pool;
//註冊線程到線程池中,並返回此線程的內部任務隊列
this.workQueue = pool.registerWorker(this);
}
創建一個工作線程,最後一步還要註冊到其所屬的線程池中,看下面源碼,註冊的過程可以分為兩步:
- 創建一個新的任務隊列
- 為此任務隊列分配一個線程池的索引,將任務隊列存儲在線程數組 workQueues 的此索引位置,並返回這個任務隊列,作為線程的內部任務隊列。線程註冊成功。
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
//創建一個任務隊列
WorkQueue w = new WorkQueue(this, wt);
int i = 0; //分配一個線程池的索引
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
//計算 索引
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { //如果索引沖突
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
//擴容:以原來的數組的長度的兩倍來創建一個新的數組,再復制舊數組的內衣
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
//剛創建的任務隊列加入到線程池的 任務隊列數組中
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
??對應註冊線程,ForkJoinPool
也提供了一個取消線程註冊的方法 deregisterWorker()
,在線程被銷毀的時候調用,此處就不說了。
3. ForkJoinTask的fork()、join()方法
??在上一篇文章中,我們在實現 分治編程時,主要就是調用 ForkJoinTask
的 fork()
和 join()
方法。fork()
方法用於提交子任務,而 join()
方法則用於等待子任務的完成。而這個過程中,將涉及到 “工作竊取算法”。
3.1 fork( ) 方法提交任務
先來看一下 fork()
方法的源碼
public final ForkJoinTask<V> fork() {
Thread t;
//判斷是否是一個 工作線程
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
//加入到內部隊列中
((ForkJoinWorkerThread)t).workQueue.push(this);
else//由 common 線程池來執行任務
ForkJoinPool.common.externalPush(this);
return this;
}
??源碼中,fork()
方法先判斷當前線程(調用fork()
來提交任務的線程)是不是一個 ForkJoinWorkerThread
的工作線程,如果是,則將任務加入到內部隊列中,否則,由 ForkJoinPool
提供的內部公用的線程池 common 線程池
來執行這個任務。
//ForkJoinPool 提供的內部公用的線程池
static final ForkJoinPool common;
??順便說一下,根據上面的說法,意味著我們可以在普通線程池中直接調用 fork()
方法來提交任務到一個默認提供的線程池中。這將非常方便。假如,你要在程序中處理大任務,需要分治編程,但你僅僅只處理一次,以後就不會用到,而且任務不算太大,不需要設置特定的參數,那麽你肯定不想為此創建一個線程池,這時默認的提供的線程池將會很有用。
下面是我基於上一篇文章例子改造的,CountTask 類在我上一篇文章中找到
public class Test_34 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 創建一個計算任務,計算 由1加到12
CountTask countTask2 = new CountTask(1, 12);
//直接在main線程中調用 fork 來提交任務,
countTask2.fork();
//沒有創建線程池,使用commonPool線程池
System.out.println(countTask2.get());
}
}
運行結果:
任務過大,切割的任務: 1加到 12的和 執行此任務的線程:ForkJoinPool.commonPool-worker-1
任務過大,切割的任務: 1加到 6的和 執行此任務的線程:ForkJoinPool.commonPool-worker-2
任務過大,切割的任務: 7加到 12的和 執行此任務的線程:ForkJoinPool.commonPool-worker-3
執行計算任務,計算 1到 3的和 ,結果是:6 執行此任務的線程:ForkJoinPool.commonPool-worker-2
執行計算任務,計算 4到 6的和 ,結果是:15 執行此任務的線程:ForkJoinPool.commonPool-worker-1
執行計算任務,計算 7到 9的和 ,結果是:24 執行此任務的線程:ForkJoinPool.commonPool-worker-3
執行計算任務,計算 10到 12的和 ,結果是:33 執行此任務的線程:ForkJoinPool.commonPool-worker-1
78
註意執行任務的線程名稱:commonPool表示執行任務的線程是公用的ForkJoinPooL線程池中的線程,上面的例子中,並沒有創建一個新的ForKJoinPool線程池
3.2 join( ) 等待任務的完成
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();//直接返回結果
}
??重點在 dojoin()
方法,下面追蹤下去
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return
//如果完成,直接返回s
(s = status) < 0 ? s :
//沒有完成,判斷是不是池中的 ForkJoinWorkerThread 工作線程
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
//如果是池中線程,執行這裏
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
//如果不是池中的線程池,則執行這裏
externalAwaitDone();
}
??仔細看上面的註釋。當 dojoin( )
方法發現任務沒有完成且當前線程是池中線程時,執行了 tryUnpush( )
方法。tryUnpush()
方法嘗試去執行此任務:如果要join的任務正好在當前任務隊列的頂端,那麽pop出這個任務,然後調用 doExec() 讓當前線程去執行這個任務。
final boolean tryUnpush(ForkJoinTask<?> t) {
ForkJoinTask<?>[] a; int s;
if ((a = array) != null && (s = top) != base &&
U.compareAndSwapObject
(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
U.putOrderedInt(this, QTOP, s);
return true;
}
return false;
}
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
??如果任務不是處於隊列的頂端,那麽就會執行 awaitJoin( )
方法。
/**
* Helps and/or blocks until the given task is done or timeout.
*
* @param w caller
* @param task the task
* @param deadline for timed waits, if nonzero
* @return task status on exit
*/
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
ForkJoinTask<?> prevJoin = w.currentJoin;
U.putOrderedObject(w, QCURRENTJOIN, task);
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>)task : null;
for (;;) {
if ((s = task.status) < 0)//如果任務完成了,跳出死循環
break;
if (cc != null)//當前任務是CountedCompleter類型,則嘗試從任務隊列中獲取當前任務的派生子任務來執行;
helpComplete(w, cc, 0);
else if (w.base == w.top || w.tryRemoveAndExec(task))//如果當前線程的內部隊列為空,或者成功完成了任務,幫助某個線程完成任務。
helpStealer(w, task);
if ((s = task.status) < 0)//任務完成,跳出死循環
break;
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
if (tryCompensate(w)) {
task.internalWait(ms);
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}
重點說一下helpStealer。helpStealer的原則是你幫助我執行任務,我也幫你執行任務。
- 遍歷奇數下標,如果發現隊列對象currentSteal放置的剛好是自己要找的任務,則說明自己的任務被該隊列A的owner線程偷來執行
- 如果隊列A隊列中有任務,則從隊尾(base)取出執行;
- 如果發現隊列A隊列為空,則根據它正在join的任務,在拓撲找到相關的隊列B去偷取任務執行。
在執行的過程中要註意,我們應該完整的把任務完成
還有剩下的幾個比較核心的部分源碼就不再此處分析,提供兩個比較棒的博文:(因為我還有一些疑惑沒解決,以後再補充)
- jdk1.8-ForkJoin框架剖析
- Jdk1.7 JUC源碼增量解析(3)-ForkJoin-非ForkJoin任務的執行過程
最後,有興趣的還可以看一下Doug Lea 的寫的Fork-Join 框架的文章
原文:A Java Fork/Join Framework
中文譯文:Fork/Join 框架-設計與實現
參考文獻:
- PunyGod https://www.jianshu.com/p/f777abb7b251
Fork-Join 原理深入分析(二)