1. 程式人生 > >Java8新特性8--使用CompletableFuture構建非同步應用

Java8新特性8--使用CompletableFuture構建非同步應用

轉自:https://www.jianshu.com/p/4897ccdcb278

 

使用CompletableFuture構建非同步應用

Future 介面的侷限性

future介面可以構建非同步應用,但依然有其侷限性。它很難直接表述多個Future 結果之間的依賴性。實際開發中,我們經常需要達成以下目的:

  • 將兩個非同步計算合併為一個——這兩個非同步計算之間相互獨立,同時第二個又依賴於第
    一個的結果。
  • 等待 Future 集合中的所有任務都完成。
  • 僅等待 Future 集合中最快結束的任務完成(有可能因為它們試圖通過不同的方式計算同
    一個值),並返回它的結果。
  • 通過程式設計方式完成一個 Future 任務的執行(即以手工設定非同步操作結果的方式)。
  • 應對 Future 的完成事件(即當 Future 的完成事件發生時會收到通知,並能使用 Future
    計算的結果進行下一步的操作,不只是簡單地阻塞等待操作的結果)

新的CompletableFuture將使得這些成為可能。

CompletableFuture

非同步執行

首先,CompletableFuture實現了Future介面,因此你可以像Future那樣使用它。

其次,CompletableFuture並非一定要交給執行緒池執行才能實現非同步,你可以像下面這樣實現非同步執行。


    public static void test1() throws Exception{
        CompletableFuture<String> completableFuture=new CompletableFuture();
        new Thread(new Runnable() {
            @Override
            public void run() {
                //模擬執行耗時任務
                System.out.println("task doing...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //告訴completableFuture任務已經完成
                completableFuture.complete("result");
            }
        }).start();
        //獲取任務結果,如果沒有完成會一直阻塞等待
        String result=completableFuture.get();
        System.out.println("計算結果:"+result);
    }


錯誤處理

如果沒有意外,上面發的程式碼工作得很正常。但是,如果任務執行過程中產生了異常會怎樣呢?

非常不幸,這種情況下你會得到一個相當糟糕的結果:異常會被限制在執行任務的執行緒的範圍內,最終會殺死該執行緒,而這會導致等待 get 方法返回結果的執行緒永久地被阻塞。

客戶端可以使用過載版本的 get 方法,它使用一個超時引數來避免發生這樣的情況。這是一種值得推薦的做法,你應該儘量在你的程式碼中新增超時判斷的邏輯,避免發生類似的問題。

使用這種方法至少能防止程式永久地等待下去,超時發生時,程式會得到通知發生了 Timeout-Exception 。不過,也因為如此,你不能指定執行任務的執行緒內到底發生了什麼問題。

為了能獲取任務執行緒內發生的異常,你需要使用
CompletableFuture 的completeExceptionally方法將導致CompletableFuture 內發生問題的異常丟擲。這樣,當執行任務發生異常時,呼叫get()方法的執行緒將會收到一個 ExecutionException 異常,該異常接收了一個包含失敗原因的Exception 引數。

 public static void test2() throws Exception{
        CompletableFuture<String> completableFuture=new CompletableFuture();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //模擬執行耗時任務
                    System.out.println("task doing...");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    throw new RuntimeException("拋異常了");
                }catch (Exception e) {
                    //告訴completableFuture任務發生異常了
                    completableFuture.completeExceptionally(e);
                }
            }
        }).start();
        //獲取任務結果,如果沒有完成會一直阻塞等待
        String result=completableFuture.get();
        System.out.println("計算結果:"+result);
    }

工廠方法

前面我們通過程式設計自己建立 CompletableFuture 物件以及如何獲取返回值,雖然看起來這些操作已經比較方便,但還有進一步提升的空間.

CompletableFuture 類自身提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個流程,還不用擔心實現的細節。

supplyAsync 方法接受一個生產者(Supplier)作為引數,返回一個 CompletableFuture
物件。生產者方法會交由 ForkJoinPool池中的某個執行執行緒( Executor )執行,但是你也可以使用 supplyAsync 方法的過載版本,傳遞第二個引數指定執行緒池執行器執行生產者方法。

public static void test3() throws Exception {
        //supplyAsync內部使用ForkJoinPool執行緒池執行任務
        CompletableFuture<String> completableFuture=CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result";
        });
        System.out.println("計算結果:"+completableFuture.get());
    }

