1. 程式人生 > >提交訂單效能優化系列之006-普通的Thread多執行緒改為Java8的parallelStream併發流

提交訂單效能優化系列之006-普通的Thread多執行緒改為Java8的parallelStream併發流

概括總結

Java8的parallelStream併發流能達到跟多執行緒類似的效果,但它也不是什麼善茬,為了得到跟上一版本的多執行緒類似的效果,一改再改,雖然最後改出來了,但是還是存在理解不了的地方。

006版本更新說明

  1. 上一版本中寫了多個測試類,每個類針對一個執行緒數量。寫這一版的時候覺得上一版本有點太傻了,於是花了點時間想了想辦法,發現確實可以在一個類中完成。也證明了上一版的寫法確實是傻。
  2. 把多執行緒(即new Thread().start())的方式改為了Java8的parallelStream

怎樣得到一個parallelStream

理論上,你需要先有一個List<?>

,任意型別的List都行,然後呼叫它的.parallelStream()方法就可以了。

對我這個例子來說,元素的型別不重要,因此選擇了Integer型別,核心程式碼如下:

AtomicInteger atomicInteger = new AtomicInteger(0);
return Arrays.asList(new Integer[size]).parallelStream().map(i -> atomicInteger.incrementAndGet());

值得注意的是,第一行用的是AtomicInteger而不是Integer,因為Integer會存在併發問題。

第二行的意思是:新建一個大小為size

的陣列,把陣列轉成List,再把List轉成parallelStream,再把列表中的元素初始化成遞增的整數,最後返回。

為什麼說parallelStream不是什麼善茬

簡單來說,我認為原因是:因為它的預設值適用的場景是CPU密集型的,而一般的Web專案是IO密集型的(一般的Web專案都是需要跟資料庫打交道的,針對資料庫的操作主要就都是IO,而對CPU的消耗並不高)。

當不能使用預設值的時候,就需要開發人員額外去了解parallelStream的用法,而這些資料還不是特別好找。比如說:parallelStream預設的併發執行緒數是多少?怎麼修改預設的執行緒數?

問題1: 在Java程式碼中,怎樣獲取可用的CPU處理器的數量?程式碼如下:(在我的機器上結果是:8)

Runtime.getRuntime().availableProcessors()

問題2: parallelStream預設的併發執行緒數是多少?程式碼如下:(在我的機器上結果是:7)

ForkJoinPool.getCommonPoolParallelism()

問題3: 為什麼parallelStream預設的併發執行緒數要比CPU處理器的數量少1個?因為最優的策略是每個CPU處理器分配一個執行緒,然而主執行緒也算一個執行緒,所以要佔一個名額。

問題4: 那如果電腦比較差,就只有1個CPU要怎麼辦?那就不管了,預設的併發執行緒數就是1,總不能為零吧。

問題5: 預設的併發執行緒數太少了,要怎麼修改?如程式碼如下:(改成了20)

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

問題6: 預設的併發執行緒數可以反覆修改嗎?不能。因為java.util.concurrent.ForkJoinPool.common.parallelismfinal型別的,整個JVM中只允許設定一次。

執行以下程式碼:

int[] threadCountArr = {32, 48, 72};
for (int threadCount : threadCountArr) {
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "" + threadCount);
    System.out.println("ForkJoinPool.getCommonPoolParallelism() : " + ForkJoinPool.getCommonPoolParallelism());
}

列印結果如下:

ForkJoinPool.getCommonPoolParallelism() : 32
ForkJoinPool.getCommonPoolParallelism() : 32
ForkJoinPool.getCommonPoolParallelism() : 32

問題7: 既然預設的併發執行緒數不能反覆修改,那怎麼進行不同執行緒數量的併發測試呢?答案是:引入ForkJoinPool。用法如下:

new ForkJoinPool(threadCount).submit(() -> {
    parallelStream.forEach(i -> {
        // 這裡省略提交訂單的程式碼
    });
}).get();

問題8: java.util.concurrent.ForkJoinPool.common.parallelismnew ForkJoinPool(threadCount)之間有什麼關係?答案是:不知道。

這個答案很讓人失望,但是我確實沒有查出來。我這邊測試的結果是:

  • 如果在new ForkJoinPool(threadCount)之前沒有設定java.util.concurrent.ForkJoinPool.common.parallelism的值,那麼new ForkJoinPool(threadCount)的作用就不明顯,即就是說,改變threadCount的值對效能沒有多大影響。

  • 如果在之前設定了java.util.concurrent.ForkJoinPool.common.parallelism的值,但是設定得比較小(比如32),則後續的new ForkJoinPool(threadCount)的作用也不明顯。

  • 只有先把java.util.concurrent.ForkJoinPool.common.parallelism的值設定得比較大(比如10000),後續的new ForkJoinPool(threadCount)threadCount改變之後,才對效能有明顯的影響。

問題9: 如果按問題8中的來修改,把java.util.concurrent.ForkJoinPool.common.parallelism的值設定得比較大(比如10000),就意味不再適用於CPU密集型的操作了,那應該怎麼辦呢?答案是:每次都用new ForkJoinPool(threadCount),整體放棄使用預設的parallelStream。(那多麻煩啊)

你看,隨隨便便就有這麼多問題,頓時就不想用了,是不是。

“併發執行緒數量”與“每秒能提交的訂單數量”之間的關係

這次測試的結果與上次測試的結果對比圖如下:(紅色為上一版本使用Thread測試的結果,黑色為這一版本使用parallelStream測試的結果)

Snipaste_20180922_143610png

可以看到差別並不大。

【備註】:不同的機器上的測試結果會不一樣,以上測試結果僅供參考。

原始碼