1. 程式人生 > >Java8--Stream 並行流詳解

Java8--Stream 並行流詳解

簡介

並行流就是把一個內容分成多個數據塊,並用不同的執行緒分別處理每個資料塊的流。序列流則相反,並行流的底層其實就是ForkJoin框架的一個實現。 java.util.Collection < E >新添加了兩個預設方法

  • default Stream stream() : 返回序列流
  • default Stream parallelStream() : 返回並行流

將一個並行流轉成順序的流只要呼叫sequential()方法 stream.parallel() .filter(…) .sequential() .map(…) .parallel() .reduce();

這兩個方法可以多次呼叫, 只有最後一個呼叫決定這個流是順序的還是併發的。 想要明白並行流,那麼就必須瞭解ForkJion框架

Fork/Join框架介紹

Fork/Join框架時java7提供的一個用於並行執行任務的框架:就是在必要的情況下,將一個大任務,進行拆分成若干個小任務(拆到不可再拆時),再將一個個小的任務運算的結果進行jion彙總。 在這裡插入圖片描述

注意:Fork/Jion框架使用的預設執行緒數等於你機器的處理器核心數 通過這個方法可以修改這個值,而且這個還是全域性屬性,不過建議一般不修改 System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”);

工作竊取模式

Fork/Join框架它所使用的執行緒模式----工作竊取模式

。每個執行緒都會為分配給它的任務儲存一個雙向鏈式佇列,每完成一個任務,就會從佇列頭上取出下一個任務開始執行。基於種種原因,某個執行緒可能很早就完成了分配給它的任務,而其他的執行緒還未完成,那麼這個執行緒就會,隨機選一個執行緒,從佇列的尾巴上“偷走一個任務”。這個過程一直繼續下去,直到所有的任務都執行完畢,所有的佇列都清空。 在這裡插入圖片描述

使用Fork/Join框架

  1. 首先定義一個類,去繼承RecursiveTask或者RecursiveAction
public class ForkJionCalculate extends RecursiveTask<Long> implements Serializable {
    private static final long serialVersionUID = 2462375556031755900L;
    
    @Override
    protected Long compute() {
        return null;
    }
}
-------------------------------------------------
public class ForkJionCalculate extends RecursiveAction implements Serializable {
    private static final long serialVersionUID = 2462375556031755900L;


    @Override
    protected void compute() {
        
    }
}

從上面的程式碼中我們可以看出RecursiveTaskRecursiveAction都有一個抽象方法compute(),只是RecursiveTask有返回值,RecursiveAction沒有返回值,類時於Runnable和Callable 下面以RecursiveTask為例子

public class ForkJionCalculate extends RecursiveTask<Long> implements Serializable {
    private static final long serialVersionUID = 2462375556031755900L;


    private long start;

    private long end;

    private static final long THRESHOLD = 10000L;//臨界值

    public ForkJionCalculate(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end -start;
        if(length<=THRESHOLD){
            long sum =0;
            for (long i = start; i <=end ; i++) {
                sum+=i;
            }
            return sum;
        }else {
            long middle =(start+end)/2;
            ForkJionCalculate left = new ForkJionCalculate(start, middle);
            left.fork();//拆分,並將該子任務壓入執行緒佇列
            ForkJionCalculate right = new ForkJionCalculate(middle + 1, end);
            right.fork();
            return left.join()+right.join();
        }
    }
}

接下來測試一下,要執行一個ForkJoin的任務,首先建一個執行緒池ForkJoinPool,這個跟我們的普通的執行緒池使用上很像,因為它們的祖先都是ExecutorService

 @org.junit.Test
    public void test(){
        Instant start = Instant.now();

        ForkJoinPool pool = new ForkJoinPool();
        ForkJionCalculate task = new ForkJionCalculate(0, 10000000000L);
        Long sum = pool.invoke(task);
        System.out.println(sum);
        Instant end = Instant.now();
        System.out.println("耗費時間"+Duration.between(start,end).toMillis());//2046
    }

