淺談Fork/Join框架
前言:本文基於jdk1.7,jdk1.8與jdk1.7還是有些差別。
一、什麼是Fork/Join框架
Fork/Join框架是Java 7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務結果的框架。簡單來說,Fork就是把一個大任務切分為若干子任務並行的執行,Join就是合併這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+…+10000,可以分割成10個子任務,每個子任務分別對1000個數進行求和,最終彙總這10個子任務的結果。
這裡要介紹一下工作竊取演算法:指某個執行緒從其他佇列裡竊取任務來執行。簡單說就是,將一個大任務分割為若干個互不依賴的子任務,這些子任務放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應。當某個執行緒完成了自己佇列的任務之後,從其他未完成任務的執行緒的佇列中竊取任務來執行。但是,這樣子的話這些執行緒之間會存在競爭關係,所以一般會採取雙端佇列,被竊取任務執行緒永遠從雙端佇列的頭部拿任務執行,而竊取任務的執行緒永遠從雙端佇列的尾部拿任務執行。
二、Fork/Join框架的工作原理
從上面Fork/Join框架的介紹中瞭解到,Fork/Join框架主要分為幾個步驟實現。
1、分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停地分割,直到分割出的子任務足夠小。
2、執行任務併合並結果。分割的子任務分別放在雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡獲取任務執行。子任務執行完的結果都統一放在一個佇列裡,啟動一個執行緒從佇列裡拿資料,然後合併這些資料。
Fork/Join框架通過ForkJoinTask和ForkJoinPool來實現分割任務和執行任務併合並結果。
①ForkJoinTask:我們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類
·RecursiveAction:用於沒有返回結果的任務,即執行任務後不會返回結果;
·RecursiveTask:用於有返回結果的任務,即執行任務後會返回結果。
②ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。任務分割出的子任務會新增到當前工作執行緒所維護的雙端佇列中,進入佇列的頭部。當一個工作執行緒的佇列裡暫時沒有任務時,它會隨機從其他工作執行緒的佇列的尾部獲取一個任務。
接下來我們通過一個例子來理解Fork/Join框架是如果工作的:計算1+2+3+4的結果
首先,我們需要分割任務,假如我們希望每個子任務最多執行兩個數的相加,那麼我們設定分割的閾值是2,由於是4個數字相加,所以Fork/Join框架會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然後再join兩個子任務
的結果。因為是有結果的任務,所以必須繼承RecursiveTask,實現程式碼如下:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> {
// 閾值,表示拆分為幾個子任務
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任務小於等於閾值則計算任務
Boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任務大於閾值,就分裂成兩個子任務計算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 執行子任務
leftTask.fork();
rightTask.fork();
// 等待子任務執行完,並得到其結果
int leftResult=leftTask.join();
int rightResult=rightTask.join();
// 合併子任務
sum = leftResult + rightResult;
}
r
eturn sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一個計算任務,負責計算1+2+3+4
CountTask task = new CountTask(1, 4);
// 執行一個任務
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
}
catch (InterruptedException e) {
}
catch (ExecutionException e) {
}
}
}
從上面的程式碼可以看到, ForkJoinTask的子類需要實現compute方法,在這個方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在呼叫fork方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並得到其結果。
此外,ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經丟擲異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常。
if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}
getException方法返回Throwable物件,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有丟擲異常則返回null。
三、Fork/Join框架的實現原理
ForkJoinPool由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成,ForkJoinTask陣列負責將存放程式提交給ForkJoinPool的任務,而ForkJoinWorkerThread陣列負責執行這些任務。
(1)ForkJoinTask的fork方法實現原理
當我們呼叫ForkJoinTask的fork方法時,程式會呼叫ForkJoinWorkerThread的pushTask方法非同步地執行這個任務,然後立即返回結果。
public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);
return this;
}
pushTask方法把當前任務存放在ForkJoinTask陣列佇列裡。然後再呼叫ForkJoinPool的signalWork()方法喚醒或建立一個工作執行緒來執行任務。
final void pushTask(ForkJoinTask<> t) {
ForkJoinTask<>[] q;
int s, m;
if ((q = queue) != null) {
// ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1;
// or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork(); else if (s == m)
growQueue();
}
}
(2)ForkJoinTask的join方法實現原理
Join方法的主要作用是阻塞當前執行緒並等待獲取結果。
public final V join() {
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
}
private V reportResult() {
int s;
Throwable ex;
if ((s = status) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
UNSAFE.throwException(ex);
return getRawResult();
}
首先,它呼叫了doJoin()方法,通過doJoin()方法得到當前任務的狀態來判斷返回什麼結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、訊號(SIGNAL)和出現異常(EXCEPTIONAL)。
·如果任務狀態是已完成,則直接返回任務結果。
·如果任務狀態是被取消,則直接丟擲CancellationException。
·如果任務狀態是丟擲異常,則直接丟擲對應的異常。
讓我們再來分析一下doJoin()方法的實現程式碼
private int doJoin() {
Thread t;
ForkJoinWorkerThread w;
int s;
Boolean completed;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((s = status) < 0)
return s;
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
try {
completed = exec();
}
catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
}
return w.joinTask(this);
} else
return externalAwaitDone();
}
在doJoin()方法裡,首先通過檢視任務的狀態,看任務是否已經執行完成,如果執行完成,則直接返回任務狀態;如果沒有執行完,則從任務數組裡取出任務並執行。如果任務順利執行完成,則設定任務狀態為NORMAL,如果出現異常,則記錄異常,並將任務狀態設定為EXCEPTIONAL。
《Java併發程式設計的藝術》