1. 程式人生 > >Java8 In Action-3.高效 Java 8 程式設計(四)

Java8 In Action-3.高效 Java 8 程式設計(四)

4.CompletableFuture:組合式非同步程式設計 4.1Future 介面 java.util.concurrent.Future介面在Java 5中被引入,設計初衷是對將來某個時刻會發生的結果進行建模。它建模了一種非同步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給呼叫方。在Future中觸發那些潛在耗時的操作把呼叫執行緒解放出來,讓它能繼續執行其他有價值的工作,不再需要呆呆等待耗時的操作完成。該介面表示一個非同步計算(即呼叫執行緒可以繼續執行,不會因為呼叫方法而阻塞)的結果。

/**
         * 使用Future以非同步的方式執行一個耗時的操作
         * Future的另一個優點是它比更底層的Thread更易用。
         * 要使用Future,通常你只需要將耗時的操作封裝在一個Callable物件中,
         * 再將它提交給ExecutorService,就萬事大吉了。
         */
ExecutorService executor = Executors.newCachedThreadPool(); Future<Object> future = executor.submit(new Callable<Object>() { return doSomeLongComputation();//以非同步方式在新的執行緒中執行耗時的操作 }); doSomethingElse();//非同步操作進行的同時,可以做其他的事情 try
{ Object result = future.get(1, TimeUnit.SECONDS); } catch (InterruptedException e) {//當前執行緒在等待過程中被中斷 e.printStackTrace(); } catch (ExecutionException e) {//計算丟擲一個異常 e.printStackTrace(); } catch (TimeoutException e) { //在Future物件完成前超時 e.
printStackTrace(); }

在這裡插入圖片描述

4.2實現非同步API 將同步方法轉換為非同步方法 錯誤處理

4.3讓你的程式碼免受阻塞之苦 使用並行流對請求進行並行操作 使用CompletableFuture 發起非同步請求 使用定製的執行器

package com.h.java8;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * <br>最佳價格查詢器</br>
 */
public class Shop {
    private static List<Shop> shops;

    static {
        shops = Arrays.asList(new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("BuyItAll"),
                new Shop("ShopEasy"),
                new Shop("ShopColl"));
    }

    /**
     * 為“最優價格查詢器”應用定製的執行器
     * 更充分地利用CPU資源
     */
    private static Executor executor = Executors.newFixedThreadPool(
            //1為了避免發生由於商店的數目過多導致伺服器超負荷而崩潰,設定一個上限100
            Math.min(shops.size(), 100),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    //使用守護執行緒——這種方式不會阻止程式的關停
                    t.setDaemon(true);
                    return t;
                }
            }
    );

    private String name;//商店名稱

    public Shop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /**
     * 依據指定產品名稱返回價格
     *
     * @param product
     * @return
     */
    public double getPricce(String product) {
        return calculatePrice(product);
    }

    /**
     * 將同步方法改為非同步方法
     *
     * @param product
     * @return
     */
    public Future<Double> getPriceAsync(String product) {
       /*
       //建立CompletableFuture物件,它包含非同步計算的結果
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        //在另一個執行緒中以非同步的方式執行計算
        new Thread(() -> {
            try {
                double price = calculatePrice(product);
                futurePrice.complete(price);//需長時間計算的任務結束並得出結果時,設定Future的返回值
            } catch (Exception e) {
                //錯誤處理,最終向客戶端丟擲的異常
                futurePrice.completeExceptionally(e);
            }
        }).start();
        return futurePrice;
        */

        /**
         * 使用工廠方法supplyAsync建立CompletableFuture物件(和上面的程式碼是等價的)
         * supplyAsync方法接受一個生產者(Supplier)作為引數,返回一個CompletableFuture物件,
         * 該物件完成非同步執行後會讀取呼叫生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行執行緒(Executor)執行
         */
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }

    /**
     * 價格計算器
     *
     * @param product
     * @return
     */
    private double calculatePrice(String product) {
        delay(); //模擬價格計算的時間
        return new Random().nextDouble() * product.charAt(0) + product.charAt(1); //返回一個價格的隨機值
    }

    /**
     * 根據產品名稱返回一個字串列表,包含商店的名稱,該商店中指定商品的價格
     *
     * @param product
     * @return
     */
    public List<String> findPrices(String product) {
       /* *//**
         * 1.採用順序查詢所有商店的方式實現的findPrices方法
         * 對這4個商店的查詢是順序進行的,並且一個查詢操作會阻塞另一個,每一個操作都要花費大約1秒左右的時間計算請求商品的價格,
         * 所以該方法呼叫完成時間大概在4s
         *//*
        return shops.stream().map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPricce(product)))
                             .collect(Collectors.toList());*/

        /**
         * 2.使用並行流對請求進行並行操作
         *  使用並行流並行地從不同的商店獲取價格
         *  測試完成時間在1s多點
         */
      /*  return shops.parallelStream().map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPricce(product)))
                .collect(Collectors.toList());*/

        /**
         * 3.使用CompletableFuture 發起非同步請求
         * 為什麼下面要用兩個不同的流水線,而不是在同一個處理流的流水線上一個接一個地放置兩個map操作?
         * 關鍵點:Stream的延遲特性會引起順序執行
         */
        //使用CompletableFuture以非同步方式計算每種商品 的價格
       /* List<CompletableFuture<String>> priceFutureList = shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPricce(product)))
                .collect(Collectors.toList());
        //等待所有非同步操作結束
        return priceFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());*/

        /**
         * 使用定製的執行器優化
         * supplyAsync(Supplier<U> supplier,Executor executor)
         */
        List<CompletableFuture<String>> priceFutureList = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPricce(product),executor))
                .collect(Collectors.toList());
        return priceFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    /**
     * 模擬1秒鐘延遲的方法
     */
    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