allOf 工廠方法接收一個由CompletableFuture 構成的陣列,陣列中的所有 Completable-Future 物件執行完成之後,它返回一個 CompletableFuture<Void> 物件。這意味著,如果你需要等待多個 CompletableFuture 物件執行完畢,對 allOf 方法返回的
CompletableFuture 執行 join 操作可以等待CompletableFuture執行完成。

或者你可能希望只要 CompletableFuture 物件陣列中有任何一個執行完畢就不再等待,在這種情況下,你可以使用一個類似的工廠方法 anyOf

該方法接收一個 CompletableFuture 物件構成的陣列,返回由第一個執行完畢的 CompletableFuture 物件的返回值構成的 CompletableFuture<Object> 。

 public static void test4() throws Exception {

        CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result1";
        });

        CompletableFuture<String> completableFuture2=CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task2 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result2";
        });

        CompletableFuture<Object> anyResult=CompletableFuture.anyOf(completableFuture1,completableFuture2);

        System.out.println("第一個完成的任務結果:"+anyResult.get());

        CompletableFuture<Void> allResult=CompletableFuture.allOf(completableFuture1,completableFuture2);

        //阻塞等待所有任務執行完成
        allResult.join();
        System.out.println("所有任務執行完成");

    }

將兩個CompletableFuture建立聯絡

通常,我們會有多個需要獨立執行但又有所依賴的的任務。比如先等用於的訂單處理完畢然後才傳送郵件通知客戶。

thenCompose 方法允許你對兩個非同步操作進行流水線,第一個操作完成時,將其結果作為引數傳遞給第二個操作。你可以建立兩個CompletableFutures 物件,對第一個 CompletableFuture 物件呼叫thenCompose ,並向其傳遞一個函式。當第一個CompletableFuture 執行完畢後,它的結果將作為該函式的引數,這個函式的返回值是以第一個 CompletableFuture 的返回做輸入計算出的第二個 CompletableFuture 物件。

public static void test5() throws Exception {

        CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result1";
        });

        //等第一個任務完成後,將任務結果傳給引數result,執行後面的任務並返回一個代表任務的completableFuture
        CompletableFuture<String> completableFuture2= completableFuture1.thenCompose(result->CompletableFuture.supplyAsync(()->{
            //模擬執行耗時任務
            System.out.println("task2 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return "result2";
        }));

        System.out.println(completableFuture2.get());

    }

另一種比較常見的情況是,你需要將兩個完
全不相干的 CompletableFuture 物件的結果整合起來,而且你也不希望等到第一個任務完全結
束才開始第二項任務。

這種情況,你應該使用 thenCombine 方法,它接收名為 BiFunction 的第二引數,這個引數
定義了當兩個 CompletableFuture 物件完成計算後,結果如何合併。

 public static void test6() throws Exception {

        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            //模擬執行耗時任務
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return 100;
        });

        //將第一個任務與第二個任務組合一起執行,都執行完成後,將兩個任務的結果合併
        CompletableFuture<Integer> completableFuture2 = completableFuture1.thenCombine(
                //第二個任務
                CompletableFuture.supplyAsync(() -> {
                    //模擬執行耗時任務
                    System.out.println("task2 doing...");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //返回結果
                    return 2000;
                }),
                //合併函式
                (result1, result2) -> result1 + result2);

        System.out.println(completableFuture2.get());

    }

響應 CompletableFuture 的 completion 事件

我們可以在每個CompletableFuture 上註冊一個操作,該操作會在 CompletableFuture 完成執行後呼叫它。CompletableFuture 通過 thenAccept 方法提供了這一功能,它接收
CompletableFuture 執行完畢後的返回值做引數。

 public static void test7() throws Exception {

        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            //模擬執行耗時任務
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果
            return 100;
        });
       
        //註冊完成事件
        completableFuture1.thenAccept(result->System.out.println("task1 done,result:"+result));

        CompletableFuture<Integer> completableFuture2=
                //第二個任務
                CompletableFuture.supplyAsync(() -> {
                    //模擬執行耗時任務
                    System.out.println("task2 doing...");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //返回結果
                    return 2000;
                });

        //註冊完成事件
        completableFuture2.thenAccept(result->System.out.println("task2 done,result:"+result));
        
        //將第一個任務與第二個任務組合一起執行,都執行完成後,將兩個任務的結果合併
        CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2,
                //合併函式
                (result1, result2) -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return result1 + result2;
                });

        System.out.println(completableFuture3.get());

    }

慢慢消化吧,重在實踐!