1. 程式人生 > 其它 >12_分支合併框架 Fork/Join

12_分支合併框架 Fork/Join

這裡寫目錄標題

1、簡介

  1. Fork/Join 它可以將一個大的任務拆分成多個子任務進行並行處理,最後將子任務結果合併成最後的計算結果,並進行輸出。
    Fork/Join 框架要完成兩件事情:
    • Fork:把一個複雜任務進行分拆,大事化小
    • Join:把分拆任務的結果進行合併
  2. 任務分割:首先 Fork/Join 框架需要把大的任務分割成足夠小的子任務,如果子任務比較大的話還要對子任務進行繼續分割
    執行任務併合並結果:分割的子任務分別放到雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡獲取任務執行。子任務執行完的結果都放在另外一個佇列裡,啟動一個執行緒從佇列裡取資料,然後合併這些資料。
  3. 完成分支合併的具體實現類ForkJoinTask、ForkJoinPool
    • ForkJoinTask: 要使用 Fork/Join 框架,首先需要建立一個 ForkJoin 任務。該類提供了在任務中執行 fork 和 join 的機制。
      通常不需要直接繼承ForkJoinTask 類,只需要繼承它的子類,
      RecursiveAction:用於沒有返回結果的任務、RecursiveTask:用於有返回結果的任務
    • ForkJoinPool: ForkJoinTask 需要通過 ForkJoinPool 來執行
    • RecursiveTask: 繼承後可以實現遞迴(自己調自己)呼叫的任務
  4. Fork/Join 框架的實現原理
    ForkJoinPool 由 ForkJoinTask 陣列和 ForkJoinWorkerThread 陣列組成,
    ForkJoinTask 陣列負責將存放以及將程式提交給 ForkJoinPool,而ForkJoinWorkerThread 負責執行這些任務。

2、Fork/join 方法的實現原理

//此處待定

3、異常處理

ForkJoinTask 在執行的時候可能會丟擲異常,但是我們沒辦法在主執行緒裡直接捕獲異常,所以 ForkJoinTask 提供了 isCompletedAbnormally()方法來檢查任務是否已經丟擲異常或已經被取消了,並且可以通過 ForkJoinTask 的getException 方法獲取異常。

getException 方法返回 Throwable 物件,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有丟擲異常則返回 null。

4、入門案例

場景: 生成一個計算任務,計算 1+2+3…+1000,每 100 個數切分一個子任務

class TaskExample extends RecursiveTask<Long> {
    private int start;
    private int end;
    private long sum;
    /**
     * 建構函式
     * @param start
     * @param end
     */
    public TaskExample(int start, int end){
        this.start = start;
        this.end = end;
    }
    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    @Override
    protected Long compute() {
        System.out.println("任務" + start + "=========" + end + "累加開始");
        //大於 100 個數相加切分,小於直接加
        if(end - start < 100){
            for (int i = start; i <= end; i++) {
                //累加
                sum += i;
            }
        }else {
            //切分為 2 塊
            int middle = start + 99;
            //遞迴呼叫,切分為 2 個小任務
            TaskExample taskExample1 = new TaskExample(start, middle);
            TaskExample taskExample2 = new TaskExample(middle + 1, end);
            //執行:非同步
            taskExample1.fork();
            taskExample2.fork();
            //同步阻塞獲取執行結果
            sum = taskExample1.join() + taskExample2.join();
        }
        //加完返回
        return sum;
    }
}

/**
 * 分支合併案例
 */
public class ForkJoinPoolDemo {
    /**
     * 生成一個計算任務,計算 1+2+3.........+1000
     * @param args
     */
    public static void main(String[] args) {
        //定義任務
        TaskExample taskExample = new TaskExample(1, 1000);
        //定義執行物件
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //加入任務執行
        ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);
        //輸出結果
        try {
            System.out.println(result.get());
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            forkJoinPool.shutdown();
        }
    }
}