1. 程式人生 > >J.U.C之ForkJoin

J.U.C之ForkJoin

本章簡說一下ForkJoin

ForkJoin是Java7提供的原生多執行緒並行處理框架,其基本思想是將大人物分割成小任務,最後將小任務聚合起來得到結果。Fork/Join採用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到執行緒佇列中,然後再從一個隨即執行緒中偷一個並把它加入自己的佇列中 Fork就是把一個大任務切分為若干子任務並行的執行,Join就是合併這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+。。+10000,可以分割成10個子任務,每個子任務分別對1000個數進行求和,最終彙總這10個子任務的結果 在這裡插入圖片描述 我們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join的操作機制,通常我們不直接繼承ForkjoinTask類,只需要直接繼承其子類。

  1. 繼承RecursiveAction,用於沒有返回結果的任務;
  2. 繼承RecursiveTask,用於有返回值的任務。

ForkJoinPool:task要通過ForkJoinPool來執行,分割的子任務也會新增到當前工作執行緒的雙端佇列中,進入佇列的頭部。當一個工作執行緒中沒有任務時,會從其他工作執行緒的佇列尾部獲取一個任務。

下面看一個累加求和的例子:

import java.util.concurrent.*;

public class ForkJoinTest extends RecursiveTask<Integer> {

    private static final int thresheld = 2;

    private int start;

    private int end;

    public ForkJoinTest(int start, int end){
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        boolean flag = (end - start) <= thresheld;
        //如果未執行的任務數量小於等於了執行緒數量,則表示未完成的任務很少了,可以直接進行計算
        if(flag){
            for(int i = start; i <= end; i++){
                sum += i;
            }
        }else{          //如果未執行的任務數量大於等於了執行緒數量,則表示未完成的任務還有很多,使用ForkJoin
            int middle = (start + end) / 2;     //平均分配任務數量
            ForkJoinTest forkJoinTest1 = new ForkJoinTest(start, middle);       //執行前半部分的任務
            ForkJoinTest forkJoinTest2 = new ForkJoinTest(middle + 1, end);       //執行後半部分的任務

            //開始執行子任務
            forkJoinTest1.fork();
            forkJoinTest2.fork();

            //得到結果
            int res1 = forkJoinTest1.join();
            int res2 = forkJoinTest2.join();

            sum = res1 + res2;
        }

        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        ForkJoinTest task = new ForkJoinTest(1, 1000);//計算1-100的和

        Future<Integer> result = forkJoinPool.submit(task); //執行這個任務

        try {
            System.out.printf(result.get().toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}