java非同步程式設計降低延遲
目錄
java非同步程式設計降低延遲
在平時開發的過程中,其實有很多可以採用多執行緒優化的地方,像ExecutorService、CompletionService、CompletableFuture和並行流等類,只是沒有去注意,這裡總結下日常工作中常用的一些方法。
一、ExecutorService和CompletionService
基本的execute和submit方法
這個其實沒有太多好說的,因為這個是最基本的,基本使用執行緒池的都會使用到這個方法,主要用於非同步執行任務,submit和execute的區別就在於,submit有一個方法的回執,可以利用這個Future對這個任務的生命週期進行干預。
invokeAll和invokeAny方法
很多人沒有注意到這兩個方法,這兩個方法其實也是非常有用的,例如你有很多可以並行執行的操作投遞到執行緒池,執行完之後就挨個呼叫Future的get獲取結果最後生成結果,這兩個步驟其實就是invokeAll已經封裝好的。他的內部實現也很簡單和你手動取每個值是一樣的,這個方法只會到所有任務執行完畢或者設定的時間超時了才會返回。實現非常簡單:
for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } }
invokeAny方法稍微比invokeAll複雜些,內部是基於ExecutorCompletionService實現的。如果有一個任務返回了就直接返回結果,如果第一個完成的任務丟擲了異常那麼這個方法會丟擲對應的異常。
CompletionService
這個類名中文翻譯就是完成服務,這個類組合了ExecutorService,實現邏輯非常簡單,內部存放了一個阻塞佇列,當投遞的任務完成時會將對應的Future放入這個阻塞佇列,這樣就可以做到投遞的任務在完成的順序依次放入阻塞佇列。這就是上面invokeAny實現利用主要邏輯。利用阻塞佇列的poll和take方法,在第一個返回時就取消剩餘的任務。
雖然invokeAny已經封裝了CompletionService的邏輯但是有些場景這個類還是很有用的。比如現在我想要得到一個最先完成的但是沒有丟擲異常的,這種情況下我們就需要寫一個類似於invokeAny的例子。jdk註釋中給出了例子:
void solve(Executor e,
Collection<Callable<Result>> solvers)
throws InterruptedException {
CompletionService<Result> ecs
= new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures
= new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {}
}
}
finally {
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
這個類也很容易想到一個場景,我有很多工是可以併發執行了,這時可以使用invokeAll,但是讓必須等到所有的任務執行完畢才能返回,這時如果有一個任務被io阻塞了很慢將會導致整個方法阻塞。如果是利用CompletionService的話,因為他是按照任務的完成順序往佇列裡放,所以我們可以全部提交後,利用他的poll或者take方法遍歷任務,先完成的任務返回就可以直接消費。
Future
講到這裡我覺得有必要提一下Future,因為執行緒池中投遞任務submit方法均為返回Future這個物件。Future你可以把它理解成對這個任務的建模,你得這個物件可以利用這個物件來管理任務的生命週期,例如get方法獲取結果,cancel來取消這個任務,以及isDnoe來判斷任務是否取消等。api沒有什麼難理解的地方,主要是取消任務這一塊需要結合中斷來理解,cancel引數的Boolean值就是說能不能給這個任務發中斷,如果可以他內部實際就是通過中斷來停止任務,需要使用者程式碼響應中斷。FutureTask中的cancel原始碼如下:
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
二、CompletableFuture(重要)
上面簡單說了下Future,Future是jdk5.0就已經引進的,但是他的能力非常的弱,主要是缺少了一個回撥的機制,很多框架都基於它提供了增強版像guava的ListenableFuture
和spring中的ListenableFuture
。直到java8出現了CompletableFuture才彌補了jdk的這個特性。
可能很多人沒有注意到這個類,因為平時沒關注這方面,其實如果好好的學習下這個類就會發現這個類的功能非常強大,和stream類似的設計思想,使用非常簡潔。可以基於教程好好研究一下,這裡介紹下常用的操作。
在以前我們投遞到執行緒中任務返回的Future中,我們只能實現一些簡單的輪詢,取消等api。如果現在有這樣的一些類似的需求:
執行一個任務,當任務執行完的時候執行一個動作(相當於任務執行完觸發回撥)
CompletableFuture.runAsync(() -> System.out.println("hello word")).whenComplete((aVoid, throwable) -> System.out.println("任務完成"));
任務執行完的時候在發起另外一個任務(這裡是有順序性的,第二個依賴於第一個任務)
CompletableFuture.supplyAsync(() -> 12).thenApply(Function.identity()).thenAccept(System.out::println);
同時執行多個任務,當全部完成的時候執行一個動作
Integer join = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
}), Integer::sum).join();
可以看到上面的這些任務都不簡單,但是如果使用CompletionStage卻變得十分容易,對任務的組合正是CompletableFuture的強項,下面大概說下CompletionStage的api。
通過觀察就會發現CompletionStage的一種型別操作都會有三種過載形式,分別如下:
- XXX()
- XXXAsync()
- XXXAsync(executors引數)
第一個表示這個操作在當前執行緒池中的當前執行緒直接執行,第二個表示會重新投遞到當前執行緒池執行,第三個則表示會重新投遞到傳入的執行緒池繼續執行。一般情況都會採用第一種形式和第三種,第一種是最常用的,可以減少執行緒的上下文切換,第三種情況主要使用者切換執行緒池,我們很多時候會根據不同的任務比如io密集型,cpu密集型建立不同的執行緒池,考慮這樣一種情況,我建立了一個ncpu+1的執行緒池1,和一個上限500的執行緒池2,第一個任務是cpu密集型,第二個依賴於第一個且是io密集型,這時候我們可以選擇將第一個投遞到執行緒池1,然後第二個通過傳入執行緒池引數重新投遞到執行緒池2中執行。
主要方法:
- thenRun 前一個任務正常執行完後執行一個任務
- thenAccept
- thenApply
- thenCompose
- handle 前一個任務執行完後執行(包括前一個丟擲異常),如果前面任務未丟擲異常當前任務丟擲異常則結果就是丟擲異常,如果前面任務丟擲異常e1當前任務丟擲異常e2則結果就是丟擲異常e1
- whenComplete
- exceptionally 如果前一個任務未丟擲異常則不執行
- XXXBoth(thenCombine) 兩個任務全都正常完成時執行
- XXXEither 當最先完成的任務是正常完成時執行
上面的方法大致可以再分為三組,分別是1-4,5-7,8-9。
1-4: 這個很好理解,四個方法都十分類似,這裡要提下的就是thenApply和thenCompose,會感覺有點難理解,你可以從java的stream中map和flatMap的角度來理解,這兩個方法做的事實際是一樣的,只不過形式不一樣,主要也是在和stream結合的過程中會用到,所以一般很少用到thenCompose。另外有一點很重要:如果前一個任務執行過程丟擲了異常那麼這個任務就不會執行也不會有任何提示,除非你呼叫CompletableFuture的get等獲取結果的方法會再次得到異常,不然這個異常資訊就丟了,需要十分注意這個點。
5-7: 1-4的方法如果前面的任務丟擲異常則會導致1-4任務的不執行。5-7的方法都可以對異常進行處理,如果前一個丟擲了異常,會有引數傳入,可以做相應的處理,很多時候可以利用這三個方法來記錄日誌,你看whenComplete就完全是一個透傳的效果
8-9: 這兩組型別分別是對兩個任務的與和或條件的組合工具方法。沒什麼難理解的,主要就是thenCombine,其實他實現的語義就是applyAfterBoth,只不過名字稍微不同而已。另外說下XXXEither,他的語義是任意一個任務執行完成後執行相應的動作,需要注意的地方就是如果最先完成的那個任務丟擲的是異常這個任務就不會執行。
再次提醒下異常,如果你對最後的CompletablFuture會呼叫get等取結果的方法那沒什麼,執行過程中丟擲的異常會再次丟擲,但是如果你只是呼叫後不再去取結果,就像thenApply結尾那麼就一定要非常小心,如果前一個方法丟擲異常你的thenApply的任務便不會執行,而且都沒有什麼提示。你可以對相應的任務包裝列印異常在rethrow,如下所示:
public class AsyncableWrapper {
public static final Runnable of(Runnable runnable) {
return () -> {
try {
runnable.run();
} catch (Exception e) {
log.error("執行非同步任務失敗", e);
throw e;
}
};
}
public <T> Consumer<T> ofConsumer(Consumer<T> consumer) {
return o -> {
try {
consumer.accept(o);
} catch (Exception e) {
log.error("執行非同步任務失敗,", e);
throw e;
}
};
}
public <T> Supplier<T> ofSupply(Supplier<T> supplier) {
return () -> {
try {
return supplier.get();
} catch (Exception e) {
log.error("執行非同步任務失敗,", e);
throw e;
}
};
}
}
下面的這個程式碼你可以試試執行的結果,用來測試異常:
CompletableFuture.supplyAsync(() -> 1).whenComplete((o, throwable) -> {
System.out.println("when1");
throw new RuntimeException();
}).thenAccept(o -> {
System.out.println("accetp2");
}).thenApply(aVoid -> {
System.out.println("apply");
return 1;
}).whenComplete((integer, throwable) -> {
System.out.println("when2");
}).thenAccept(integer -> {
System.out.println("accetp");
});
獲取結果
join: 在之前Future的api中,只有get方法,會丟擲受檢查異常,受檢查在lambda表示式中需要捕獲使得程式碼看上去不那麼美觀,因此引入了join,除了包裝了受檢異常,其他行為和get一樣
getNow:可以立即返回,引數可以寫入預設值,在輪詢的場景中會有用到
工具方法
anyOf:用於等待一組任務任意一個最先完成
allOf:用於等待一組任務全部完成
這兩個方法和上面的XXXEither和XXXBoth很相似,只不過是多個CompletablFuture。丟擲異常的規則也是一樣的。常用形式如下:
CompletableFuture.allOf(c1, c2, c3).join();
三、stream中的parallel(並行流)
在處理一批任務的時候,大部分場景都會有個集合,例如一個id列表,然後我們需要獲取每個id的資訊,通過http介面,但是沒有批量介面,這時候我們可以採用並行來提高效能。
流中有個簡單的方法parallel可以並行執行,如下所示:
Arrays.stream(new int[]{1, 2, 3}).parallel().forEach(operand -> {
try {
System.out.println("執行任務:" + operand);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
上面的轉換成並行流十分簡單,執行時間一秒左右,就多了一個方法呼叫。那麼如果每個任務執行完還有第二個步驟那怎麼辦呢,很容易想到結合ComplablFuture使用,那就是如下的形式:
CompletableFuture[] completableFutures = Arrays.stream(new Integer[]{1, 2, 3})
.map(operand -> CompletableFuture.supplyAsync(() -> {
try {
System.out.println("執行任務:" + operand);
Thread.sleep(1000);
return operand;
} catch (InterruptedException e) {
e.printStackTrace();
return 0;
}
})).map(integerCompletableFuture -> integerCompletableFuture.thenApply(integer -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return integer;
})).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();
上面的耗時大概在2秒左右,Stream和ComplablFuture組合功能是十分強大的,另外你可以注意到上面的程式碼中我移除了parallel()方法,因為ComplablFuture本身就利用了執行緒池,再利用parallel()是沒必要的,反而會增加執行緒上下文的切換。
實際執行的執行緒池
在使用並行流和非同步的過程中,肯定會非常好奇到底實際執行程式碼是在哪裡,非同步可能會好理解些,因為他很多方法例如thenAcceptAsync提供了執行緒池是你可以配置的。但是如果不傳的那麼實際使用的是ForkJoinPool。程式碼如下:
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
並行流內部使用的也是上述的執行緒池,但是並行流卻沒有提供顯示設定執行緒池的方法,這就導致有些阻塞的方法不適合採用並行流,其實他也是可以設定執行緒池的只不過不是像你想的那樣,程式碼如下:
ExecutorService executorService = Executors.newWorkStealingPool();
executorService.execute(() -> {
CompletableFuture.supplyAsync(() -> {
System.out.println(1);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException();
}).runAfterEither(CompletableFuture.supplyAsync(() -> {
System.out.println(2);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}), () -> System.out.println(123));
});
關鍵點就在於上面程式碼中的executorService,他是ForkJoinPool的例項,並行流執行的過程中如果發現當前執行緒是ForkJoinPool的例項,那麼會利用當前的ForkJoinPool來並行執行,從而改變了執行緒池。如果只是計算操作,沒有涉及io和鎖等阻塞那麼使用預設的執行緒池是很不錯的行為,就像平時對集合使用stream來計算就完全沒必要改變執行緒池。但是在使用執行緒池提高效能的很多時候都會涉及io操作,如Rpc,Db,Http等操作,這時候完全有必要根據相應的業務場景提供一個合適的執行緒池,而不是使用一個統一的執行緒池。關於利用並行stream還是ComplablFuture,如果不涉及io以及任務組合等操作,我更會傾向使用stream,更多的情況下我會選擇使用ComplablFuture,結構更清晰。
關於執行緒池的總結可以參考我的這篇文章 java執行緒池和中斷總結
四、實際使用的另外一點總結:
剛開始接觸非同步的時候覺得他是提升效能的銀彈,但是其實很多技術都有適合的場景,不能為了技術而技術。這裡舉個反例,當時在公司呼叫rpc的時候例如根據id列表批量獲取資訊,因為不想麻煩別人有剛好試試非同步api,就採用了每個id呼叫一次,利用非同步來降低延遲的方案,後來實際證明是非常錯誤的!假設有100個id,如果對方提供了批量操作的rpc,那麼一次往返即可,採用非同步方案多增加199次呼叫,吞吐量嚴重降低,另外因為介面有呼叫限制,併發上去後介面的全部返回失敗!這種場景rpc每個操作耗時短,就非常適合提供批量操作而不是每個呼叫一次。
再來個正面例子,公司最近對接騰訊雲的人臉識別服務,因為是Http介面,而且每個介面返回比較慢,所以非常適合採用執行緒池和非同步來優化延遲。