然後,我們用傳統的迴圈來比較

@org.junit.Test
    public void test1(){
        Instant start = Instant.now();
        long sum = 0L;
        for (long i = 0L; i < 10000000000L; i++) {
            sum+=i;
        }
        System.out.println(sum);
        Instant end = Instant.now();
        System.out.println("耗費時間"+Duration.between(start,end).toMillis());//3561
    }

可以看出ForkJoin的效率高很多

注意:使用ForkJoin時,任務的量一定要大,否則太小,由於任務拆分也會消耗時間,它執行的效率不一定比for迴圈高

在這裡插入圖片描述

最後我們用Stream的並行流來測試(內部實現的是ForkJoin)

@org.junit.Test
    public void test3(){
        Instant start = Instant.now();
        long sum = LongStream.rangeClosed(0, 10000000000L).parallel().sum();
        System.out.println(sum);
        Instant end = Instant.now();
        System.out.println("耗費時間"+Duration.between(start,end).toMillis());//1421
    }

可以看出也是比傳統的要快

注意:上面程式碼上我們使用的是LongStream來生成資料 ( rangeClosed:需要傳入開始節點和結束節點兩個引數,返回的是一個有序的LongStream。包含開始節點和結束節點兩個引數之間所有的引數,間隔為1 range:同理 區別就是rangeClosed包含最後的結束節點,range不包含。) 接下來我們使用迭代的方式生成

 @org.junit.Test
    public void test4(){
        Instant start = Instant.now();
        Long reduce = Stream.iterate(1L, x -> x + 1).limit(10000000000L).parallel().reduce(0L, Long::sum);
        System.out.println(reduce);
        Instant end = Instant.now();
        System.out.println("耗費時間"+Duration.between(start,end).toMillis());
    }

直接報錯 超出記憶體(我16G) 所以在使用並行流的時候需要注意:

  1. 留意裝箱。自動裝箱和拆箱操作會大大降低效能。Java8中有原始型別流(IntStream、LongStream、DoubleStream)來避免這種操作,但凡有可能都應該使用這些原始流。
  2. 有些操作本身在並行流上的效能就比順序流差。特別是limit、findFirst等依賴於元素順序的操作,它們在並行流上執行的代價非常大。例如,findAny會比findFirst效能好,因為它不一定要順序來執行。你總是可以呼叫unordered方法來把順序流變成無須流。那麼,如果你需要流中的n個元素而不是專門的前n個的話,對無序並行流呼叫limit可能會比單個有序流(比如資料來源是List)更高效。
  3. 還要考慮流的操作流水線的總計算成本。設N是要處理的元素的總數,Q是一個元素通過流水線的大致處理成本,則N*Q就是這個對成本的一個粗略的定型估計。Q值較高就意味著使用並行流時效能好的可能性比較大。
  4. 對於較小的資料量,選擇並行流幾乎從來不都是一個好的選擇。並行處理少數幾個元素的好處還抵不上並行化造成的額外開銷。
  5. 要考慮流背後的資料結構是否易於分解。例如,ArrayList的拆分效率比LinkedList高很多,因為前者用不著遍歷就可以平均拆分,而後者則必須遍歷。另外,用range工廠方法建立的原始型別流也可以快速分解。最後,你可以自己實現Spliterator來完全掌握分解過程。
  6. 流自身的特點,以及流水線中的中間操作修改流的方式,都可能會改變分解過程的效能。例如,一個SIZED流可以分成大小相等的兩部分,這樣每個部分都可以比較高效地並行處理,但篩選操作可能丟棄的元素個數卻無法預測,導致流本身的大小未知。
  7. 還要考慮終端操作中合併步驟的代價是大是小(例如Collector中的combiner方法)。如果這一步代價很大,那麼組合每個子流產生的部分結果所付出的代價就可能會超出通過並行流得到的效能提升。