12_分支合併框架 Fork/Join
阿新 • • 發佈:2022-11-30
這裡寫目錄標題
1、簡介
- Fork/Join 它可以將一個大的任務拆分成多個子任務進行並行處理,最後將子任務結果合併成最後的計算結果,並進行輸出。
Fork/Join 框架要完成兩件事情:- Fork:把一個複雜任務進行分拆,大事化小
- Join:把分拆任務的結果進行合併
- 任務分割:首先 Fork/Join 框架需要把大的任務分割成足夠小的子任務,如果子任務比較大的話還要對子任務進行繼續分割
執行任務併合並結果:分割的子任務分別放到雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡獲取任務執行。子任務執行完的結果都放在另外一個佇列裡,啟動一個執行緒從佇列裡取資料,然後合併這些資料。 - 完成分支合併的具體實現類ForkJoinTask、ForkJoinPool
- ForkJoinTask: 要使用 Fork/Join 框架,首先需要建立一個 ForkJoin 任務。該類提供了在任務中執行 fork 和 join 的機制。
通常不需要直接繼承ForkJoinTask 類,只需要繼承它的子類,
RecursiveAction:用於沒有返回結果的任務、RecursiveTask:用於有返回結果的任務 - ForkJoinPool: ForkJoinTask 需要通過 ForkJoinPool 來執行
- RecursiveTask: 繼承後可以實現遞迴(自己調自己)呼叫的任務
- ForkJoinTask: 要使用 Fork/Join 框架,首先需要建立一個 ForkJoin 任務。該類提供了在任務中執行 fork 和 join 的機制。
- Fork/Join 框架的實現原理
ForkJoinTask 陣列負責將存放以及將程式提交給 ForkJoinPool,而ForkJoinWorkerThread 負責執行這些任務。
2、Fork/join 方法的實現原理
//此處待定
3、異常處理
ForkJoinTask 在執行的時候可能會丟擲異常,但是我們沒辦法在主執行緒裡直接捕獲異常,所以 ForkJoinTask 提供了 isCompletedAbnormally()方法來檢查任務是否已經丟擲異常或已經被取消了,並且可以通過 ForkJoinTask 的getException 方法獲取異常。
getException 方法返回 Throwable 物件,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有丟擲異常則返回 null。
4、入門案例
場景: 生成一個計算任務,計算 1+2+3…+1000,每 100 個數切分一個子任務
class TaskExample extends RecursiveTask<Long> {
private int start;
private int end;
private long sum;
/**
* 建構函式
* @param start
* @param end
*/
public TaskExample(int start, int end){
this.start = start;
this.end = end;
}
/**
* The main computation performed by this task.
* @return the result of the computation
*/
@Override
protected Long compute() {
System.out.println("任務" + start + "=========" + end + "累加開始");
//大於 100 個數相加切分,小於直接加
if(end - start < 100){
for (int i = start; i <= end; i++) {
//累加
sum += i;
}
}else {
//切分為 2 塊
int middle = start + 99;
//遞迴呼叫,切分為 2 個小任務
TaskExample taskExample1 = new TaskExample(start, middle);
TaskExample taskExample2 = new TaskExample(middle + 1, end);
//執行:非同步
taskExample1.fork();
taskExample2.fork();
//同步阻塞獲取執行結果
sum = taskExample1.join() + taskExample2.join();
}
//加完返回
return sum;
}
}
/**
* 分支合併案例
*/
public class ForkJoinPoolDemo {
/**
* 生成一個計算任務,計算 1+2+3.........+1000
* @param args
*/
public static void main(String[] args) {
//定義任務
TaskExample taskExample = new TaskExample(1, 1000);
//定義執行物件
ForkJoinPool forkJoinPool = new ForkJoinPool();
//加入任務執行
ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);
//輸出結果
try {
System.out.println(result.get());
}catch (Exception e){
e.printStackTrace();
}finally {
forkJoinPool.shutdown();
}
}
}