1. 程式人生 > >java8學習:CompetableFuture組合式非同步程式設計

java8學習:CompetableFuture組合式非同步程式設計

內容來自《 java8實戰 》,本篇文章內容均為非盈利,旨為方便自己查詢、總結備份、開源分享。如有侵權請告知,馬上刪除。
書籍購買地址:java8實戰

  • 如果你的意圖是實現併發,而非並行,或者你的主要目標是在同一個CPU上執行集合鬆耦合的任務,充分利用CPU的核,讓其足夠忙碌,從而最大化程式的吞吐量,那麼你其實真正想做的是避免因為等待遠端服務的返回等一些操作而阻塞執行緒的執行,浪費寶貴的計算資源,因為這種等待的時間可能很長,通過本文就會了解,Future介面,尤其是他的實現:CompletableFuture,是這種情況的處理利器

markdown_img_paste_20181123102952478

Future介面

  • 在java5被引入,他是代表一種非同步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給呼叫方.在Future中觸發那些潛在耗時的操作把呼叫執行緒解放出來,讓它能繼續執行其他右腳趾的工作,不需要等待耗時操作完成,就比如你去幹洗店把衣服交給它,然後你去做其他事情就好,衣服洗好了,自然就會有人給你打電話通知拿衣服了
  • 一個簡單的Future以非同步的方式執行一個耗時的操作的程式碼例項

    //通過此物件,可以向執行緒池提交任務
    ExecutorService service = Executors.newCachedThreadPool();
    //提交任務
    Future<Double> task = service.submit(new Callable<Double>() {
        @Override
        public Double call() throws Exception {
            return 7D;
        }
    });
    try {
        //獲取非同步操作的結果,如果被堵塞,那麼最多等待一秒之後退出
        Double aDouble = task.get(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        //當前執行緒在等待過程中被中斷
        e.printStackTrace();
    } catch (ExecutionException e) {
        //計算丟擲一個異常
        e.printStackTrace();
    } catch (TimeoutException e) {
        //超時異常
        e.printStackTrace();
    }

Future的侷限性

  • 通過上面的例子,我們知道了Future介面提供了方法來檢測一步計算是否已經結束(isDone),等待非同步操作結束,以及獲取計算的結果,但是這些特性還不足以寫出簡潔的併發程式碼,比如我們很難描述兩個Future結果之間的依賴性:比如"當A計算完成後,請將A計算的結果通知給B任務,等待兩個任務都完成後,請將計算結果與另一個操作結過合併",但是Future程式碼寫起來就又是一回事了,所以我們需要更具描述力的特性,比如

    • 兩個計算結果合併為一個
    • 等待所有Future任務完成
    • 僅等待最快的Future任務完成
    • 應對Future完成事件
  • 在我們瞭解過lambda之後,其實上面的需求我們可以聯想到lambda中的解決方法,比如上面的應對Futrue完成時間:如果新的Future實現採用lambda模式程式設計的話,那麼肯定是一個Supplier之類的一個函式式介面,以便將我們的實現進行行為引數化處理,java8中也是這樣做的,接下來要說的新的Future實現遵循了類似的模式,使用lambda思想和流水線的思想(Stream).

使用CompletableFuture構建非同步應用

  • 首先我們要了解一下相關的概念:同步API和非同步API

    • 同步API:你呼叫了某個方法,呼叫方在被呼叫方執行的過程中會進入等待,被呼叫方執行結束後返回,呼叫方取得被呼叫方的返回值並繼續執行,即使呼叫方和被呼叫方處於不同的執行緒中執行,呼叫方還是等待被呼叫方執行結束結果返回後才能繼續往下執行,所以此時同步API是在等待的
    • 非同步API:與上面正好相反:即非同步API直接返回,或者至少在被呼叫方結束計算完成之前,把它剩餘的計算任務交給另一個執行緒去做,該執行緒和呼叫方是非同步的:這就是非阻塞式呼叫的由來.執行剩餘計算任務的執行緒會將它的計算結果返回給呼叫方,返回的方式是通過回撥函式或者由呼叫方再次執行一個"等待,直到計算完成"的方法呼叫
  • 下面我們將要做一個"最佳價格查詢器"的應用:他會查詢多個線上的商店,依據給定的產品或者服務找出最低價格

實現開始

//這個方法是每個線上商店都需要有的方法:根據商品返回價格
//待實現部分可能會引發延遲較高的操作,比如資料庫的查詢等
public double getPrice(String product){
    //待實現
}
  • 為了演示延遲操作,我們直接執行緒睡眠一秒好了,如下

    public static void delay(){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 將上面的getPrice方式實現完整

    public double getPrice(String product){
        return calculatePrice(product);        
    }
    private double calculatePrice(String product){
        delay();
        return Math.random();
    }
  • 上面的程式碼如果呼叫的話,明顯是一個同步操作,你會進入方法並等待一秒,你才會得到你想要的結果,對於這個方法,體驗是很差的,所以我們將同步方法轉換為非同步方法

    //將getPrice方法修改
    public Future<Double> getPriceAsync(String product){
        //建立此物件,此物件包含計算的結果
        CompletableFuture<Double> future = new CompletableFuture<>();
        //在另一個執行緒中以非同步方式進行計算
        new Thread(() -> {
            double v = calculatePrice(product);
            //需要長時間計算的任務結束並得到結果時,設定為Future的返回值
            future.complete(v);
        }).start();
        //無須等待還沒有結束的計算,直接返回future物件
        return future;
    }
    • 返回值Future代表一個非同步計算的結果,即Future是一個暫時不知道值的處理器,在這個值處理完成後,可以呼叫get方法取得.
    • 如上的future.complete(v);,可以使用此方法,結束CompletableFuture的執行,並設定變數的值
  • 使用如上非同步API

    @Test
    public void test() throws InterruptedException, ExecutionException, TimeoutException {
        Shop shop = new Shop();
        //獲取指定商品的價格
        Future<Double> produceNameTask = shop.getPriceAsync("produceName");
        //其他任何操作,比如非同步耗時操作也可以
        //如果任務結束了,那麼就返回,否則就進入阻塞
        Double aDouble = produceNameTask.get(10, TimeUnit.SECONDS);
        System.out.println("aDouble = " + aDouble);
    }
    • 如上所提的任何操作,如果是非同步的,那麼也都是直接返回,然後繼續執行上面的程式碼,當到get的時候,如果任務完成,就返回值,否則get就進入阻塞,但是不超過指定時間
    • 如果在價格計算的過程中產生了錯誤,那麼用於提示錯誤的異常會被限制到試圖計算商品價格的當前執行緒的範圍內,最終會殺死該執行緒,而這會導致等待get方法返回結果的客戶端永久的被阻塞,即get不知道他的非同步呼叫發生錯誤了,錯誤被封在了非同步呼叫中,而get還在傻傻等待造成永久的阻塞,當然可以使用get的有時間限制的方法,如果超時就會發生超時異常,但是這樣你就不會有機會發現計算價格方法內部發生了什麼問題,為了讓客戶端知道為什麼報錯,我們需要使用CompletableFuture的completeExceptionally方法將內部錯誤丟擲,如下

錯誤處理

  • 我們可以製造一個除零錯誤.如下

    public Future<Double> getPriceAsync(String product){
        CompletableFuture<Double> future = new CompletableFuture<>();
        new Thread(() -> {
                double v = calculatePrice(product);
                //如果正常結束,那麼就設定值並返回
                int i = 1 / 0 ;   //異常發生
                future.complete(v);
        }).start();
        return future;
    }
  • 然後我們繼續執行上面的test測試方法,執行結果如下,並且程式終止

    Exception in thread "Thread-0" java.lang.ArithmeticException: / by zero
        at com.qidai.demotest.Shop.lambda$getPriceAsync$0(Shop.java:12)
        at java.lang.Thread.run(Thread.java:748)
    //這裡會等待get方法設定的時長
    java.util.concurrent.TimeoutException
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
        at com.qidai.demotest.MyTest.test(MyTest.java:18)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      ...
    • 修改shop的getPriceAsync()方法的實現

      public Future<Double> getPriceAsync(String product){
          CompletableFuture<Double> future = new CompletableFuture<>();
          new Thread(() -> {
              try {
                  double v = calculatePrice(product);
                  //如果正常結束,那麼就設定值並返回
                  int i = 1 / 0 ;
                  future.complete(v);
              }catch (Exception e){
                  future.completeExceptionally(e);   //告訴Future發生異常了,直接返回
              }
          }).start();
          return future;
      }
  • 控制檯測試結果如下

    java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
        at com.qidai.demotest.MyTest.test(MyTest.java:18)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
      ...  詳細的堆疊資訊,並且不會像上面一樣get一直在堵塞,而是遇到錯誤立刻返回
    Caused by: java.lang.ArithmeticException: / by zero
        at com.qidai.demotest.Shop.lambda$getPriceAsync$0(Shop.java:13)
        at java.lang.Thread.run(Thread.java:748)

使用工廠方法建立CompletableFuture物件

  • 之前建立了CompletableFuture物件了,但是有簡單的工廠方法可以直接建立此物件,如下使用supplyAsync建立物件

    public Future<Double> getPriceAsync(String product){
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }
    • 上面方法是對之前的方法的改造,方法實現更加簡單,supplyAsync接收一個supplier引數,返回一個CompletableFuture物件,該物件完成非同步執行後讀取呼叫生產者方法的返回值,生產者方法會交由ForkJoinPool中的某個執行緒去執行,上面的方法與之前的方法實現完全等價,並且已經實現了錯誤管理

CompletableFuture正確姿勢

  • 下面我們將假設只提供了同步的API,以及一個商家列表,如下

    public class Shop {
        private String name ;
        public Shop(String name) {
            this.name = name;
        }
        //同步
        public Double getPrice(String product){
            return calculatePrice(product);
        }
        private double calculatePrice(String product){
            delay();
            return Math.random();
        }
        public static void delay(){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public String getName() {
            return name;
        }
    }
    //商家列表  
    List<Shop> shops = Arrays.asList(new Shop("A"),new Shop("B")
                                    ,new Shop("C"),new Shop("D")
                                    ,new Shop("E"),new Shop("F")
                                    ,new Shop("G"),new Shop("H"));
  • 我們還需要一個方法:它接受產品名為引數,返回一個字串列表,字串為商品名+商品價格

    public List<String> findPrice(String product){}  //這個方法將在順序查詢,並行查詢和非同步查詢中實現不同邏輯
  • 順序方式查詢實現

    public List<String> findPrice(String product){
        return shops.stream().map(shop -> shop.getName() + shop.getPrice(product))
        .collect(Collectors.toList());
    }
  • 順序方式查詢驗證時間和結果

    public void test(){
        long l1 = System.nanoTime();
        List<String> list = findPrice("huawei");
        long l2 = System.nanoTime();
        System.out.println("done = " + (l2 - l1));
    }   //以後的並行查詢和非同步查詢都將才用這個方法驗證時間和結果
  • 順序方式查詢結果:done = 8063808400,不出意外是八秒多,因為每個方法都是順序執行的,並且每個方法都睡眠了一秒鐘,然後加上執行時間,八秒多很正常
  • 下面我們將使用並行流來實現findPrice方法

    public List<String> findPrice(String product){
        return shops.parallelStream().map(shop -> shop.getName() + shop.getPrice(product))
        .collect(Collectors.toList());
    } //僅僅是將stream 換為  parallelStream
  • 並行流查詢結果:done = 1068473800,因為我的機子是八核的,並且並行流預設的執行緒數就是你機子的核數,所以八個shop是同時進行處理的,所以時間耗費是一秒多
  • 使用CompletableFuture實現

    public List<CompletableFuture<String>> findPrice(String product){
        return shops.stream().map(shop -> CompletableFuture.supplyAsync(
        () -> shop.getName() + shop.getPrice(product)))
                .collect(Collectors.toList());
    }
  • 注意上面方法,你會得到一個List>,列表中的每個CompletableFuture物件在計算完成後都包含商店的String型別的名稱,但是由於用CompletableFuture實現的findPrice需要返回一個List,你就必須要等到所有Future執行完,將其包含的值抽取出來才能夠返回
  • 為了實現上面說的等待效果,我們有一個方法可以實現:join,CompletableFuture中的join與Future中的get有相同的含義,並且也生命在Future介面中,唯一的不同就是join不會丟擲異常,對List中的CompletableFuture執行join操作,一個接一個等待他們執行結束,如下

    public List<String> findPrice(String product){
        List<CompletableFuture<String>> collect = shops.stream()
                                                       .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + shop.getPrice(product)))
                                                       .collect(Collectors.toList());
        return collect.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
  • 上面操作使用了兩個Stream,而不是在一個Stream進行兩次map,這是因為考慮流操作之間的延遲特性,如果你在單一流水線中處理流,發出不同商家的請求就只能同步,順序執行的方式才會成功,因此每個建立CompletableFuture物件只能在前一個操作結束之後執行查詢指定商家的動作,通知join方法返回計算結果,也就是說,在第一個map中得到了一個CompletableFuture物件,再次呼叫map進行join操作的話,那麼流就只能在這等待CompletableFuture完成操作才會繼續執行,自己測試的雙map情況下執行時間為:8080624000
  • 非同步方式的執行時間為:done = 2060340600
  • 到這我們還是有些失望的,因為非同步方式是並行流執行時間的將近兩倍了,那麼我們應該怎麼改進這個現象呢?
  • 並行流非常快,單這只是對於之前的測試,如果我們增加一個shop,那麼又會是什麼結果呢?

    • 順序流:done = 9064012900,毫不意外就是九秒多
    • 並行流:done = 2063944500,因為shop個數大於機器核數了,所以他會多出一個shop,它一直在等待某個shop執行完畢讓出執行緒然後自己去執行,所以是兩秒多
    • 非同步:done = 2070337400,與並行流,看起來差不多的時間,原因是跟並行流是一樣的,因為他們預設都是以機器核數個數為預設執行緒池的大小的,機器核數可以通過Runtime.getRuntime().availableProcessors()得到,然而CompletableFuture具有一定的優勢,因為它可以允許你對執行器Executor進行配置,尤其是執行緒池的大小,讓它跟能適應需求

定製Executor

  • 這裡就設計到了執行緒的大小,因為從上面我們就能看出執行緒個數對程式的執行帶來的影響
  • 有一個可以參考的公式:執行緒數=處理器核數 * 期望CPU利用率 * 等待時間與計算時間的比率,我們上面的非同步方式基本都是等待shop的計算方法返回結果,所以這裡等待時間與計算時間的比率估算為100,如果利用率也為100,那麼我的機器將要建立800個執行緒,但是對於上面的shop數量,這顯然太多了,我們最後的就是跟shop數量一致,這樣就可以一個執行緒分擔一個shop的處理任務,在實際操作中,如果shop數量可能太多,就必須有一個執行緒個數的上限,以確保機器不會崩潰
  • 定製執行器

    Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            //使用守護執行緒---這種方式不會阻止程式的關停
            thread.setDaemon(true);
            return thread;
        }
    });
  • 如上是一個由守護執行緒構成的執行緒池,當一個普通執行緒在執行時,java程式無法終止或者退出,所以最後剩下的那個執行緒會由於一直等待無法發生的時間而引發問題,但是切換為守護執行緒就以為這程式退出時他會被回收,這兩種執行緒效能上沒什麼差異,現在建立好了執行緒池,可以在非同步方法中使用了,比如

    public List<String> findPrice(String product){
        List<CompletableFuture<String>> collect = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + shop.getPrice(product),executor))  //注意引數變化
                .collect(Collectors.toList());
        return collect.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
  • 時間為:done = 1064816300

