1. 程式人生 > >java8 之CompletableFuture -- 如何構建非同步應用

java8 之CompletableFuture -- 如何構建非同步應用

什麼是Future 介面

很多場景下,我們想去獲取執行緒執行的結果,而通常使用execute方法去提交任務是無法獲得結果的,這時候我們常常會改用submit方法去提交,以便獲得執行緒執行的結果。

而submit方法返回的就是Future,一個未來物件。 使用future.get() 方法去獲取執行緒執行結果,包括如果出現異常,也會隨get方法丟擲。

 

Future 介面的缺陷

當我們使用future.get()方法去取得執行緒執行結果時,要知道get方法是阻塞的,也就是說為了拿到結果,當主執行緒執行到get()方法,當前執行緒會去等待非同步任務執行完成,

換言之,非同步的效果在我們使用get()拿結果時,會變得無效。示例如下

public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future future = executorService.submit(()->{
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("非同步任務執行了");
        });
        future.get();
System.out.println("主線任務執行了"); }

列印結果是:非同步任務執行了過後主線任務才執行。  就是因為get()在一直等待。

那麼如何解決我想要拿到結果,可以對結果進行處理,又不想被阻塞呢?

 

CompletableFuture 使一切變得可能

JDK1.8才新加入的一個實現類CompletableFuture,實現了Future<T>CompletionStage<T>兩個介面。

實際開發中,我們常常面對如下的幾種場景:

1.  針對Future的完成事件,不想簡單的阻塞等待,在這段時間內,我們希望可以正常繼續往下執行,所以在它完成時,我們可以收到回撥即可。

2. 面對Future集合來講,這其中的每個Future結果其實很難去描述它們之間的依賴關係,而往往我們希望等待所有的Future集合都完成,然後做一些事情。

3. 在非同步計算中,兩個計算任務相互獨立,但是任務二又依賴於任務一的結果。

如上的幾種場景,單靠Future是解決不了的,而CompletableFuture則可以幫我們實現。

 

CompletableFuture 常見api 介紹

1、 runAsync 和 supplyAsync方法

 它提供了四個方法來建立一個非同步任務

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

 

runAsync類似於execute方法,不支援返回值,而supplyAsync方法類似submit方法,支援返回值。也是我們的重點方法。

沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的執行緒池執行非同步程式碼。

  示例

   //無返回值
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        System.out.println("runAsync無返回值");
    });

    future1.get();

    //有返回值
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync有返回值");
        return "111";
    });

    String s = future2.get();

 

2、 非同步任務執行完時的回撥方法  whenComplete 和 exceptionally

  當CompletableFuture的計算結果完成,或者丟擲異常的時候,可以執行特定的任務

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

 

這些方法都是上述建立的非同步任務完成後 (也可能是丟擲異常後結束) 所執行的方法。

whenComplete和whenCompleteAsync方法的區別在於:前者是由上面的執行緒繼續執行,而後者是將whenCompleteAsync的任務繼續交給執行緒池去做決定。

exceptionally則是上面的任務執行丟擲異常後,所要執行的方法。

示例

CompletableFuture.supplyAsync(()->{
        int a = 10/0;
        return 1;
    }).whenComplete((r, e)->{
        System.out.println(r);
    }).exceptionally(e->{
        System.out.println(e);
        return 2;
    });

 

值得注意的是:哪怕supplyAsync丟擲了異常,whenComplete也會執行,意思就是,只要supplyAsync執行結束,它就會執行,不管是不是正常執行完。exceptionally只有在異常的時候才會執行。

其實,在whenComplete的引數內 e就代表異常了,判斷它是否為null,就可以判斷是否有異常,只不過這樣的做法,我們不提倡。

whenComplete和exceptionally這兩個,誰在前,誰先執行。 

此類的回撥方法,哪怕主執行緒已經執行結束,已經跳出外圍的方法體,然後回撥方法依然可以繼續等待非同步任務執行完成再觸發,絲毫不受外部影響。

 

3、 thenApply 和 handle 方法

