1. 程式人生 > >JDK8 併發新特性學習 (一) CompletableFuture

JDK8 併發新特性學習 (一) CompletableFuture

JDK8 併發新特性學習 (一) CompletableFuture

JDK 8 java.util.concurrent新增加的兩個介面和四個類

  • CompletableFutre.AsynchronousCompletionTask
    一個沒有方法的裝飾介面,用來標識非同步的任務
  • CompletionStage<T>
    一個多個非同步計算的介面,對多個非同步計算進行組合、過濾、異常處理等操作。
  • ConcurrentHashMap.KeySet<K,V>
    一個ConcurrentHashMap的key的view
  • CounterCompleter<T>

    一個ForkJoinTask,當任務執行完成時候執行
  • CompletionException
    在完成一個任務時候丟擲的異常.

這次我們先學習的是CompletableFuture。首先我們看這個類實現了Future和CompletionStage介面,所以它既有
Future的功能,也有一些非同步計算的功能。

個人理解CompletableFuture的功能主要分為幾類:

  1. 生產一個CompletableFuture,使用supplyAsync
  2. 轉換和使用上一個結構,主要使用accept
  3. 單個異常處理exceptionally
  4. 組合結果combine
    compose
  5. 等待完成之後執行,主要使用acceptEitherrunEitherallOfanyOf

  1. 建立一個非同步任務,列印日誌資訊

    private static void create() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
    }
    
    
    
    private static
    String sendMessage(String id) { log.info("這裡是傳送資訊{}", id); return "資訊" + id; } public static void main(String[] args) throws Throwable { log.info("啟動"); create(); log.info("結束"); Thread.sleep(3 * 1000L); }

    最後的輸出結果是:

    10:15:37.918 [main] INFO  c.k.j.c.CompletableFutureDemo - 啟動
    10:15:38.004 [main] INFO  c.k.j.c.CompletableFutureDemo - 結束
    10:15:38.004 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 這裡是傳送資訊1

    可見是非同步的,如果沒有提供Executor,預設使用ForkJoinPool的commonPool

  2. 我們對一個結果進行處理,處理多次

    private static void run() throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
            CompletableFuture<Void> acceptFuture = future.thenAccept(s -> log.info(s.concat("第一步的accept")));
            CompletableFuture<Void> voidCompletableFuture = acceptFuture.thenAccept(aVoid -> log.info("第二步的accept, 返回值已經為void了, 返回值: {}", aVoid));
        }

    輸出結果:

    10:45:34.296 [main] INFO  c.k.j.c.CompletableFutureDemo - 啟動
    10:45:34.367 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 這裡是傳送資訊1
    10:45:34.367 [main] INFO  c.k.j.c.CompletableFutureDemo - 資訊1第一步的accept
    10:45:34.368 [main] INFO  c.k.j.c.CompletableFutureDemo - 第二步的accept, 返回值已經為void了, 返回值: null
    10:45:34.368 [main] INFO  c.k.j.c.CompletableFutureDemo - 結束

    在這裡我們觀察到CompletableFutre的thenApply是每次對上一個completableFuture做處理,thenApply的引數是一個Consumer,所以沒有返回值。在第二次進行處理的時候已經成Consumer<? extends Void>了.

    那假如我們使用thenApplySync效果會是什麼呢

    private static void run() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
        // CompletableFuture<Void> acceptFuture = future.thenAccept(s -> log.info(s.concat("第一步的accept")));
        // CompletableFuture<Void> voidCompletableFuture = acceptFuture.thenAccept(aVoid -> log.info("第二步的accept, 返回值已經為void了, 返回值: {}", aVoid));
        CompletableFuture<Void> acceptSyncFuture = future.thenAcceptAsync(s -> log.info(s.concat("第一步的accept")));
        acceptSyncFuture.thenAcceptAsync(aVoid -> log.info("第二步的accept, 返回值已經為void了, 返回值: {}", aVoid));
    }

    輸出結果:

    10:47:07.286 [main] INFO  c.k.j.c.CompletableFutureDemo - 啟動
    10:47:07.361 [main] INFO  c.k.j.c.CompletableFutureDemo - 結束
    10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 這裡是傳送資訊1
    10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 資訊1第一步的accept
    10:47:07.363 [ForkJoinPool.commonPool-worker-1] INFO  c.k.j.c.CompletableFutureDemo - 第二步的accept, 返回值已經為void了, 返回值: null

    上述輸出結果順序可能會有所變化,如果我們使用.thenAcceptAsync,那麼就會保持使用非同步的執行緒執行accept,而.thenAccept會類似於future.get等待上一個非同步的結果,最後在主執行緒裡執行

  3. 單個異常處理

    CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
    CompletableFuture<String> exceptionFuture = future.exceptionally(e -> {
            log.info("發生了異常", e);
            return e.getMessage();
        });
    exceptionFuture.thenAccept(s -> log.info("這是accept".concat(s)));

    測試說出說明,如果future正常完成,那麼將會走thenAccept,否則會進行異常處理

  4. combine 和 compose

    combine是兩個CompletableFuture結合起來,將前一個和後一個結合起來。compose類似於flatmap的作用,會將一個future的返回值,組合成一個新的completableFuture

    CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
    CompletableFuture<String> secondFuture = CompletableFuture.<String>supplyAsync(() -> sendMessage("2"));
    
    CompletableFuture<String> combineFuture = secondFuture.thenCombine(future, (s, v) -> {
        log.info("這裡是combine, {}", s + v);
        return s + v;
    });
    CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> sendMessage("1"));
    CompletableFuture<String> secondFuture = CompletableFuture.<String>supplyAsync(() -> sendMessage("2"));
    
    CompletableFuture<String> composeFuture = future.thenCompose(s -> {
        log.info("compose的引數值是{}", s);
        return CompletableFuture.<String>supplyAsync(() -> "compose的future" + s);
    });
  5. 任務完成之後執行,這裡我們只假設一個場景,就是我們要併發做四件事,然後做完之後再列印成功日誌

    CompletableFuture.allOf(future, secondFuture, thirdFuture, forthFuture).thenAccept(o -> log.info("完成任務"));

現在CompletableFuture介紹的差不多了,JDK8的非同步程式設計的確方便了很多,而且像stream一樣。還有handle和complete等其他方法,可以去看看。CompletableFuture就介紹到這兒了。