1. 程式人生 > >java執行緒池,工作竊取演算法

java執行緒池,工作竊取演算法

前言

在上一篇《java執行緒池,阿里為什麼不允許使用Executors?》中我們談及了執行緒池,同時又發現一個現象,當最大執行緒數還沒有滿的時候耗時的任務全部堆積給了單個執行緒, 程式碼如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, //corePoolSize
        100, //maximumPoolSize
        100, //keepAliveTime
        TimeUnit.SECONDS, //unit
        new LinkedBlockingDeque<>(100));//workQueue

for (int i = 0; i < 5; i++) {
    final int taskIndex = i;
    executor.execute(() -> {
        System.out.println(taskIndex);
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}
// 輸出: 0

下圖很形象的說明了這個問題:

那麼有沒有一種機制,線上程池中還有執行緒可以提供服務的時候幫忙分擔一些已經被分配給某一個執行緒的耗時任務呢?
答案當然是有的:工作竊取演算法

工作竊取 (Work stealing)

這邊大家先不要將這個跟java掛鉤,因為這個屬於演算法,一種思想和套路,並不是特定語言特有的東西,所以不同的語言對應的實現也不盡一樣,但核心思想一致。
這邊會用“工作者”來代替執行緒的說法,如果在java中這個工作者就是執行緒。

工作竊取核心思想是,自己的活幹完了去看看別人有沒有沒幹完的活,如果有就拿過來幫他幹。
大多數實現機制是:為每個工作者程分配一個雙端佇列(本地佇列)用於存放需要執行的任務,當自己的佇列沒有資料的時候從其它工作者佇列中獲得一個任務繼續執行。

我們來看一張圖,這張圖是發生了工作竊取時的狀態。

可以看到工作者B的本地佇列中沒有了需要執行的規則,它正嘗試從工作者A的任務佇列中偷取一個任務。

為什麼說嘗試?因為涉及到並行程式設計肯定涉及到併發安全的問題,有可能在偷取過程中工作者A提前搶佔了這個任務,那麼B的偷取就會失敗。大多數實現會盡量避免發生這個問題,所以大多數情況下不會發生。

併發安全的問題是怎麼避免的呢?

一般是自己的本地佇列採取LIFO(後進先出),偷取時採用FIFO(先進先出),一個從頭開始執行,一個從尾部開始執行,由於偷取的動作十分快速,會大量降低這種衝突,也是一種優化方式。

Java中的工作竊取演算法執行緒池

在Java 1.7新增了一個ForkJoinPool類,主要是實現了工作竊取演算法的執行緒池,該類在1.8中被優化了,同時1.8在Executors類中還新增了兩個newWorkStealingPool工廠方法。

java7中的fork/join task 和 java8中的並行stream都是基於ForkJoinPool。

// 使用當前處理器數, 相當於呼叫 newWorkStealingPool(Runtime.getRuntime().availableProcessors());
public static ExecutorService newWorkStealingPool();
public static ExecutorService newWorkStealingPool(int parallelism);

同時 ForkJoinPool 還在全域性建立了一個公共的執行緒池

ForkJoinPool.commonPool();

預設的並行度是當前JVM識別到的處理器數。這個值也是可以通過引數進行變更的,下面是可以通過JVM熟悉進行commonPool設定的引數。

字首統一為: java.util.concurrent.ForkJoinPool.common.
比如 parallelism 就要寫為 java.util.concurrent.ForkJoinPool.common.parallelism

引數 描述 預設值
parallelism 並行級別 JVM識別到的處理器數
threadFactory 執行緒工廠類名 ForkJoinPool.DefaultForkJoinWorkerThreadFactory
exceptionHandler 錯誤處理程式 null
maximumSpares 最大允許額外執行緒數 256

使用工作竊取演算法的執行緒池來優化之前的程式碼

ExecutorService executor = Executors.newWorkStealingPool(8);

for (int i = 0; i < 5; i++) {
    final int taskIndex = i;
    executor.execute(() -> {
        System.out.println(taskIndex);
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

// 無序輸出 0~4

如果將Executors.newWorkStealingPool(8)改成ForkJoinPool.commonPool()會輸出什麼?

如果你能知道輸出什麼那麼你對這個機制就算掌握了,會輸出當前執行環境中處理器(cpu)數量的次數(如果核算大於5就只會輸出5個結果)。

newWorkStealingPool 和 ForkJoinPool.commonPool 該優先選擇哪個?

這個沒有最優解,推薦執行的小任務(零散的)使用commonPool,而有特定目的的則使用newWorkStealingPoolnew ForkJoinPool

使用ForkJoinPool.commonPool 需要注意的問題

commonPool預設使用當前環境的處理器格式來當做並行程度,如果遇上堵塞形任務一樣會遇到浪費算力的問題。
這點在容器化時需要特別注意,因為容器化的cpu個數限制往往不會太大。
這種時候可以通過設定預設的並行度或者使用newWorkStealingPool來手動指定並行度。

最後

為什麼ForkJoinPool極少出現執行緒關鍵字?

現在許多語言淡化了執行緒這個概念,而golang中更是直接去掉了執行緒能力改為提供協程goroutine
目的還是執行緒是OS的資源,OS對程式內部執行其實並沒有太瞭解,為了避免執行緒資源的浪費許多語言會自己管理執行緒。
對於程式來說我們關心的主要還是任務的並行執行,並不關心是執行緒還是協程。
下面是一些對應關係:

  • CPU : 執行緒 (1:N)
  • 執行緒 : 協程 (1:N)

CPU由OS管理,OS提供執行緒給程式使用,程式利用執行緒提供協程能力給應用使用。

ForkJoinPool一定更快嗎?

不,大家都知道做的事情越多邏輯越複雜效率會越低。
ForkJoinPool中的工作佇列,工作竊取都是需要額外管理的,同時也對執行緒排程和GC帶來了壓力。
所以ForkJoinPool並不是萬能藥大家根據具體需要去使用。

後面可能會跟大家分享下 Spring 中的 @Async