並行和非同步的選擇

  • 上面我們看到並行的效能意思不錯的,那麼我們應該如何選擇呢?

    • 如果是計算密集型的操作,沒有IO,那麼就推薦並行Stream
    • 如果涉及到IO或者網路連線等,那麼就推薦CompletableFuture

非同步程式的流水線操作

  • 上面中我們是用的CompletableFuture都是單次操作,到這將開始接受多個非同步操作結合在一起是如何使用的
  • 現在假設shop支援了一個折扣服務,服務折扣分為五個折扣力度,並用列舉型別變數代表

    public class   Discount {
        public enum Code{
            //無     銀         金         鉑金          鑽石
            NONE(0),SILVER(5),GOLD(10),PLATINUM(15),DIAMOND(20);
            //百分比
            private final int percentage;
            Code(int percentage) {
                this.percentage = percentage;
            }
        }
    }
  • 我們還假設所有的商店都以相同的格式返回資料,如:ShopName:price:Discount格式返回

    //修改getPrice方法
    public String getPrice(String product){
        Random random = new Random();
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return this.name + ":" + calculatePrice(product) + ":" + code;
    }
  • 以上方法呼叫會返回類似字串:A:0.3771404561328807:SILVER
  • 我們還需要一個Quote類,該類可以將上面getPrice方法返回的String解析,並儲存在類中,如下

    public class Quote {
        private final String shopName;
        private final double price;
        private final Discount.Code discountCode;
        public Quote(String shopName, double price, Discount.Code discountCode) {
            this.shopName = shopName;
            this.price = price;
            this.discountCode = discountCode;
        }
        //解析shop.getPrice()方法返回的String
        public static Quote parse(String shopMes){
            String[] split = shopMes.split(":");
            String shopName = split[0];
            double price = Double.parseDouble(split[0]);
            Discount.Code code = Discount.Code.valueOf(split[2]);
            return new Quote(shopName,price,code);
        }
        public String getShopName() {
            return shopName;
        }
        public double getPrice() {
            return price;
        }
        public Discount.Code getDiscountCode() {
            return discountCode;
        }
    }
  • 同時我們還需要在之前的Discount類中加入兩個方法,如下

    public class   Discount {
        public enum Code{
          ...//上面有實現
        }
        public static String applyDiscount(Quote quote){
            //將商品的原始價格和折扣力度傳入,返回一個新價格
            return quote.getShopName()+" price = " + Discount.apply(quote.getPrice(),quote.getDiscountCode());
        }
        private static String apply(double price, Code discountCode) {
            delay();  //模擬服務響應的延遲
            //新價格
            return (price * (100 - discountCode.percentage) / 100) + "";
        }
        private static void delay(){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
  • 到這總算是把需要的程式碼寫完了,然後下面我們來使用這個Discount

使用Discount

  • 首先嚐試以最直接的方式重新實現findPrice方法

    public List<String> findPrice(String product){
        return shops.stream().map(shop -> shop.getPrice(product))      //根據商品名得到商品資訊:ShopName:price:Discount
                .map(Quote::parse)                                     //根據商品資訊,將資訊封裝到Quote返回
                .map(Discount::applyDiscount)                          //將Quote傳入,並根據原始價格和折扣力度獲取最新價格,返回shopName+newPrice
                .collect(Collectors.toList());                         //收集到List中
    }
  • 執行結果

    18175537800
    huawei = [A price = 0.45873472420610784, B price = 0.055878368162042856,
              C price = 0.27810347563879867, D price = 0.3630003460659669,
              E price = 0.7504524049696628, F price = 0.2958088360956538,
              G price = 0.19074919381044, H price = 0.5328477712597838,
              I price = 0.10705723386858104]
  • 如上我們足足運行了18秒,我們現在能想到的優化措施就是使用並行流,採用之後的執行時長為:4077169100,但是我們根據之前的測試知道,這裡在shop數量增多的時候並不適合採用並行,因為他底層的執行緒池是固定的,而是採用CompletableFuture更好
  • 下面我們採用CompletableFuture實現非同步操作

    public List<String> findPrice(String product){
        List<CompletableFuture<String>> priceFutures = shops.stream()
                //以非同步方式取得每個shop中指定產品的原始價格
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product),executor))
                //在quote物件存在的時候,對其返回的值進行轉換
                .map(future -> future.thenApply(Quote::parse))
                //使用另一個非同步任務構造期望的Future,申請折扣
                .map(future -> future.thenCompose(quote ->
                        CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),executor)))
                .collect(Collectors.toList());
    
        return priceFutures.stream()
                //等待所有Future結束,並收集到List中
                .map(CompletableFuture::join).collect(Collectors.toList());
    }
  • 執行結果

    2092351201
    huawei = [A price = 0.32805686340328877, B price = 0.12371667853268178,
              C price = 0.019271284007279683, D price = 0.4014063161769382,
              E price = 0.457890861738724, F price = 0.12642987715813725,
              G price = 0.28084441232801843, H price = 0.07957054370541786,
              I price = 0.48027669847733084]
  • 步驟解毒!

    • 獲取價格:使用supplyAsync方法就可以一步的對shop進行查詢,第一個轉換的結果是Stream>,一旦執行結束,每個CompletableFuture物件中都會包含對應的shop返回的字串,執行器還是之前的執行器
    • 解析報價:將shop返回的String進行解析,由於解析不需要遠端操作,所以這裡並沒有採用非同步的方式進行處理,並且值得注意的是,thenApply方法並不會阻塞程式碼的執行(thenApply是同步方法,還有一個thenApplyAsync非同步方法),而是類似Stream中的中間操作一樣,只有當CompletableFuture最終結束執行時,你希望傳遞lambda給thenApply方法,將Stream中的CompletableFuture轉換為CompletableFuture
    • 為計算折扣後的價錢構造Future:因為第三步map中設計到了一個遠端操作,我們用睡眠來模擬的,呼叫supplyAsync代表一個非同步操作,這時候我們已經呼叫了兩次Future操作,我們希望可以將這倆次Future操作進行串接起來一起工作:從shop中獲取價格,然後將它轉換為quote,拿到返回的quote後,將其作為引數再傳入Discount,取得最後的折扣價格,thenCompose方法就允許對兩個非同步操作進行流水線,第一個操作完成時,將其結果作為引數傳遞給第二個操作.即你可以建立兩個CompletableFuture物件,對第一個CompletableFuture物件呼叫thenCompose,並向其傳遞一個函式.當第一個CompletableFuture執行完畢後,他的結果將作為該函式的的引數,這個函式的返回值是以第一個CompletableFuture的返回做輸入計算出的第二個CompletableFuture物件.
    • 之後就是等待CompletableFuture全部結束然後收集到List中返回即可
  • 看到這我自己是有些蒙的,因為對於CompletableFuture的方法的使用不是很熟悉就更不用談理解了,這時候我去了解了一下CompletableFuture的方法的使用,大家如果跟我一樣,可以去看看<>

