1. 程式人生 > 程式設計 >淺談ForkJoinPool

淺談ForkJoinPool

ForkJoinPool是什麼?

談到執行緒池,很多人會想到Executors提供的一些預設的執行緒池,比如單執行緒執行緒池SingleThreadExecutor,固定大小的執行緒池FixedThreadPool,但是很少有人會注意到其中還提供了一種特殊的執行緒池:WorkStealingPool,我們點進這個方法,會看到和其他方法不同的是,這種執行緒池並不是通過ThreadPoolExecutor來建立的,而是ForkJoinPool來建立的:

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null,true
); } 複製程式碼

這兩種執行緒池之間並不是繼承關係,而是平級關係:

ThreadPoolExecutor應該都很瞭解了,就是一個基本的儲存執行緒的執行緒池,需要執行任務的時候就從執行緒池中拿一個執行緒來執行。而ForkJoinPool則不僅僅是這麼簡單,同樣也不是ThreadPoolExecutor的代替品,這種執行緒池是為了實現“分治法”這一思想而建立的,通過把大任務拆分成小任務,然後再把小任務的結果彙總起來就是最終的結果,和MapReduce的思想很類似

舉個例子,我們要統計1-100的累加和,如果使用ForkJoinPool來實現的話,就可以將1-100每5位劃分一段,劃分出20段,當作20個任務,每個任務只計算自己區間內的結果,最後將這20個任務的結果彙總起來就是1-100的累加和

ForkJoinPool怎麼使用?

ForkJoinPool的本質就是兩點:

  1. 如果任務很小:直接計算得出結果
  2. 如果任務很大
    • 拆分成N個子任務
    • 呼叫子任務的fork()進行計算
    • 呼叫子任務的join()合併結果

接來下我們來做一個1-100的累加例子:

  1. 首先定義我們需要執行的任務:
class Task extends RecursiveTask<Integer> {

    private int start;

    private int end;
    private int mid;

    public Task(int start,int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute
() { int sum = 0; if (end - start < 6) { // 當任務很小時,直接進行計算 for (int i = start; i <= end; i++) { sum += i; } System.out.println(Thread.currentThread().getName() + " count sum: " + sum); } else { // 否則,將任務進行拆分 mid = (end - start) / 2 + start; Task left = new Task(start,mid); Task right = new Task(mid + 1,end); // 執行上一步拆分的子任務 left.fork(); right.fork(); // 拿到子任務的執行結果 sum += left.join(); sum += right.join(); } return sum; } } 複製程式碼

這裡的RecursiveTaskForkJoinTask的子類,ForkJoinTask又是Future的子類,不瞭解Future類的可以認為是一個非同步執行,並且可以有返回值的Runnable類

我們首先在Task類中定義了任務需要的一些資料,比如開始位置和結束位置。重點是其中的compute方法,在其中實現了我們剛才說到的步驟,如果任務很小(通過任務資料來判斷),就進行計算,否則將任務拆分,使用fork()執行,並通過join()拿到計算結果

  1. 將任務提交到執行緒池

剛才我們定義了任務類,接下來就需要把這個任務提交到執行緒池:

    public static void main(String[] args) throws ExecutionException,InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        Task countTask = new Task(1,100);
        ForkJoinTask<Integer> result = forkJoinPool.submit(countTask);

        System.out.println("result: " + result.get());

        forkJoinPool.shutdown();
    }
複製程式碼

注意,這裡ForkJoinPool初始化可以傳入一個並行引數,如果不傳入該引數的話會預設使用處理器個數來作為並行引數

建立任務物件和執行緒池之後,使用submit方法來提交任務,該方法會返回一個ForkJoinTask<T>型別的物件,呼叫其get方法即可拿到執行結果

同時要注意,該執行緒池也需要呼叫shutdown方法來關閉

ForkJoinPool的原理

ForkJoinPool中有三個重要角色:

  • ForkJoinWorkerThread:工作執行緒,在內部對Thread進行的封裝
  • WorkQueue:任務佇列
  • ForkJoinTask:任務,繼承自Future,在含義上分為submission和task兩種

線上程池中,任務佇列使用陣列來儲存,其中儲存了所有提交進來的任務:

  1. 奇數位置儲存submission
  2. 偶數位置儲存task

submission指的是本地提交的任務,如submit、execute提交的任務;而task則是通過fork方法新增的子任務。這兩種任務僅僅在含義上有所區別,所以一同儲存在任務佇列中,通過位置進行區分

ForkJoinPool的核心

想理解ForkJoinPool的原理,就要理解其核心,一共有兩點,其一是分治法,其二就是工作竊取演演算法。分治法相信就不用多說了,就是通過把大任務拆分成小任務來提高併發度。重點要說的就是工作竊取演演算法,該演演算法的原理:

所有執行緒均嘗試找到並執行已提交的任務,或是通過其他任務建立的子任務

依賴於這種特性,來儘量避免一個執行緒執行完自己的任務後“無所事事”的情況。同時,竊取順序是FIFO的