如果兩個任務之間有依賴關係,比如B任務依賴於A任務的執行結果,那麼就可以使用這兩個方法

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

 

這兩個方法,效果是一樣的,區別在於,當A任務執行出現異常時,thenApply方法不會執行,而handle 方法一樣會去執行,因為在handle方法裡,我們可以處理異常,而前者不行。

示例

    CompletableFuture.supplyAsync(()->{
        return 5;
    }).thenApply((r)->{
        r = r + 1;
        return r;
    });
    
    //出現了異常,handle方法可以拿到異常 e
    CompletableFuture.supplyAsync(()->{
        int i = 10/0;
        return 5;
    }).handle((r, e)->{
        System.out.println(e);
        r = r + 1;
        return r;
    });

 

這裡延伸兩個方法  thenAccept 和 thenRun。其實 和上面兩個方法差不多,都是等待前面一個任務執行完 再執行。區別就在於thenAccept接收前面任務的結果,且無需return。而thenRun只要前面的任務執行完成,它就執行,不關心前面的執行結果如何

如果前面的任務拋了異常,非正常結束,這兩個方法是不會執行的,所以處理不了異常情況。

 

4、 合併操作方法  thenCombine 和 thenAcceptBoth 

我們常常需要合併兩個任務的結果,在對其進行統一處理,簡言之,這裡的回撥任務需要等待兩個任務都完成後再會觸發。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

 

這兩者的區別 在於 前者是有返回值的,後者沒有(就是個消耗工作)

示例

private static void thenCombine() throws Exception {

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            return "future2";
        });

        CompletableFuture<String> result = future1.thenCombine(future2, (r1, r2)->{
            return r1 + r2;
        });
//這裡的get是阻塞的,需要等上面兩個任務都完成 System.out.println(result.get()); }

 

private static void thenAcceptBoth() throws Exception {

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "future1";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            return "future2";
        });
        //值得注意的是,這裡是不阻塞的
        future1.thenAcceptBoth(future2, (r1, r2)->{
            System.out.println(r1 + r2);
        });

        System.out.println("繼續往下執行");
    }

 

這兩個方法 都不會形成阻塞。就是個回撥方法。只有get()才會阻塞。

4、 allOf (重點,個人覺得用的場景很多)

  很多時候,不止存在兩個非同步任務,可能有幾十上百個。我們需要等這些任務都完成後,再來執行相應的操作。那怎麼集中監聽所有任務執行結束與否呢? allOf方法可以幫我們完成。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

 

它接收一個可變入參,既可以接收CompletableFuture單個物件,可以接收其陣列物件。

結合例子說明其作用。

public static void main(String[] args) throws Exception{
        long start = System.currentTimeMillis();
        CompletableFutureTest test = new CompletableFutureTest();
        // 結果集
        List<String> list = new ArrayList<>();

        List<Integer> taskList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 全流式處理轉換成CompletableFuture[]
        CompletableFuture[] cfs = taskList.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> test.calc(integer))
                        .thenApply(h->Integer.toString(h))
                        .whenComplete((s, e) -> {
                            System.out.println("任務"+s+"完成!result="+s+",異常 e="+e+","+new Date());
                            list.add(s);
                        })
                ).toArray(CompletableFuture[]::new);
        
        CompletableFuture.allOf(cfs).join();
        
        System.out.println("list="+list+",耗時="+(System.currentTimeMillis()-start));
    }

    public int calc(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(3000);//任務1耗時3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任務5耗時5秒
            } else {
                Thread.sleep(1000);//其它任務耗時1秒
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }

 

全流式寫法,綜合了以上的一些方法,使用allOf集中阻塞,等待所有任務執行完成,取得結果集list。   這裡有些CountDownLatch的感覺。

 

CompletableFuture 總結

圖片出自

https://www.cnblogs.com/dennyzhangdd/p/7010972.html

 

本文只是簡述了CompletableFuture的常用用法。日常開發基本夠用,但是針對一些特殊場景,例如異常場景,取消場景,仍待研究。

&n