1. 程式人生 > >並行流與序列流 Fork/Join框架

並行流與序列流 Fork/Join框架

一、並行流概念:

  並行流就是把一個內容分成多個數據塊,並用不同的執行緒分別處理每個資料塊的流。

  java8中將並行進行了優化,我們可以很容易的對資料進行並行操作。Stream API可以宣告性的通過parallel()與sequential()在並行流與順序流之間進行切換。

二、Fork/Join 框架

  就是在必要的情況下,將一個大任務,進行拆分(fork)成若干個小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行 join 彙總。

  

  Fork/Join框架與傳統執行緒池的區別:

  採用 “工作竊取”模式(work-stealing):當執行新的任務時它可以將其拆分分成更小的任務執行,並將小任務加到執行緒佇列中,然後再從一個隨機執行緒的佇列中偷一個並把它放在自己的佇列中。

  相對於一般的執行緒池實現,fork/join框架的優勢體現在對其中包含的任務的處理方式上.在一般的執行緒池中,如果一個執行緒正在執行的任務由於某些原因無法繼續執行,那麼該執行緒會處於等待狀態,

  而在fork/join框架實現中,如果某個子問題由於等待另外一個子問題的完成而無法繼續執行.那麼處理該子問題的執行緒會主動尋找其他尚未執行的子問題來執行.這種方式減少了執行緒的等待時間,提高了效能。

  Fork/Join實現例子

  1、使用傳統forkJoin實現 

//計算從start-end之和
public class ForkJoinCalculate extends RecursiveTask<Long>{

    /**
     * 
     */
    private static final long serialVersionUID = 13475679780L;
    
    private long start;
    private long end;
    
    private static final long THRESHOLD = 10000L; //臨界值
    
    public ForkJoinCalculate(long start, long end) {
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Long compute() {
        long length = end - start;
        
        if(length <= THRESHOLD){
            long sum = 0;
            
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            
            return sum;
        }else{
            long middle = (start + end) / 2;
            
            ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
            left.fork(); //拆分,並將該子任務壓入執行緒佇列
            
            ForkJoinCalculate right = new ForkJoinCalculate(middle+1, end);
            right.fork();
            //彙總
            return left.join() + right.join();
        }
        
    }

}
    public void test1(){
        long start = System.currentTimeMillis();
        
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L);
        
        long sum = pool.invoke(task);
        System.out.println(sum);
        
        long end = System.currentTimeMillis();
        
        System.out.println("耗費的時間為: " + (end - start)); //112-1953-1988-2654-2647-20663-113808
    }

   2、使用java8並行流實現

@Test
    public void test3(){
        long start = System.currentTimeMillis();
        
        Long sum = LongStream.rangeClosed(0L, 10000000000L)
                             .parallel()
                             .sum();
        
        System.out.println(sum);
        
        long end = System.currentTimeMillis();
        
        System.out.println("耗費的時間為: " + (end - start)); //2061-2053-2086-18926
    }