1. 程式人生 > >《Java 8 in Action》Chapter 7:並行資料處理與效能

《Java 8 in Action》Chapter 7:並行資料處理與效能

在Java 7之前,並行處理資料集合非常麻煩。第一,你得明確地把包含資料的資料結構分成若干子部分。第二,你要給每個子部分分配一個獨立的執行緒。第三,你需要在恰當的時候對它們進行同步來避免不希望出現的競爭條件,等待所有執行緒完成,最後把這些部分結果合併起來。Java 7引入了一個叫作分支/合併的框架,讓這些操作更穩定、更不易出錯。
Stream介面讓你不用太費力氣就能對資料集執行並行操作。它允許你宣告性地將順序流變為並行流。此外,你將看到Java是如何變戲法的,或者更實際地來說, 流是如何在幕後應用Java 7引入的分支/合併框架的。

1. 並行流

並行流就是一個把內容分成多個數據塊,並用不同的執行緒分別處理每個資料塊的流。

public static long sequentialSum(long n) {
             return Stream.iterate(1L, i -> i + 1)
                          .limit(n)
                          .reduce(0L, Long::sum);
}
傳統寫法:
public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 1L; i <= n; i++) {
            result += i;
        }
        return result;
}

1.1 將順序流轉換為並行流

可以把流轉換成並行流,從而讓前面的函式歸約過程(也就是求和)並行執行——對順序流呼叫parallel方法:

public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
                     .limit(n)
                     .parallel()
                     .reduce(0L, Long::sum);
}

在現實中,對順序流呼叫parallel方法並不意味著流本身有任何實際的變化。它在內部實際上就是設了一個boolean標誌,表示你想讓呼叫parallel之後進行的所有操作都並行執行。類似地,你只需要對並行流呼叫sequential方法就可以把它變成順序流。請注意,你可能以為把這兩個方法結合起來,就可以更細化地控制在遍歷流時哪些操作要並行執行,哪些要順序執行。

配置並行流使用的執行緒池
看看流的parallel方法,你可能會想,並行流用的執行緒是從哪來的?有多少個?怎麼自定義這個過程呢?
並行流內部使用了預設的ForkJoinPool,它預設的執行緒數量就是你的處理器數量,這個值是由Runtime.getRuntime().available- Processors()得到的。
但是你可以通過系統屬性 java.util.concurrent.ForkJoinPool.common.parallelism來改變執行緒池大小,如下所示:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
這是一個全域性設定,因此它將影響程式碼中所有的並行流。反過來說,目前還無法專為某個並行流指定這個值。一般而言,讓ForkJoinPool的大小等於處理器數量是個不錯的預設值,
除非你有很好的理由,否則我們強烈建議你不要修改它。

1.2 測量流效能

並行程式設計可能很複雜,有時候甚至有點違反直覺。如果用得不對(比如採用了一 個不易並行化的操作,如iterate),它甚至可能讓程式的整體效能更差,所以在呼叫那個看似神奇的parallel操作時,瞭解背後到底發生了什麼是很有必要的。
並行化並不是沒有代價的。並行化過程本身需要對流做遞迴劃分,把每個子流的歸納操作分配到不同的執行緒,然後把這些操作的結果合併成一個值。但在多個核心之間移動資料的代價也可能比你想的要大,所以很重要的一點是要保證在核心中並行執行工作的時間比在核心之間傳輸資料的時間長。總而言之,很多情況下不可能或不方便並行化。然而,在使用 並行Stream加速程式碼之前,你必須確保用得對;如果結果錯了,算得快就毫無意義了。

1.3 正確使用並行流

錯用並行流而產生錯誤的首要原因,就是使用的演算法改變了某些共享狀態。下面是另一種實現對前n個自然數求和的方法,但這會改變一個共享累加器:

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add)
    return accumulator.total;
}
public class Accumulator {
    public long total = 0;
    public void add(long value) { total += value; }
}

這段程式碼本身上就是順序的,因為每次訪問total都會出現資料競爭。接下來將這段程式碼改為並行:

public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;}
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) +" msecs" );
Result: 5959989000692
Result: 7425264100768
Result: 6827235020033
Result: 7192970417739
Result: 6714157975331
Result: 7715125932481
SideEffect parallel sum done in: 49 msecs

這回方法的效能無關緊要了,唯一要緊的是每次執行都會返回不同的結果,都離正確值50000005000000差很遠。這是由於多個執行緒在同時訪問累加器,執行total += value,而這一句