JDK8 併發新特性學習 (一) CompletableFuture
JDK8 併發新特性學習 (一) CompletableFuture
JDK 8 java.util.concurrent
新增加的兩個介面和四個類
CompletableFutre.AsynchronousCompletionTask
一個沒有方法的裝飾介面,用來標識非同步的任務CompletionStage<T>
一個多個非同步計算的介面,對多個非同步計算進行組合、過濾、異常處理等操作。ConcurrentHashMap.KeySet<K,V>
一個ConcurrentHashMap的key的viewCounterCompleter<T>
一個ForkJoinTask,當任務執行完成時候執行CompletionException
在完成一個任務時候丟擲的異常.
這次我們先學習的是CompletableFuture。首先我們看這個類實現了Future和CompletionStage介面,所以它既有
Future的功能,也有一些非同步計算的功能。
個人理解CompletableFuture的功能主要分為幾類:
- 生產一個CompletableFuture,使用
supplyAsync
- 轉換和使用上一個結構,主要使用
accept
- 單個異常處理
exceptionally
- 組合結果
combine
compose
- 等待完成之後執行,主要使用
acceptEither
、runEither
、allOf
和anyOf
建立一個非同步任務,列印日誌資訊
private static void create() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); } private static
最後的輸出結果是:
10:15:37.918 [main] INFO c.k.j.c.CompletableFutureDemo - 啟動 10:15:38.004 [main] INFO c.k.j.c.CompletableFutureDemo - 結束 10:15:38.004 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 這裡是傳送資訊1
可見是非同步的,如果沒有提供Executor,預設使用ForkJoinPool的commonPool
我們對一個結果進行處理,處理多次
private static void run() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); CompletableFuture<Void> acceptFuture = future.thenAccept(s -> log.info(s.concat("第一步的accept"))); CompletableFuture<Void> voidCompletableFuture = acceptFuture.thenAccept(aVoid -> log.info("第二步的accept, 返回值已經為void了, 返回值: {}", aVoid)); }
輸出結果:
10:45:34.296 [main] INFO c.k.j.c.CompletableFutureDemo - 啟動 10:45:34.367 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 這裡是傳送資訊1 10:45:34.367 [main] INFO c.k.j.c.CompletableFutureDemo - 資訊1第一步的accept 10:45:34.368 [main] INFO c.k.j.c.CompletableFutureDemo - 第二步的accept, 返回值已經為void了, 返回值: null 10:45:34.368 [main] INFO c.k.j.c.CompletableFutureDemo - 結束
在這裡我們觀察到CompletableFutre的thenApply是每次對上一個completableFuture做處理,thenApply的引數是一個Consumer,所以沒有返回值。在第二次進行處理的時候已經成
Consumer<? extends Void>
了.那假如我們使用
thenApplySync
效果會是什麼呢private static void run() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); // CompletableFuture<Void> acceptFuture = future.thenAccept(s -> log.info(s.concat("第一步的accept"))); // CompletableFuture<Void> voidCompletableFuture = acceptFuture.thenAccept(aVoid -> log.info("第二步的accept, 返回值已經為void了, 返回值: {}", aVoid)); CompletableFuture<Void> acceptSyncFuture = future.thenAcceptAsync(s -> log.info(s.concat("第一步的accept"))); acceptSyncFuture.thenAcceptAsync(aVoid -> log.info("第二步的accept, 返回值已經為void了, 返回值: {}", aVoid)); }
輸出結果:
10:47:07.286 [main] INFO c.k.j.c.CompletableFutureDemo - 啟動 10:47:07.361 [main] INFO c.k.j.c.CompletableFutureDemo - 結束 10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 這裡是傳送資訊1 10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 資訊1第一步的accept 10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO c.k.j.c.CompletableFutureDemo - 第二步的accept, 返回值已經為void了, 返回值: null
上述輸出結果順序可能會有所變化,如果我們使用
.thenAcceptAsync
,那麼就會保持使用非同步的執行緒執行accept,而.thenAccept
會類似於future.get等待上一個非同步的結果,最後在主執行緒裡執行單個異常處理
CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); CompletableFuture<String> exceptionFuture = future.exceptionally(e -> { log.info("發生了異常", e); return e.getMessage(); }); exceptionFuture.thenAccept(s -> log.info("這是accept".concat(s)));
測試說出說明,如果future正常完成,那麼將會走thenAccept,否則會進行異常處理
combine 和 compose
combine是兩個CompletableFuture結合起來,將前一個和後一個結合起來。compose類似於flatmap的作用,會將一個future的返回值,組合成一個新的completableFuture
CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); CompletableFuture<String> secondFuture = CompletableFuture.<String>supplyAsync(() -> sendMessage("2")); CompletableFuture<String> combineFuture = secondFuture.thenCombine(future, (s, v) -> { log.info("這裡是combine, {}", s + v); return s + v; });
CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1")); CompletableFuture<String> secondFuture = CompletableFuture.<String>supplyAsync(() -> sendMessage("2")); CompletableFuture<String> composeFuture = future.thenCompose(s -> { log.info("compose的引數值是{}", s); return CompletableFuture.<String>supplyAsync(() -> "compose的future" + s); });
任務完成之後執行,這裡我們只假設一個場景,就是我們要併發做四件事,然後做完之後再列印成功日誌
CompletableFuture.allOf(future, secondFuture, thirdFuture, forthFuture).thenAccept(o -> log.info("完成任務"));
現在CompletableFuture介紹的差不多了,JDK8的非同步程式設計的確方便了很多,而且像stream一樣。還有handle和complete等其他方法,可以去看看。CompletableFuture就介紹到這兒了。