Java8新特性——並行流與順序流
阿新 • • 發佈:2019-01-06
並行流就是把一個內容分成多個數據塊,並用不同的執行緒分別處理每個資料塊的流。
Java8中將並行流進行了優化,我們很容易的對資料進行並行操作。Stream API可以宣告性地通過parallel()與scqucntial()在並行流與順序流之間進行切換。
Fork-Join框架:是Java7提供的一個用於執行任務的框架,就是在必要的情況下,將一個大任務,進行拆分(Fork)成若干個小任務(拆分到不能再拆分),再將一個個的小任務運算的結果進行Join彙總。
Fork-Join框架是ExecutorService介面的一種具體實現,目的是為了幫助更好的利用多處理器帶來的好處。它是為那些能被遞迴地拆分成子任務的工作型別量身設計的。其目的在於能夠使用所有有可用的運算能力來提升你的應用的效能。
Fork/Join 框架與傳統執行緒池的區別
採用 “工作竊取”模式(work-stealing): 當執行新的任務時它可以將其拆分分成更小的任務執行,並將小任務加到線 程佇列中,然後再從一個隨機執行緒的佇列中偷一個並把它放在自己的佇列中。
相對於一般的執行緒池實現,fork/join框架的優勢體現在對其中包含的任務的 處理方式上.在一般的執行緒池中,如果一個執行緒正在執行的任務由於某些原因 無法繼續執行,那麼該執行緒會處於等待狀態。而在fork/join框架實現中,如果 某個子問題由於等待另外一個子問題的完成而無法繼續執行.那麼處理該子 問題的執行緒會主動尋找其他尚未執行的子問題來執行。這種方式減少了執行緒 的等待時間, 高了效能。
package ParallelFlow; import java.util.concurrent.RecursiveTask; public class ParallelFlow extends RecursiveTask<Long>{ private long start; private long end; private static final long THRESHOLD = 10000;//臨界值 public ParallelFlow(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { long length = end - start; /*如果不到臨界值就執行加操作*/ if (length <= THRESHOLD){ long sum = 0; for (long i = start;i<=end;i++){ sum += i; } return sum; }else { long middle = (start + end) / 2; ParallelFlow left = new ParallelFlow(start, middle); left.fork();//拆分子任務,同時壓入執行緒佇列 ParallelFlow right = new ParallelFlow(middle + 1, end); right.fork(); return left.join() + right.join(); } } }
package ParallelFlow;
import org.junit.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;
public class TestForkJoin {
/*
ForkJoin框架
*/
@Test
public void fun1(){
Instant start = Instant.now();
ForkJoinPool pool = new ForkJoinPool();
ParallelFlow task = new ParallelFlow(0, 1000000000L);
Long sum = pool.invoke(task);
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗時時間為:" + Duration.between(start,end).toMillis());//耗時時間為:3873
}
/*
普通for
*/
@Test
public void fun2(){
Instant start = Instant.now();
long sum = 0L;
for (long i = 0;i<1000000000L;i++){
sum += i;
}
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗時時間為:" + Duration.between(start,end).toMillis());//耗時時間為:4167
}
/*
Java8並行流
*/
@Test
public void fun3(){
Instant start = Instant.now();
LongStream.rangeClosed(0,1000000000L)
.parallel()
.reduce(0,Long::sum);
Instant end = Instant.now();
System.out.println("耗時時間為:" + Duration.between(start,end).toMillis());//耗時時間為:3063
}
}