package com.h.java8;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class TestMain {

    public static void main(String[] args) throws IOException {
        //使用非同步API進行價格查詢
        Shop shop = new Shop("BestShop");
        long start = System.nanoTime();
        Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Invocation returned after " + invocationTime + " msecs");
        // 執行更多工,比如查詢其他商店
        doSomethingElse();
        // 在計算商品價格的同時
        try {
            Double price = futurePrice.get();
            System.out.printf("Price is %.2f%n", price);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Price returned after " + retrievalTime + " msecs");

        System.out.println("++++++++++++++++++++++");
        long start2 = System.nanoTime();
        List<String> list = shop.findPrices("my favorite product");
        /**
         * 使用並行流的CompletableFuture版本測試:
         * 在Shop.shops的數量!=4時,兩者所用的時間差不多
         * 在Shop.shops的數量=4時,使用並行流(1s多點)比CompletableFuture(2s多點)快一倍
         * 並行流的CompletableFuture內部採用的是同樣的通用執行緒池,預設都使用固定數目的執行緒,
         * Stream底層依賴的是執行緒數量固定的通用執行緒池
         * 具體執行緒數取決於Runtime.getRuntime().availableProcessors()的返回值。
         * 然而,CompletableFuture具有一定的優勢,因為它允許你對執行器(Executor)進行配置,尤其是執行緒池的大小,
         * 讓它以更適合應用需求的方式進行配置,滿足程式的要求,而這是並行流API無法提供的.
         * 如何調整執行緒池的大小?
         * Nthreads = NCPU * UCPU * (1 + W/C)
         *  NCPU是處理器的核的數目,可以通過Runtime.getRuntime().availableProcessors()得到
         *  UCPU是期望的CPU利用率(該值應該介於0和1之間)
         *  W/C是等待時間與計算時間的比率
         */
        long retrievalTime2 = ((System.nanoTime() - start2) / 1_000_000);
        System.out.println(retrievalTime2);
        System.out.println(Runtime.getRuntime().availableProcessors());
        System.out.println(list);
    }

    public static void doSomethingElse() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

並行——使用流還是CompletableFutures? 目前為止,你已經知道對集合進行平行計算有兩種方式:要麼將其轉化為並行流,利用map這樣的操作開展工作,要麼枚舉出集合中的每一個元素,建立新的執行緒,在Completable-Future內對其進行操作。後者提供了更多的靈活性,你可以調整執行緒池的大小,而這能幫助你確保整體的計算不會因為執行緒都在等待I/O而發生阻塞。 我們對使用這些API的建議如下。 ❑如果你進行的是計算密集型的操作,並且沒有I/O,那麼推薦使用Stream介面,因為實現簡單,同時效率也可能是最高的(如果所有的執行緒都是計算密集型的,那就沒有必要建立比處理器核數更多的執行緒)。 ❑反之,如果你並行的工作單元還涉及等待I/O的操作(包括網路連線等待),那麼使用 CompletableFuture靈活性更好,你可以像前文討論的那樣,依據等待/計算,或者W/C的比率設定需要使用的執行緒數。這種情況不使用並行流的另一個原因是,處理流的流水線中如果發生I/O等待,流的延遲特性會讓我們很難判斷到底什麼時候觸發了等待。

4.4對多個非同步任務進行流水線操作

package com.h.java8;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

/**
 * <br>最佳價格查詢器</br>
 */
public class Shop {
    //商店名稱
    private String name;
    //商店列表
    private static List<Shop> shops;

    static {
        shops = Arrays.asList(new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("BuyItAll"),
                new Shop("ShopEasy"),
                new Shop("ShopColl"));
    }

    public Shop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    private static Executor executor = Executors.newFixedThreadPool(
            //1為了避免發生由於商店的數目過多導致伺服器超負荷而崩潰,設定一個上限100
            Math.min(shops.size(), 100),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    //使用守護執行緒——這種方式不會阻止程式的關停
                    t.setDaemon(true);
                    return t;
                }
            }
    );

    /**
     * 以ShopName:price:DiscountCode的格式返回一個String型別的值
     * @param product
     * @return
     */
    public String getPrice(String product) {
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s", name, price, code);
    }
    
	public double getPrice2(String product){
        return calculatePrice(product);
    }
    /**
     * 價格計算器
     * @param product
     * @return
     */
    private double calculatePrice(String product) {
        delay(); //模擬價格計算的時間
        return new Random().nextDouble() * product.charAt(0) + product.charAt(1); //返回一個價格的隨機值
    }

    /**
     * 模擬1秒鐘延遲的方法
     */
    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }


    /**
     * 根據產品名稱返回一個字串列表,包含商店的名稱,該商店中指定商品的價格
     * @param product
     * @return
     */
    public List<String> findPrices(String product) {
        /**
         * 1.以最簡單的方式實現使用Discount服務的findPrices方法
         */
        /*return shops.stream()
                .map(shop -> shop.getPrice(product)) //取得每個shop中商品的原始價格
                .map(Quote::parse) //在Quote物件中對shop返回的字串進行轉換
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());*/
        /**
         * 2.使用CompletableFuture提供的特性,以非同步方式重新實現findPrices方法
         */
        List<CompletableFuture<String>> futureList = shops.stream()
                //以非同步方式取得每個shop中指定產品的原始價格
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
                //Quote物件存在時,對其返回的值進行轉換
                .map(future -> future.thenApply(Quote::parse))
                //使用另一個非同步任務構造期望的Future,申請折扣
                .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
                .collect(Collectors.toList());
        //等待流中的所有Future執行完畢,並提取各自的返回值
        return futureList.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());
    }

 /**
     * 假設原始的商品價格的單位為美元,現在需要把它轉為歐元
     * @param product
     * @return
     */
    public List<Double> findPrices2(String product) {
        List<CompletableFuture<Double>> futureList = shops.stream()
                .map(
                        shop -> CompletableFuture.supplyAsync(() -> shop.getPrice2("product"), executor)
                                /**
                                 * 合併兩個獨立的CompletableFuture物件
                                 */
                                .thenCombine(
                                        CompletableFuture.supplyAsync(() -> Shop.getRate()),
                                        (price, rate) -> price * rate
                                )

                )
                .collect(Collectors.toList());
        return futureList.stream()
      					  //呼叫get或者join方法只會造成阻塞,直到CompletableFuture完成才能繼續往下執行
                         .map(CompletableFuture::join)
                          .collect(Collectors.toList());

    }

    public static double getRate(){
        return 1.12;
    }
}
package com.h.java8;

public class Quote {
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this