1. 程式人生 > 實用技巧 >ForkJoin、並行流計算、序列流計算對比

ForkJoin、並行流計算、序列流計算對比

ForkJoin


什麼是 ForkJoin

ForkJoin 是一個把大任務拆分為多個小任務來分別計算的平行計算框架

ForkJoin 特點:工作竊取

這裡面維護的都是雙端佇列,因此但其中一個執行緒完成自己的計算任務之後,可以從其他執行緒任務佇列另一端“竊取”任務進行計算,從而提高計算效率!

ForkJoin 執行流程

虛擬碼:

if(任務數小){
	直接計算
}else{
	將問題劃分為獨立的部分
	分叉新的子任務來解決每個部分
	加入所有子任務進行計算
	將子結果進行合併
}

ForkJoinPool: 核心

ForkJoinTask:

RecursiveTask:

遞迴任務


package juc.forkJoin;

import java.util.concurrent.RecursiveTask;

/*
求和計算的任務!
普通求和    ForkJoin    Stream並行流
如何使用ForkJoin
    1、ForkJoinPool 通過它來執行
    2、計算任務 ForkJoinPool.execute(ForkJoinTask task)
    3、ForkJoinTask 是一個介面,execute方法傳入引數應為 ForkJoinTask 的子類 如 RecursiveTask

 */
public class ForkJoinDemo extends RecursiveTask<Long> {
    private Long start;//開始值
    private Long end;//結束值
    private Long temp=10000L;//閾值,用於區分是否用ForkJoin來進行劃分

    public ForkJoinDemo(Long start,Long end){
        this.start=start;
        this.end=end;
    }

    @Override
    protected Long compute() {
        if ((end-start)<=temp){//小於等於閾值,則直接進行計算
            Long sum=0L;
            for (Long i = start; i <= end; i++) {
                sum+=i;
            }
            return sum;
        }else {//大於閾值使用ForkJoin進行劃分
            //任務拆分點
            Long middle=(start+end)/2;
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork();
            ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
            task2.fork();

            return task1.join()+task2.join();

        }
    }
}

測試類:

package juc.forkJoin;

import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //計算結果:500000000500000000
        //test1();
        test2();
        test3();
    }
    //普通方法
    public static void test1(){
        Long startTime=System.currentTimeMillis();
        Long sum=0L;
        for (Long i = 1L; i <= 10_0000_0000L; i++) {
            sum+=i;
        }
        Long endTime=System.currentTimeMillis();
        System.out.println("計算結果:"+sum);
        System.out.println("普通方法耗時:"+(endTime-startTime));
    }
    //ForkJoin方法
    public static void test2() throws ExecutionException, InterruptedException {
        Long startTime=System.currentTimeMillis();

        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L);
        ForkJoinTask<Long> submit = pool.submit(task);
        Long result = submit.get();

        Long endTime=System.currentTimeMillis();
        System.out.println("計算結果:"+result);
        System.out.println("ForkJoin方法耗時:"+(endTime-startTime));
    }
    //Stream並行流方法
    public static void test3(){
        Long startTime=System.currentTimeMillis();

        //Stream並行流       parallel()並行流     sequential()序列流
        OptionalLong reduce = LongStream.rangeClosed(1L, 10_0000_0000L).parallel().reduce(Long::sum);
        Long result = reduce.getAsLong();

        Long endTime=System.currentTimeMillis();
        System.out.println("計算結果:"+result);
        System.out.println("Stream並行流方法耗時:"+(endTime-startTime));
    }
}

執行結果: