生產問題之CompletableFuture預設執行緒池踩坑,請務必自定義執行緒池
前言
先說結論,沒興趣瞭解原因的可以只看此處的結論
CompletableFuture是否使用預設執行緒池的依據,和機器的CPU核心數有關。當CPU核心數-1大於1時,才會使用預設的執行緒池,否則將會為每個CompletableFuture的任務建立一個新執行緒去執行。
即,CompletableFuture的預設執行緒池,只有在雙核以上的機器內才會使用。在雙核及以下的機器中,會為每個任務建立一個新執行緒,等於沒有使用執行緒池,且有資源耗盡的風險。
因此建議,在使用CompletableFuture時,務必要自定義執行緒池。因為即便是用到了預設執行緒池,池內的核心執行緒數,也為機器核心數-1。也就意味著假設你是4核機器,那最多也只有3個核心執行緒,對於CPU密集型的任務來說倒還好,但是我們平常寫業務程式碼,更多的是IO密集型任務,對於IO密集型的任務來說,這其實遠遠不夠用的,會導致大量的IO任務在等待
背景
最近接到一個工作任務,由於我們之前的下單介面速度過慢,光是下單介面需要1500ms左右,因此需要做一些優化,在梳理完業務邏輯後,發現有一些可以並行查詢或者非同步執行的地方。於是打算採用CompletableFuture來做非同步優化,提高執行速度。程式碼示例如下
//查詢使用者資訊 CompletableFuture<JSONObject> userInfoFuture = CompletableFuture .supplyAsync(() -> proMemberService.queryUserByOpenIdInner(ordOrder.getOpenId())); //查詢積分商品資訊 CompletableFuture<JSONObject> integralProInfoFuture = CompletableFuture .supplyAsync(() -> proInfoService .getIntegralInfoCache(ordOrderIntegral.getProId())); //查詢會員積分資訊 CompletableFuture<Integer> integerFuture = CompletableFuture .supplyAsync(() -> proMemberService .getTotalIntegralByOpenId(ordOrder.getOpenId()));
經過
於是一頓操作,優化完畢,執行速度從1500ms下降到300ms左右,在經過本地和測試環境後,上線生產。眾所周知,CompletableFuture在沒有指定執行緒池的時候,會使用一個預設的ForkJoinPool執行緒池,也就是下面這個玩意。
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
等發了生產之後看日誌列印的執行緒號,卻發現了一個極其詭異的事情。明明是同一套程式碼,生產環境的沒有用到預設的執行緒池。而測試環境和本地環境都使用了預設的ForkJoinPool執行緒池
這是測試和本地環境列印的執行緒日誌
這是生產環境列印的執行緒日誌
從日誌列印的執行緒編號可以看到,測試和本地環境都是從ForkJoinPool中取工作執行緒,但是生產環境卻是為每個任務建立了一個全新的執行緒。這是一個很危險的行為,假如這是一個併發比較高的介面,並且該介面使用了比較多的CompletableFuture來並行的執行任務。在高併發的時候,為每個任務都建立一個子執行緒,就會存線上程資源被耗盡的可能性,從而導致伺服器崩潰。
那這是為什麼呢?明明是同一套程式碼,在不同的機器上卻有不同的執行緒使用情況。
原因
在帶著疑問翻閱了CompletableFuture的原始碼之後,終於找到了原因:【是否使用預設的ForkJoinPool執行緒池,和機器的配置有關】
我們點進supplyAsync方法的原始碼
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
可以看到這裡使用了預設使用了一個asyncPool,點進這個asyncPool
//是否使用預設執行緒池的判斷依據
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
//useCommonPool的來源
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
其實程式碼看到這裡就很清晰了,CompletableFuture是否使用預設執行緒池,是根據這個useCommonPool的boolean值來的,如果為true,就使用預設的ForkJoinPool,否則就為每個任務建立一個新執行緒,也就是這個ThreadPerTaskExecutor,見名知義。
那這個useCommonPool的布林值什麼情況下才為true,也就是什麼時候才能使用到預設的執行緒池呢。即getCommonPoolParallelism()返回的值要大於1,我們繼續跟進這個getCommonPoolParallelism()方法
//類頂SMASK常量的值
static final int SMASK = 0xffff;
final int config;
static final ForkJoinPool common;
//該方法返回了一個commonParallelism的值
public static int getCommonPoolParallelism() {
return commonParallelism;
}
//而commonParallelism的值是在一個靜態程式碼塊裡被初始化的,也就是類載入的時候初始化
static {
//初始化common,這個common即ForkJoinPool自身
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
//根據par的值來初始化commonParallelism的值
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
總結一下上面三部分程式碼,結合在一起看,這部分程式碼主要是初始化了commonParallelism的值,也就是getCommonPoolParallelism()方法的返回值,這個返回值也決定了是否使用預設執行緒池。而commonParallelism的值又是通過par的值來確定的,par的值是common來確定的,而common則是在makeCommonPool()這個方法中初始化的。
我們繼續跟進makeCommonPool()方法
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
if (parallelism < 0 && // default 1 less than #cores
//獲取機器的cpu核心數 將機器的核心數-1 賦值給parallelism 這一段是是否使用執行緒池的關鍵
//同時 parallelism也是ForkJoinPool的核心執行緒數
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
//上面的那個構造方法,可以看到把parallelism賦值給了config變數
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
總結一下上面兩段程式碼,獲取機器核心數-1的值,賦值給parallelism變數,再通過構造方法把parallelism的值賦值給config變數。
然後初始化ForkJoinPool的時候。再將config的值賦值給par變數。如果par大於0則將par的值賦給commonParallelism,如果commonParallelism的值大於1的話,useCommonPool的值就為true,就使用預設的執行緒池,否則就為每個任務建立一個新執行緒。另外即便是用到了預設執行緒池,池內的核心執行緒數,也為機器核心數-1。也就意味著假設你是4核機器,那最多也只有3個核心執行緒,對於IO密集型的任務來說,這其實遠遠不夠的。
解釋
以上就是CompletableFuture中預設執行緒池使用依據的原始碼分析了。看完這一系列原始碼,就能解釋文章一開頭出現的那個問題。
因為我本地和測試環境機器的核心數是4核的,4減1大於1,所以在本地和測試環境的日誌上可以看出,使用了預設的執行緒池ForkJoinPool,而我們生產環境是雙核的機器。2減1不大於1,所以從生產環境的日誌看出,是為每個任務都建立了一個新執行緒。
總結
- 使用CompletableFuture一定要自定義執行緒池
- CompletableFuture是否使用預設執行緒池和機器核心數有關,當核心數減1大於1時才會使用預設執行緒池,否則將為每個任務建立一個新執行緒去處理
- 即便使用到了預設執行緒池,池內最大執行緒數也是核心數減1,對io密集型任務是遠遠不夠的,會令大量任務等待,降低吞吐率
- ForkJoinPool比較適用於CPU密集型的任務,比如說計算。