Java8新特性之:CompletableFuture
1.Future接口
Future設計的初衷:對將來某個時刻會發生的結果進行建模。
它建模了一種異步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給調用方。在Future中出發那些潛在耗時的操作把調用線程解放出來,讓它能繼續執行其他有價值的工作,不再需要等待耗時的操作完成。
Future的優點:比更底層的Thread更易用。
要使用Future,通常只需要將耗時的操作封裝在一個Callable對象中,再將它提交給ExecutorService。
ExecutorService executor = Executors.newCachedThreadPool(); Future<Double> future = executor.submit(new Callable<Double>() { //向ExecutorService提交一個Callable對象 @Override public Double call() throws Exception { return doSomeLongComputation(); //以異步方式在新的線程中執行耗時的操作 } }); doSomethingElse(); //異步操作進行的同時,你可以做其他的事情 try { //獲取異步操作的結果,如果最終被阻塞,無法得到結果,那麽在最多等待1秒鐘之後退出 Double result = future.get(1, TimeUnit.SECONDS); } catch (ExecutionException ee) { //加上拋出一個異常 } catch (InterruptedException ie) { //當前線程在等待過程中被中斷 } catch (TimeoutException te) { //在Future對象完成之前超過已過期 }
2. 實現異步API、代碼避免阻塞
使用工廠方法supplyAsync創建CompletableFuture
public Future<Double> getPriceAsync2(String product) { return CompletableFuture.supplyAsync(() -> calculatePrice(product)); }
supplyAsync方法接受一個生產者(Supplier)作為參數,返回一個CompletableFuture對象,該對象完成異步執行後會讀取調用生產者方法的返回值。
生產者方法會交由ForkJoinPool池中的某個執行線程(Executor)運行,但是你也可以使用supplyAsync方法的重載版本,傳遞第二個參數指定不同的執行線程執行生產者方法。
join方法等待異步操作結束
CompletableFuture類中的join方法和Future接口中的get有相同的含義,等待運行結束。並且也聲明在Future接口中,唯一的不同是join方法不會拋出任何檢測到的異常。因此使用它時不需要再使用try/catch語句塊。
public List<String> findPrices(String product) { //使用CompletableFuture以異步方式計算每種商品的價格 List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPriceAsync(product))) .collect(Collectors.toList()); //等待所有異步操作結束 return priceFutures.stream(). map(CompletableFuture::join) .collect(Collectors.toList()); }
使用定制執行器
創建一個配有線程池的執行器。
線程數的選擇:N(threads) = N(CPU) * U(CPU) * (1 + W/C)
-- N(CPU):處理器的核的數目,可以通過Runtime.getRuntime().availableProcessors()得到;
-- U(CPU):期望的CPU利用率(該值應該介於0和1之間);
-- W/C:等待時間與計算時間的比率。
//創建一個線程池,線程池中線程的數目為100和商店數目二者中較小的一個值(這裏100為線程池的上限) private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); //使用守護線程--這種方式不會阻止程序的關停 return t; } });
集合進行並行計算有兩種方式:並行流和CompletableFutures。
-- 計算密集型操作,並且沒有I/O,推薦使用Stream接口。因為實現簡單,同時效率也可能是最高的(如果所有的線程都是計算密集型的,那就沒有必要創建比處理器核數更多的線程);
-- 如果並行的工作單元還涉及等待I/O的操作(包括網絡連接等待),那麽使用CompletableFuture靈活性更好。這種情況下處理流的流水線中如果發生I/O等待,流的延遲特性會讓我們很難判斷到底什麽時候觸發了等待。
3. 對多個異步任務進行流水線操作
thenApply:將一個由字符串轉換Quote的方法作為參數傳遞給他
thenCompose:該方法允許你對兩個異步操作進行流水線,第一個操作完成時,將其結果作為參數傳遞給第二個操作。
對第一個CompletableFuture對象調用thenCompose,並向其傳遞一個函數。當第一個CompletableFuture執行完畢後,他的結果將作為該函數的參數,這個函數的返回值是以第一個CompletableFuture的返回做輸入計算出的第二 個CompletableFuture對象。
使用thenCompose減少很多線程切換開銷。
thenCombine:將兩個CompletableFuture對象結果整合起來。該方法接收名為BiFunction的第二參數,這個參數定義了兩個CompletableFuture對象完成計算後,如何合並。
thenAccept:方法接收CompletableFuture執行完畢後的返回值做參數。不必等待那些還未返回的結果。
Java8新特性之:CompletableFuture