Fork/Join框架
Fork/Join框架介紹
Fork/Join框架是Java 7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每個小任務 結 果後得到大任務結果的框架。Fork/Join框架要完成兩件事情:
1.任務分割:首先Fork/Join框架需要把大的任務分割成足夠小的子任務,如果子任務比較大的話還要對子任務進行繼續分割
2.執行任務併合並結果:分割的子任務分別放到雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡獲取任務執行。子任務執 行完的結果都放在另外一個佇列裡,啟動一個執行緒從佇列裡取資料,然後合併這些資料。
工作竊取
演算法是指某個執行緒從其他佇列裡竊取任務來執行。
為什麼需要使用工作竊取演算法呢?
假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少執行緒間的競爭,於是 把 這 些子任務分別放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應, 比如A執行緒負責處理A佇列裡的任務。
工作竊取演算法的優點:充分利用執行緒進行平行計算,減少了執行緒間時竟爭。
工作竊取演算法的缺點:在某些情況下還是存在競爭,比如雙端佇列裡只有一個任務時。
Fork/Join框架設計
1分割任務
2執行任務併合並結果
API介紹
ForkJoinPool
由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成,ForkJoinTask陣列負責將存放程式提交給ForkJoinPool,而 ForkJoinWorkerThread負責執行這些任務。
有兩種構造器方式可以獲取ForkJoinPool
的例項,第一種使用構造器建立:
- ForkJoinPool(): 使用預設的構造器建立例項,該構造器創建出的池與系統中可用的處理器數量相等。
- ForkJoinPool(int parallelism):該構造器指定處理器數量,建立具有自定義並行度級別的池,該級別的並行度必須大於0,且不超過可用處理器的實際數量。
並行性的級別決定了可以併發執行的執行緒的數量。換句話說,它決定了可以同時執行的任務的數量——但不能超過處理器的數 量。
但是,這並不限制池可以管理的任務的數量。ForkJoinPool可以管理比其並行級別多得多的任務。
ForkJoinTask
ForkJoinTask代表執行在ForkJoinPool中的任務。
主要方法:
- fork() 在當前執行緒執行的執行緒池中安排一個非同步執行。簡單的理解就是再建立一個子任務。
- join() 當任務完成的時候返回計算結果,Join主要作用是阻塞當前執行緒並等待獲取結果。
- invoke() 開始執行任務,如果必要,等待計算完成。
子類:
- RecursiveAction 一個遞迴無結果的ForkJoinTask(沒有返回值)
- RecursiveTask 一個遞迴有結果的ForkJoinTask(有返回值)
例項:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class MyForkJoin extends RecursiveTask<Integer>
{
private final int threshold=3;
private int beg;
private int end;
public MyForkJoin(int beg, int end) {
this.beg=beg;
this.end=end;
}
public static void main(String[] arg)
{
ForkJoinPool forkJoinPool=new ForkJoinPool();
//生成一個計算任務
MyForkJoin forjoin=new MyForkJoin(1,3);
//執行一個任務
Future<Integer> result=forkJoinPool.submit(forjoin);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
protected Integer compute() {
int sum=0;
if(end-beg>1)
{
int middle = (beg + end) / 2;
MyForkJoin Task1 = new MyForkJoin(beg, middle);
MyForkJoin Task2 = new MyForkJoin(middle + 1, end);
// 執行子任務
Task1.fork();
Task2.fork();
// 等待任務執行結束合併其結果
int Result1 = Task1.join();
int Result2 = Task2.join();
// 合併子任務
sum = Result1 + Result1;
}
else
{
for (int i = beg; i <= end; i++) {
sum += i;
}
}
return sum;
}
}
結果:6
Fork/Join框架的異常處理
ForkJoinTask在執行的時候可能會丟擲異常,但是我們沒辦法在主執行緒裡直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經丟擲異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常