將兩個不相干的CompletableFuture結合起來

  • 如果你看過了我提到的CompletableFuture方法使用這篇api使用,那麼這就很容易了
  • 需求:將一個返回int的和一個返回String的CompletableFuture結果結合在一起

    public void test() {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> 1)   //返回int
                .thenCombine(
                        CompletableFuture.supplyAsync(() -> "2"),   //返回string
                        (i, s) -> i + s   //int和string的組合邏輯
                );
        String join = stringCompletableFuture.join();
        System.out.println(join); //12
    }

響應CompletableFuture的completion時間

  • 我們之前的應用都是用延遲一秒來模擬網路延遲的,但是真實場景中,網路延遲不盡相同,可能會立刻返回,或者延遲到超時...,所以我們更改一下之前的模擬網路延遲的方法delay為randomDelay

    private static void randomDelay(){
        try {
            Thread.sleep(300+ RANDOM.nextInt(2000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 他隨機返回時間代表不同的網路延遲
  • 之前的findPrice方法都是收集到List中,這樣的弊端是必須等到所有的CompletableFuture執行完成後才能夠返回,我們現在來優化他
  • 現在我們知道了執行慢的主要原因在於收集到List中,因為他會join等待,所以我們希望findPrice方法直接接受Stream就好了,這樣接收流,就不用收集到List中

    public Stream<CompletableFuture<String>> findPrice(String product){
        return shops.stream()
                //以非同步方式取得每個shop中指定產品的原始價格
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product),executor))
                //在quote物件存在的時候,對其返回的值進行轉換
                .map(future -> future.thenApply(Quote::parse))
                //使用另一個非同步任務構造期望的Future,申請折扣
                .map(future -> future.thenCompose(quote ->
                        CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),executor)));
    }
  • 上面的程式碼就實現了返回一個Stream供我們處理的功能,下面我們來使用這個方法

    public void test(){
        CompletableFuture[] huawei = findPrice("huawei").map(f -> f.thenAccept(System.out::println))
                .toArray(size ->  new CompletableFuture[size]);
        CompletableFuture.allOf(huawei).join();
        long l2 = System.nanoTime();
    }
  • 如上findPrice方法傳入需要查詢的product,他會返回一個Stream<CompletableFuture<String>>,然後對流進行map操作,傳入的是每一個CompletableFuture(String = producName+打折後的price),thenAccept之前已經說過,傳入一個引數,無返回,但是map需要有返回值,其實thenAccept返回的是一個CompletableFuture<Void>型別,所以map就會返回一個Stream<CompletableFuture<Void>>,我們目前希望做的是等待他結束,返回商家資訊,但是我們之前的randomDelay方法是隨機時間睡眠的,所以難免會有一些慢的商家,不管慢不慢,我們都需要等到商家返回價格,這時候我們可以把Stream中的所有CompletableFuture放到一個數組中,等待所有的任務執行完成
  • allof方法接收一個由CompletableFuture組成的陣列,陣列中所有的CompletableFuture物件執行完之後,他會返回一個CompletableFuture<Void>物件,我們呼叫join方法,等待這個物件執行結束

    輸出結果
    A price = 0.6960491237085883
    C price = 0.11038794177308586
    F price = 0.16672807719726013
    D price = 0.004004621568001343
    E price = 0.19972626299549148
    B price = 0.9778330750902723
    I price = 0.29346736062034645
    H price = 0.37760535718363003
    G price = 0.3492986178179131
  • 觀察輸出過程,是一條條輸出的,這也展示了網路延遲的效果
  • 上面的allof是等待所有任務結束,而anyof是等待任一一個任務結束,如果我們訪問兩個地址,兩個地址只不過是快慢的問題,而返回的結果都相同的時候,我們就可以使用anyof