1. 程式人生 > >java8-並行流

java8-並行流

前言:

很久沒有寫部落格了,csdn的編輯器已經變成了markdown,不知道是否還有當年輸入Tab變沒的問題,我想應該已經解決了,一段時間沒有寫部落格,大概一年吧,說實話我並沒有什麼負罪敢,因為寫部落格就是分享知識,讓人快樂,而且我絕對有時候人需要沉澱,而不管什麼都會有一個起伏,起伏是正常的,如果一直不變化,不品味,我絕的也不會有什麼變化,最近真的是想寫部落格了,所以我回來了,帶著新的心態,帶著新的心情,用著新的markdown,總是要去接受一些新的東西呢。

吐槽:用markdown真的很不爽。。。總之最後再改格式吧

這次我要寫的是java8,有可能會寫成一個系列,這次簡單的說下java8對集合的操作中用並行流加快對資料操作的速度。

使用

在使用流的時候,可以用parallel將預設的順序流變成並行流,可以用sequential將並行流轉換為順序流,當並行流的時候所有操作按照並行的方式,而順序流就是順序的方式,並不是所有的操作

 stream. parallel ( ) 
         filter(...) 
         sequential ( ) 
         map (...) 
         parallel ( ) 
         reduce() ; 

並行流是由ForkJoin 框架實現的就是java7中的ForkJoin ,預設的執行緒數與CPU的核數一樣,一般情況下這是最好的選擇
但 是 你 可 以 通 過 系 統 屬 性 java.util.concurrent.ForkJoinPool.common. parallelism來改變執行緒池大小,如下所示:
System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,”12”);

原理

如果說順序流是我們買的一袋子奧利奧,吃的時候是一個人一塊一塊拿出來吃,那麼並行流就是把奧利奧都拿出來,分成N堆,多個人一起吃,總之核心思想就是將一個問題,分組解決,然後再合併。

下面就是把從1到8相加得計的並行流分組
分組

並行流並不一定是最快的,有些情況順序流比並行流快,打個比方,有100個奧利奧,一個人吃和10個人吃哪個快,當然是10個人的快,但是如果有5個奧利奧,一個人吃和5個人吃哪個快,一個人快,因為分奧利奧還要時間,奧利奧還沒分完,一個人就已經吃完了。

程式碼

 import java.util.stream.LongStream;
import java.util.stream.Stream;

public
class ParallelStreams { //傳統方式 public static long iterativeSum(long n) { long result = 0; for (long i = 1L; i <= n; i++) { result += i; } return result; } //順序流 public static long sequentialSum(long n) { //生成無限長度的流Stream.iterate return Stream.iterate(1L, i -> i + 1) .limit(n) .reduce(0L, Long::sum); } //並行流 public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() .reduce(0L, Long::sum); } //順序流---改用LongStream.rangeClosed public static long rangedSum(long n) { return LongStream.rangeClosed(1, n) .reduce(0L, Long::sum); } //並行流---改用LongStream.rangeClosed public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n) .parallel() .reduce(0L, Long::sum); } //順序 public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total; } //並行 public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; } }
public class Accumulator {
    public long total = 0;

