1. 程式人生 > 實用技巧 >Fork/Join框架

Fork/Join框架

Fork/Join框架介紹

Fork/Join框架是Java 7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每個小任務 結 果後得到大任務結果的框架。Fork/Join框架要完成兩件事情:

  1.任務分割:首先Fork/Join框架需要把大的任務分割成足夠小的子任務,如果子任務比較大的話還要對子任務進行繼續分割

  2.執行任務併合並結果:分割的子任務分別放到雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡獲取任務執行。子任務執 行完的結果都放在另外一個佇列裡,啟動一個執行緒從佇列裡取資料,然後合併這些資料。

工作竊取

演算法是指某個執行緒從其他佇列裡竊取任務來執行。

為什麼需要使用工作竊取演算法呢?

假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少執行緒間的競爭,於是 把 這 些子任務分別放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應, 比如A執行緒負責處理A佇列裡的任務。

工作竊取演算法的優點:充分利用執行緒進行平行計算,減少了執行緒間時竟爭。

工作竊取演算法的缺點:在某些情況下還是存在競爭,比如雙端佇列裡只有一個任務時。

Fork/Join框架設計

1分割任務

2執行任務併合並結果

API介紹

ForkJoinPool

由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成,ForkJoinTask陣列負責將存放程式提交給ForkJoinPool,而 ForkJoinWorkerThread負責執行這些任務。

有兩種構造器方式可以獲取ForkJoinPool的例項,第一種使用構造器建立:

  • ForkJoinPool(): 使用預設的構造器建立例項,該構造器創建出的池與系統中可用的處理器數量相等。
  • ForkJoinPool(int parallelism):該構造器指定處理器數量,建立具有自定義並行度級別的池,該級別的並行度必須大於0,且不超過可用處理器的實際數量。

並行性的級別決定了可以併發執行的執行緒的數量。換句話說,它決定了可以同時執行的任務的數量——但不能超過處理器的數 量。

但是,這並不限制池可以管理的任務的數量。ForkJoinPool可以管理比其並行級別多得多的任務。

ForkJoinTask

ForkJoinTask代表執行在ForkJoinPool中的任務。

主要方法:

  • fork() 在當前執行緒執行的執行緒池中安排一個非同步執行。簡單的理解就是再建立一個子任務。
  • join() 當任務完成的時候返回計算結果,Join主要作用是阻塞當前執行緒並等待獲取結果。
  • invoke() 開始執行任務,如果必要,等待計算完成。

子類:

  • RecursiveAction 一個遞迴無結果的ForkJoinTask(沒有返回值)
  • RecursiveTask 一個遞迴有結果的ForkJoinTask(有返回值)

例項:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class MyForkJoin extends  RecursiveTask<Integer>
{

    private  final  int threshold=3;
    private int  beg;
    private int end;

    public MyForkJoin(int beg, int end) {

        this.beg=beg;
        this.end=end;

    }

    public static void main(String[] arg)
    {
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        //生成一個計算任務

        MyForkJoin forjoin=new MyForkJoin(1,3);
        //執行一個任務
        Future<Integer> result=forkJoinPool.submit(forjoin);

        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }


    @Override
    protected Integer compute() {

        int sum=0;
        if(end-beg>1)
        {

            int middle = (beg + end) / 2;
            MyForkJoin Task1 = new MyForkJoin(beg, middle);
            MyForkJoin Task2 = new MyForkJoin(middle + 1, end);

            // 執行子任務
            Task1.fork();
            Task2.fork();

            // 等待任務執行結束合併其結果
            int Result1 = Task1.join();
            int Result2 = Task2.join();

            // 合併子任務
            sum = Result1 + Result1;

        }
        else
        {
            for (int i = beg; i <= end; i++) {
                sum += i;
            }


        }

        return sum;

    }
}

結果:6

Fork/Join框架的異常處理

  ForkJoinTask在執行的時候可能會丟擲異常,但是我們沒辦法在主執行緒裡直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經丟擲異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常