Java8 In Action-3.高效 Java 8 程式設計(四)
4.CompletableFuture:組合式非同步程式設計 4.1Future 介面 java.util.concurrent.Future介面在Java 5中被引入,設計初衷是對將來某個時刻會發生的結果進行建模。它建模了一種非同步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給呼叫方。在Future中觸發那些潛在耗時的操作把呼叫執行緒解放出來,讓它能繼續執行其他有價值的工作,不再需要呆呆等待耗時的操作完成。該介面表示一個非同步計算(即呼叫執行緒可以繼續執行,不會因為呼叫方法而阻塞)的結果。
/**
* 使用Future以非同步的方式執行一個耗時的操作
* Future的另一個優點是它比更底層的Thread更易用。
* 要使用Future,通常你只需要將耗時的操作封裝在一個Callable物件中,
* 再將它提交給ExecutorService,就萬事大吉了。
*/
ExecutorService executor = Executors.newCachedThreadPool();
Future<Object> future = executor.submit(new Callable<Object>() {
return
doSomeLongComputation();//以非同步方式在新的執行緒中執行耗時的操作
});
doSomethingElse();//非同步操作進行的同時,可以做其他的事情
try {
Object result = future.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {//當前執行緒在等待過程中被中斷
e.printStackTrace();
} catch (ExecutionException e) {//計算丟擲一個異常
e.printStackTrace();
} catch (TimeoutException e) { //在Future物件完成前超時
e. printStackTrace();
}
4.2實現非同步API 將同步方法轉換為非同步方法 錯誤處理
4.3讓你的程式碼免受阻塞之苦 使用並行流對請求進行並行操作 使用CompletableFuture 發起非同步請求 使用定製的執行器
package com.h.java8;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* <br>最佳價格查詢器</br>
*/
public class Shop {
private static List<Shop> shops;
static {
shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"),
new Shop("ShopEasy"),
new Shop("ShopColl"));
}
/**
* 為“最優價格查詢器”應用定製的執行器
* 更充分地利用CPU資源
*/
private static Executor executor = Executors.newFixedThreadPool(
//1為了避免發生由於商店的數目過多導致伺服器超負荷而崩潰,設定一個上限100
Math.min(shops.size(), 100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
//使用守護執行緒——這種方式不會阻止程式的關停
t.setDaemon(true);
return t;
}
}
);
private String name;//商店名稱
public Shop(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
/**
* 依據指定產品名稱返回價格
*
* @param product
* @return
*/
public double getPricce(String product) {
return calculatePrice(product);
}
/**
* 將同步方法改為非同步方法
*
* @param product
* @return
*/
public Future<Double> getPriceAsync(String product) {
/*
//建立CompletableFuture物件,它包含非同步計算的結果
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
//在另一個執行緒中以非同步的方式執行計算
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);//需長時間計算的任務結束並得出結果時,設定Future的返回值
} catch (Exception e) {
//錯誤處理,最終向客戶端丟擲的異常
futurePrice.completeExceptionally(e);
}
}).start();
return futurePrice;
*/
/**
* 使用工廠方法supplyAsync建立CompletableFuture物件(和上面的程式碼是等價的)
* supplyAsync方法接受一個生產者(Supplier)作為引數,返回一個CompletableFuture物件,
* 該物件完成非同步執行後會讀取呼叫生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行執行緒(Executor)執行
*/
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
/**
* 價格計算器
*
* @param product
* @return
*/
private double calculatePrice(String product) {
delay(); //模擬價格計算的時間
return new Random().nextDouble() * product.charAt(0) + product.charAt(1); //返回一個價格的隨機值
}
/**
* 根據產品名稱返回一個字串列表,包含商店的名稱,該商店中指定商品的價格
*
* @param product
* @return
*/
public List<String> findPrices(String product) {
/* *//**
* 1.採用順序查詢所有商店的方式實現的findPrices方法
* 對這4個商店的查詢是順序進行的,並且一個查詢操作會阻塞另一個,每一個操作都要花費大約1秒左右的時間計算請求商品的價格,
* 所以該方法呼叫完成時間大概在4s
*//*
return shops.stream().map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPricce(product)))
.collect(Collectors.toList());*/
/**
* 2.使用並行流對請求進行並行操作
* 使用並行流並行地從不同的商店獲取價格
* 測試完成時間在1s多點
*/
/* return shops.parallelStream().map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPricce(product)))
.collect(Collectors.toList());*/
/**
* 3.使用CompletableFuture 發起非同步請求
* 為什麼下面要用兩個不同的流水線,而不是在同一個處理流的流水線上一個接一個地放置兩個map操作?
* 關鍵點:Stream的延遲特性會引起順序執行
*/
//使用CompletableFuture以非同步方式計算每種商品 的價格
/* List<CompletableFuture<String>> priceFutureList = shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPricce(product)))
.collect(Collectors.toList());
//等待所有非同步操作結束
return priceFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());*/
/**
* 使用定製的執行器優化
* supplyAsync(Supplier<U> supplier,Executor executor)
*/
List<CompletableFuture<String>> priceFutureList = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPricce(product),executor))
.collect(Collectors.toList());
return priceFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
/**
* 模擬1秒鐘延遲的方法
*/
public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
package com.h.java8;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class TestMain {
public static void main(String[] args) throws IOException {
//使用非同步API進行價格查詢
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime + " msecs");
// 執行更多工,比如查詢其他商店
doSomethingElse();
// 在計算商品價格的同時
try {
Double price = futurePrice.get();
System.out.printf("Price is %.2f%n", price);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");
System.out.println("++++++++++++++++++++++");
long start2 = System.nanoTime();
List<String> list = shop.findPrices("my favorite product");
/**
* 使用並行流的CompletableFuture版本測試:
* 在Shop.shops的數量!=4時,兩者所用的時間差不多
* 在Shop.shops的數量=4時,使用並行流(1s多點)比CompletableFuture(2s多點)快一倍
* 並行流的CompletableFuture內部採用的是同樣的通用執行緒池,預設都使用固定數目的執行緒,
* Stream底層依賴的是執行緒數量固定的通用執行緒池
* 具體執行緒數取決於Runtime.getRuntime().availableProcessors()的返回值。
* 然而,CompletableFuture具有一定的優勢,因為它允許你對執行器(Executor)進行配置,尤其是執行緒池的大小,
* 讓它以更適合應用需求的方式進行配置,滿足程式的要求,而這是並行流API無法提供的.
* 如何調整執行緒池的大小?
* Nthreads = NCPU * UCPU * (1 + W/C)
* NCPU是處理器的核的數目,可以通過Runtime.getRuntime().availableProcessors()得到
* UCPU是期望的CPU利用率(該值應該介於0和1之間)
* W/C是等待時間與計算時間的比率
*/
long retrievalTime2 = ((System.nanoTime() - start2) / 1_000_000);
System.out.println(retrievalTime2);
System.out.println(Runtime.getRuntime().availableProcessors());
System.out.println(list);
}
public static void doSomethingElse() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
並行——使用流還是CompletableFutures? 目前為止,你已經知道對集合進行平行計算有兩種方式:要麼將其轉化為並行流,利用map這樣的操作開展工作,要麼枚舉出集合中的每一個元素,建立新的執行緒,在Completable-Future內對其進行操作。後者提供了更多的靈活性,你可以調整執行緒池的大小,而這能幫助你確保整體的計算不會因為執行緒都在等待I/O而發生阻塞。 我們對使用這些API的建議如下。 ❑如果你進行的是計算密集型的操作,並且沒有I/O,那麼推薦使用Stream介面,因為實現簡單,同時效率也可能是最高的(如果所有的執行緒都是計算密集型的,那就沒有必要建立比處理器核數更多的執行緒)。 ❑反之,如果你並行的工作單元還涉及等待I/O的操作(包括網路連線等待),那麼使用 CompletableFuture靈活性更好,你可以像前文討論的那樣,依據等待/計算,或者W/C的比率設定需要使用的執行緒數。這種情況不使用並行流的另一個原因是,處理流的流水線中如果發生I/O等待,流的延遲特性會讓我們很難判斷到底什麼時候觸發了等待。
4.4對多個非同步任務進行流水線操作
package com.h.java8;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
/**
* <br>最佳價格查詢器</br>
*/
public class Shop {
//商店名稱
private String name;
//商店列表
private static List<Shop> shops;
static {
shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"),
new Shop("ShopEasy"),
new Shop("ShopColl"));
}
public Shop(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
private static Executor executor = Executors.newFixedThreadPool(
//1為了避免發生由於商店的數目過多導致伺服器超負荷而崩潰,設定一個上限100
Math.min(shops.size(), 100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
//使用守護執行緒——這種方式不會阻止程式的關停
t.setDaemon(true);
return t;
}
}
);
/**
* 以ShopName:price:DiscountCode的格式返回一個String型別的值
* @param product
* @return
*/
public String getPrice(String product) {
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}
public double getPrice2(String product){
return calculatePrice(product);
}
/**
* 價格計算器
* @param product
* @return
*/
private double calculatePrice(String product) {
delay(); //模擬價格計算的時間
return new Random().nextDouble() * product.charAt(0) + product.charAt(1); //返回一個價格的隨機值
}
/**
* 模擬1秒鐘延遲的方法
*/
public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* 根據產品名稱返回一個字串列表,包含商店的名稱,該商店中指定商品的價格
* @param product
* @return
*/
public List<String> findPrices(String product) {
/**
* 1.以最簡單的方式實現使用Discount服務的findPrices方法
*/
/*return shops.stream()
.map(shop -> shop.getPrice(product)) //取得每個shop中商品的原始價格
.map(Quote::parse) //在Quote物件中對shop返回的字串進行轉換
.map(Discount::applyDiscount)
.collect(Collectors.toList());*/
/**
* 2.使用CompletableFuture提供的特性,以非同步方式重新實現findPrices方法
*/
List<CompletableFuture<String>> futureList = shops.stream()
//以非同步方式取得每個shop中指定產品的原始價格
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
//Quote物件存在時,對其返回的值進行轉換
.map(future -> future.thenApply(Quote::parse))
//使用另一個非同步任務構造期望的Future,申請折扣
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(Collectors.toList());
//等待流中的所有Future執行完畢,並提取各自的返回值
return futureList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
/**
* 假設原始的商品價格的單位為美元,現在需要把它轉為歐元
* @param product
* @return
*/
public List<Double> findPrices2(String product) {
List<CompletableFuture<Double>> futureList = shops.stream()
.map(
shop -> CompletableFuture.supplyAsync(() -> shop.getPrice2("product"), executor)
/**
* 合併兩個獨立的CompletableFuture物件
*/
.thenCombine(
CompletableFuture.supplyAsync(() -> Shop.getRate()),
(price, rate) -> price * rate
)
)
.collect(Collectors.toList());
return futureList.stream()
//呼叫get或者join方法只會造成阻塞,直到CompletableFuture完成才能繼續往下執行
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
public static double getRate(){
return 1.12;
}
}
package com.h.java8;
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;
public Quote(String shopName, double price, Discount.Code code) {
this.shopName = shopName;
this