Java JUC併發之ForkJoin入門
阿新 • • 發佈:2021-07-15
十四、ForkJoin
什麼是fork join
JDK1.7開始
並行執行任務!提高效率 => 針對大資料量
大資料 => MapReduce (把把任務拆分成小任務)
ForkJoin 特點 : 工作竊取
已經執行完任務的執行緒會將尚未執行完的執行緒中的任務竊取過來,避免執行緒等待造成資源浪費 => 提高效率
這裡維護的都是雙端佇列 Deque 【佇列兩端都可以存取】
ForkJoin測試
package com.liu.forkjoin; import java.util.concurrent.RecursiveTask; /** * 求和計算的任務 * 3000=>for 6000 => forkjoin 9000 => Stream並行流 */ public class ForkJoinDemo extends RecursiveTask<Long> { private Long start; private Long end; private Long temp = 10000L; public ForkJoinDemo(Long start,Long end) { this.start = start; this.end = end; } public static void main(String[] args) { int sum = 0; for (int i = 1; i <= 10_0000_0000; i++) { sum += i; } System.out.println(sum); } @Override protected Long compute() { if ((end-start) < temp){ // 一般的求和 Long sum = 0L; for (Long i = start; i <= end ; i++) { sum += i; } return sum; }else{ // 分支合併計算 Long middle = (start + end)/2; // 中間值 ForkJoinDemo task1 = new ForkJoinDemo(start, middle); task1.fork(); // 拆分任務,把任務壓入執行緒 ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end); task2.fork(); return task1.join() + task2.join(); } } }
測試類 Test
package com.liu.forkjoin; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.DoubleStream; import java.util.stream.IntStream; import java.util.stream.LongStream; // 測試類 求 0-10_0000_0000的和 public class Test { //同一個任務 效率高几十倍!! public static void main(String[] args) throws ExecutionException, InterruptedException { // test1(); // 6303 // test2(); // 5846 test3(); // 150 ??? } // 普通程式設計師 public static void test1(){ Long sum = 0L; Long start = System.currentTimeMillis(); // 記錄任務開始時間 for (Long i = 1L; i <= 10_0000_0000; i++) { sum += i; } Long end = System.currentTimeMillis(); // 記錄任務完成時間 System.out.println("sum = " + sum + " 時間 :" + (end -start)); } // ForkJoin public static void test2() throws ExecutionException, InterruptedException { Long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任務 Long sum = submit.get(); Long end = System.currentTimeMillis(); System.out.println("sum = " + sum + " 時間 :" + (end -start)); } // Stream並行流 呼叫API public static void test3(){ Long start = System.currentTimeMillis(); // Stream並行流 // DoubleStream // IntStream.rangeClosed() Long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0,Long::sum); // sum是一個求和函式 返回一個a+b的和 Long end = System.currentTimeMillis(); System.out.println("sum = " + sum + " 時間 :" + (end -start)); } }
本質 :(RecursiveTask) 遞迴 返回一個ForkJoinTask
本文來自部落格園,作者:{夕立君},轉載請註明原文連結:https://www.cnblogs.com/liuzhhhao/p/15016660.html