    public void add(long value) {
        total += value;
    }
}
import java.util.function.Function;

public class parallel {
    public static void main(String[] args) {


        System.out.println("for迴圈  Iterative sum done in:" +
                measureSumPerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");
        System.out.println("循序流 Sequential sum done in:" +
                measureSumPerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");
        System.out.println("並行流 Parallel sum done in: " +
                measureSumPerf(ParallelStreams::parallelSum, 10_000_000) + " msecs" );
        //結論-it瑞特
        // iterate生成的是裝箱的物件,必須拆箱成數字才能求和;
        // 我們很難把iterate分成多個獨立塊來並行執行。


        //改用--LongStream.rangeClosed(拆裝箱影響速度問題)
        //LongStream.rangeClosed直接產生原始型別的long數字,沒有裝箱拆箱的開銷。
        //LongStream.rangeClosed會生成數字範圍,很容易拆分為獨立的小塊。例如,範圍1~20
        System.out.println("for迴圈  Iterative sum done in:" +
                measureSumPerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");
        System.out.println("順序流 Sequential sum done in:" +
                measureSumPerf(ParallelStreams::rangedSum, 10_000_000) + " msecs");
        System.out.println("並行流 Parallel sum done in: " +
                measureSumPerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs" );

        //原子性影響正確結果問題
        System.out.println("飛並行 parallel sum done in: " +
                measureSumPerf(ParallelStreams::sideEffectSum, 10_000_000L) +" msecs" );
        System.out.println("並行 parallel sum done in: " +
                measureSumPerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) +" msecs" );



    }

    //迴圈10次取最小一個的執行時間
    public static long measureSumPerf(Function<Long, Long> adder, long n) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            long sum = adder.apply(n);
            //納秒
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + sum);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }

}

上面的程式碼跑起來都有對比,一共3項總結為
1 裝箱導致並行流執行效率慢,所以避免拆裝箱
2 LongStream.rangeClosed直接產生原始型別的long數字,沒有裝箱拆箱的開銷
3 操作一定要是原子性的!否則會有多執行緒問題

注意事項

1 如果有疑問,測量。把順序流轉成並行流輕而易舉,但卻不一定是好事。我們在本節中 已經指出,並行流並不總是比順序流快。此外,並行流有時候會和你的直覺不一致,所 以在考慮選擇順序流還是並行流時,第一個也是最重要的建議就是用適當的基準來檢查 其效能。
2 留意裝箱。自動裝箱和拆箱操作會大大降低效能。Java 8中有原始型別流(IntStream、 LongStream、DoubleStream)來避免這種操作,但凡有可能都應該用這些流。
3 有些操作本身在並行流上的效能就比順序流差。特別是limit和findFirst等依賴於元 素順序的操作,它們在並行流上執行的代價非常大。例如,findAny會比findFirst性 能好,因為它不一定要按順序來執行。你總是可以呼叫unordered方法來把有序流變成 無序流。那麼,如果你需要流中的n個元素而不是專門要前n個的話,對無序並行流呼叫 limit可能會比單個有序流(比如資料來源是一個List)更高效。
4 還要考慮流的操作流水線的總計算成本。設N是要處理的元素的總數,Q是一個元素通過 流水線的大致處理成本,則N*Q就是這個對成本的一個粗略的定性估計。Q值較高就意味 著使用並行流時效能好的可能性比較大。
5 對於較小的資料量,選擇並行流幾乎從來都不是一個好的決定。並行處理少數幾個元素 的好處還抵不上並行化造成的額外開銷。
6 要考慮流背後的資料結構是否易於分解。例如,ArrayList的拆分效率比LinkedList 高得多,因為前者用不著遍歷就可以平均拆分,而後者則必須遍歷。另外,用range工廠
7 流自身的特點,以及流水線中的中間操作修改流的方式,都可能會改變分解過程的效能。
理,但篩選操作可能丟棄的元素個數卻無法預測,導致流本身的大小未知。
還要考慮終端操作中合併步驟的代價是大是小(例如Collector中的combiner方法)。 如果這一步代價很大,那麼組合每個子流產生的部分結果所付出的代價就可能會超出通 過並行流得到的效能提升。

可拆分性質

下面是各個集合的可拆分性,可拆分性其實就是可被分組的能力,奧利奧容易被分組,蛋糕就難一點,要切開,糖就更難分組了,因為集合的資料結構不一樣,有的是散開無序,有的有序,有的鏈式,所以可分性不一樣,可分性不好的要花更多的時間拆分。
這裡寫圖片描述

總結:

其實這次的我早就寫到了onenote上,剛剛寫又加了自己說的一點廢話,總之markdown真的好難用。。。還需要多多學習呢 = =