Jdk8--新特性--串並行流與ForkJoin框架
阿新 • • 發佈:2019-02-08
並行流就是把一個內容分成多個數據塊,並用不同的執行緒分別處理每個資料塊的流。穿行流則相反,並行流的底層其實就是ForkJoin框架的一個實現。
那麼先了解一下ForkJoin框架吧。
Fork/Join框架:在必要的情況下,將一個大任務,進行拆分(fork) 成若干個子任務(拆到不能再拆,這裡就是指我們制定的拆分的臨界值),再將一個個小任務的結果進行join彙總。
Fork/Join與傳統執行緒池的區別!
Fork/Join採用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到執行緒佇列中,然後再從一個隨即執行緒中偷一個並把它加入自己的佇列中。
就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來,加入自己的佇列中,對於傳統的執行緒,ForkJoin更有效的利用的CPU資源!
我們來看一下ForkJoin的實現:實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,RecursiveTask是有返回值的,相反Action則沒有
測試
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; import org.junit.Test; public class TestForkJoin { @Test public void test1(){ long start = System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L); long sum = pool.invoke(task); System.out.println(sum); long end = System.currentTimeMillis(); System.out.println("耗費的時間為: " + (end - start)); //112-1953-1988-2654-2647-20663-113808 } @Test public void test2(){ long start = System.currentTimeMillis(); long sum = 0L; for (long i = 0L; i <= 10000000000L; i++) { sum += i; } System.out.println(sum); long end = System.currentTimeMillis(); System.out.println("耗費的時間為: " + (end - start)); //34-3174-3132-4227-4223-31583 } @Test public void test3(){ long start = System.currentTimeMillis(); Long sum = LongStream.rangeClosed(0L, 10000000000L) .parallel() .sum(); System.out.println(sum); long end = System.currentTimeMillis(); System.out.println("耗費的時間為: " + (end - start)); //2061-2053-2086-18926 } }
import java.util.concurrent.RecursiveTask; public class ForkJoinCalculate extends RecursiveTask<Long>{ /** * */ private static final long serialVersionUID = 13475679780L; private long start; private long end; private static final long THRESHOLD = 10000L; //臨界值 public ForkJoinCalculate(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { long length = end - start; if(length <= THRESHOLD){ long sum = 0; for (long i = start; i <= end; i++) { sum += i; } return sum; }else{ long middle = (start + end) / 2; ForkJoinCalculate left = new ForkJoinCalculate(start, middle); left.fork(); //拆分,並將該子任務壓入執行緒佇列 ForkJoinCalculate right = new ForkJoinCalculate(middle+1, end); right.fork(); return left.join() + right.join(); } } }
我們觀察上面可以看出來執行10000000000L的相加操作各自執行完畢的時間不同。觀察到當資料很大的時候ForkJoin比普通執行緒實現有效的多,但是相比之下ForkJoin的實現實在是有點麻煩,這時候Java 8 就為我們提供了一個並行流來實現ForkJoin實現的功能。可以看到並行流比自己實現ForkJoin還要快
Java 8 中將並行流進行了優化,我們可以很容易的對資料進行並行流的操作,Stream API可以宣告性的通過parallel()與sequential()在並行流與穿行流中隨意切換!