1. 程式人生 > 其它 >生產問題之CompletableFuture預設執行緒池踩坑,請務必自定義執行緒池

生產問題之CompletableFuture預設執行緒池踩坑,請務必自定義執行緒池

前言

先說結論,沒興趣瞭解原因的可以只看此處的結論

CompletableFuture是否使用預設執行緒池的依據,和機器的CPU核心數有關。當CPU核心數-1大於1時,才會使用預設的執行緒池,否則將會為每個CompletableFuture的任務建立一個新執行緒去執行

即,CompletableFuture的預設執行緒池,只有在雙核以上的機器內才會使用。在雙核及以下的機器中,會為每個任務建立一個新執行緒,等於沒有使用執行緒池,且有資源耗盡的風險

因此建議,在使用CompletableFuture時,務必要自定義執行緒池。因為即便是用到了預設執行緒池,池內的核心執行緒數,也為機器核心數-1。也就意味著假設你是4核機器,那最多也只有3個核心執行緒,對於CPU密集型的任務來說倒還好,但是我們平常寫業務程式碼,更多的是IO密集型任務,對於IO密集型的任務來說,這其實遠遠不夠用的,會導致大量的IO任務在等待

,導致吞吐率大幅度下降,即預設執行緒池比較適用於CPU密集型任務。

背景

最近接到一個工作任務,由於我們之前的下單介面速度過慢,光是下單介面需要1500ms左右,因此需要做一些優化,在梳理完業務邏輯後,發現有一些可以並行查詢或者非同步執行的地方。於是打算採用CompletableFuture來做非同步優化,提高執行速度。程式碼示例如下

 			//查詢使用者資訊
            CompletableFuture<JSONObject> userInfoFuture = CompletableFuture
                .supplyAsync(() -> proMemberService.queryUserByOpenIdInner(ordOrder.getOpenId()));
            //查詢積分商品資訊
            CompletableFuture<JSONObject> integralProInfoFuture = CompletableFuture
                .supplyAsync(() -> proInfoService
                    .getIntegralInfoCache(ordOrderIntegral.getProId()));

            //查詢會員積分資訊
            CompletableFuture<Integer> integerFuture = CompletableFuture
                .supplyAsync(() -> proMemberService
                    .getTotalIntegralByOpenId(ordOrder.getOpenId()));
          

經過

於是一頓操作,優化完畢,執行速度從1500ms下降到300ms左右,在經過本地和測試環境後,上線生產。眾所周知,CompletableFuture在沒有指定執行緒池的時候,會使用一個預設的ForkJoinPool執行緒池,也就是下面這個玩意。

 	public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common;
    }

等發了生產之後看日誌列印的執行緒號,卻發現了一個極其詭異的事情。明明是同一套程式碼,生產環境的沒有用到預設的執行緒池。而測試環境和本地環境都使用了預設的ForkJoinPool執行緒池

這是測試和本地環境列印的執行緒日誌

這是生產環境列印的執行緒日誌

從日誌列印的執行緒編號可以看到,測試和本地環境都是從ForkJoinPool中取工作執行緒,但是生產環境卻是為每個任務建立了一個全新的執行緒。這是一個很危險的行為,假如這是一個併發比較高的介面,並且該介面使用了比較多的CompletableFuture來並行的執行任務。在高併發的時候,為每個任務都建立一個子執行緒,就會存線上程資源被耗盡的可能性,從而導致伺服器崩潰。

那這是為什麼呢?明明是同一套程式碼,在不同的機器上卻有不同的執行緒使用情況。

原因

在帶著疑問翻閱了CompletableFuture的原始碼之後,終於找到了原因:【是否使用預設的ForkJoinPool執行緒池,和機器的配置有關】

我們點進supplyAsync方法的原始碼

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    

可以看到這裡使用了預設使用了一個asyncPool,點進這個asyncPool

  //是否使用預設執行緒池的判斷依據
private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
//useCommonPool的來源
 private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

其實程式碼看到這裡就很清晰了,CompletableFuture是否使用預設執行緒池,是根據這個useCommonPool的boolean值來的,如果為true,就使用預設的ForkJoinPool,否則就為每個任務建立一個新執行緒,也就是這個ThreadPerTaskExecutor,見名知義。

那這個useCommonPool的布林值什麼情況下才為true,也就是什麼時候才能使用到預設的執行緒池呢。即getCommonPoolParallelism()返回的值要大於1,我們繼續跟進這個getCommonPoolParallelism()方法

//類頂SMASK常量的值
static final int SMASK  = 0xffff;   
final int config;
static final ForkJoinPool common;

//該方法返回了一個commonParallelism的值
public static int getCommonPoolParallelism() {
        return commonParallelism;
    }


    //而commonParallelism的值是在一個靜態程式碼塊裡被初始化的,也就是類載入的時候初始化
static {
    	//初始化common,這個common即ForkJoinPool自身
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
    //根據par的值來初始化commonParallelism的值
        int par = common.config & SMASK; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }

總結一下上面三部分程式碼,結合在一起看,這部分程式碼主要是初始化了commonParallelism的值,也就是getCommonPoolParallelism()方法的返回值,這個返回值也決定了是否使用預設執行緒池。而commonParallelism的值又是通過par的值來確定的,par的值是common來確定的,而common則是在makeCommonPool()這個方法中初始化的。

我們繼續跟進makeCommonPool()方法

private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
       
        if (parallelism < 0 && // default 1 less than #cores
            //獲取機器的cpu核心數 將機器的核心數-1 賦值給parallelism 這一段是是否使用執行緒池的關鍵
            //同時 parallelism也是ForkJoinPool的核心執行緒數
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

//上面的那個構造方法,可以看到把parallelism賦值給了config變數
private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

總結一下上面兩段程式碼,獲取機器核心數-1的值,賦值給parallelism變數,再通過構造方法把parallelism的值賦值給config變數。

然後初始化ForkJoinPool的時候。再將config的值賦值給par變數。如果par大於0則將par的值賦給commonParallelism,如果commonParallelism的值大於1的話,useCommonPool的值就為true,就使用預設的執行緒池,否則就為每個任務建立一個新執行緒。另外即便是用到了預設執行緒池,池內的核心執行緒數,也為機器核心數-1。也就意味著假設你是4核機器,那最多也只有3個核心執行緒,對於IO密集型的任務來說,這其實遠遠不夠的

解釋

以上就是CompletableFuture中預設執行緒池使用依據的原始碼分析了。看完這一系列原始碼,就能解釋文章一開頭出現的那個問題。

因為我本地和測試環境機器的核心數是4核的,4減1大於1,所以在本地和測試環境的日誌上可以看出,使用了預設的執行緒池ForkJoinPool,而我們生產環境是雙核的機器。2減1不大於1,所以從生產環境的日誌看出,是為每個任務都建立了一個新執行緒。

總結

  • 使用CompletableFuture一定要自定義執行緒池
  • CompletableFuture是否使用預設執行緒池和機器核心數有關,當核心數減1大於1時才會使用預設執行緒池,否則將為每個任務建立一個新執行緒去處理
  • 即便使用到了預設執行緒池,池內最大執行緒數也是核心數減1,對io密集型任務是遠遠不夠的,會令大量任務等待,降低吞吐率
  • ForkJoinPool比較適用於CPU密集型的任務,比如說計算。