ForkJoin、並行流計算、序列流計算對比
阿新 • • 發佈:2021-01-20
ForkJoin
什麼是 ForkJoin
ForkJoin 是一個把大任務拆分為多個小任務來分別計算的平行計算框架
ForkJoin 特點:工作竊取
這裡面維護的都是雙端佇列,因此但其中一個執行緒完成自己的計算任務之後,可以從其他執行緒任務佇列另一端“竊取”任務進行計算,從而提高計算效率!
ForkJoin 執行流程
虛擬碼:
if(任務數小){
直接計算
}else{
將問題劃分為獨立的部分
分叉新的子任務來解決每個部分
加入所有子任務進行計算
將子結果進行合併
}
ForkJoinPool: 核心
ForkJoinTask:
RecursiveTask: 遞迴任務
package juc.forkJoin; import java.util.concurrent.RecursiveTask; /* 求和計算的任務! 普通求和 ForkJoin Stream並行流 如何使用ForkJoin 1、ForkJoinPool 通過它來執行 2、計算任務 ForkJoinPool.execute(ForkJoinTask task) 3、ForkJoinTask 是一個介面,execute方法傳入引數應為 ForkJoinTask 的子類 如 RecursiveTask */ public class ForkJoinDemo extends RecursiveTask<Long> { private Long start;//開始值 private Long end;//結束值 private Long temp=10000L;//閾值,用於區分是否用ForkJoin來進行劃分 public ForkJoinDemo(Long start,Long end){ this.start=start; this.end=end; } @Override protected Long compute() { if ((end-start)<=temp){//小於等於閾值,則直接進行計算 Long sum=0L; for (Long i = start; i <= end; i++) { sum+=i; } return sum; }else {//大於閾值使用ForkJoin進行劃分 //任務拆分點 Long middle=(start+end)/2; ForkJoinDemo task1 = new ForkJoinDemo(start, middle); task1.fork(); ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end); task2.fork(); return task1.join()+task2.join(); } } }
測試類:
package juc.forkJoin; import java.util.OptionalLong; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { //計算結果:500000000500000000 //test1(); test2(); test3(); } //普通方法 public static void test1(){ Long startTime=System.currentTimeMillis(); Long sum=0L; for (Long i = 1L; i <= 10_0000_0000L; i++) { sum+=i; } Long endTime=System.currentTimeMillis(); System.out.println("計算結果:"+sum); System.out.println("普通方法耗時:"+(endTime-startTime)); } //ForkJoin方法 public static void test2() throws ExecutionException, InterruptedException { Long startTime=System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L); ForkJoinTask<Long> submit = pool.submit(task); Long result = submit.get(); Long endTime=System.currentTimeMillis(); System.out.println("計算結果:"+result); System.out.println("ForkJoin方法耗時:"+(endTime-startTime)); } //Stream並行流方法 public static void test3(){ Long startTime=System.currentTimeMillis(); //Stream並行流 parallel()並行流 sequential()序列流 OptionalLong reduce = LongStream.rangeClosed(1L, 10_0000_0000L).parallel().reduce(Long::sum); Long result = reduce.getAsLong(); Long endTime=System.currentTimeMillis(); System.out.println("計算結果:"+result); System.out.println("Stream並行流方法耗時:"+(endTime-startTime)); } }
執行結果: