1. 程式人生 > >《Java 8 in Action》Chapter 11:CompletableFuture:組合式非同步程式設計

《Java 8 in Action》Chapter 11:CompletableFuture:組合式非同步程式設計

某個網站的資料來自Facebook、Twitter和Google,這就需要網站與網際網路上的多個Web服務通訊。可是,你並不希望因為等待某些服務的響應,阻塞應用程式的執行,浪費數十億寶貴的CPU時鐘週期。比如,不要因為等待Facebook的資料,暫停對來自Twitter的資料處理。

第7章中介紹的分支/合併框架以及並行流是實現並行處理的寶貴工具;它們將一個操作切分為多個子操作,在多個不同的核、CPU甚至是機器上並行地執行這些子操作。與此相反,如果你的意圖是實現併發,而非並行,或者你的主要目標是在同一個CPU上執行幾個鬆耦合的任務,充分利用CPU的核,讓其足夠忙碌,從而最大化程式的吞吐量,那麼你其實真正想做的是避免因為等待遠端服務的返回,或者對資料庫的查詢,而阻塞執行緒的執行,浪費寶貴的計算資源,因為這種等待的時間很可能相當長。

1. Future介面

Future介面在Java 5中被引入,設計初衷是對將來某個時刻會發生的結果進行建模。它建模了一種非同步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給呼叫方。在Future中觸發那些潛在耗時的操作把呼叫執行緒解放出來,讓它能繼續執行其他有價值的工作,不再需要等待耗時的操作完成。Future的另一個優點是它比更底層的Thread更易用。要使用Future,通常你只需要將耗時的操作封裝在一個Callable物件中,再將它提交給ExecutorService。使用Future以非同步的方式執行一個耗時的操作:

執行緒可以在ExecutorService以併發方式呼叫另一個執行緒執行耗時操作的同時,去執行一些其他的任務。接著,如果你已經執行到沒有非同步操作的結果就無法繼續任何有意義的工作時,可以呼叫它的get方法去獲取操作的結果。如果操作已經完成,該方法會立刻返回操作的結果,否則它會阻塞你的執行緒,直到操作完成,返回相應的結果。如果該長時間執行的操作永遠不返回了會怎樣?Future提供了一個無需任何引數的get方法,推薦使用過載版本的get方法,它接受一個超時的引數,可以定義執行緒等待Future結果的最長時間,避免無休止的等待。下圖是Future非同步執行執行緒原理圖。

2. 使用CompletableFuture構建非同步應用

Future介面有一定的侷限性,比如,我們很難表述Future結果之間的依賴性。因此我們引入了CompletableFuture。接下來通過一個“最佳價格查詢器“的應用,它會查詢多個線上商店,依據給定的產品或服務找出最低的價格,來展現CompletableFuture實現非同步應用。通過此例你能學到這些:

  • 如何編寫非同步API
  • 如何讓使用同步API的程式碼變為非阻塞程式碼
  • 如何使用流水線將兩個接續的非同步操作合併為一個非同步計算操作
  • 如何以響應式的方式處理非同步操作的完成事件

同步API和非同步API:

  • 同步API其實只是對傳統方法呼叫的另一種稱呼:你呼叫了某個方法,呼叫方在被呼叫方執行的過程中會等待,被呼叫方執行結束返回,呼叫方取得被呼叫方的返回值並繼續執行。即使呼叫方和被呼叫方在不同的執行緒中執行,呼叫方還是需要等待被呼叫方結束執行,這就是阻塞式呼叫這個名詞的由來。
  • 非同步API會直接返回,或者至少在被呼叫方計算完成之前,將它剩餘的計算任務交給另一個執行緒去做,該執行緒和呼叫方是非同步的——這就是非阻塞式呼叫的由來。執行剩餘計算任務的執行緒會將它的計算結果返回給呼叫方。返回的方式要麼是通過回撥函式,要麼是由呼叫方再次執行一個“等待,直到計算完成”的方法呼叫。

2.1 實戰:實現非同步API

2.1.1 同步方法

同步操作中會為等待同步事件完成而等待1s,這種是無法接受的,對於程式體驗來說是非常不好的。

2.1.2 將同步方法轉換為非同步方法

Java 5引入了java.util.concurrent.Future介面表示一個非同步計算(即呼叫執行緒可以繼續執行,不會因為呼叫方法而阻塞)的結果。這意味著Future是一個暫時還不可知值的處理器,這個值在計算完成後,可以通過呼叫它的get方法取得。這種方式下,在進行價格查詢的同時,還能執行一些其他的任務,比如查詢其他商店中商品的價格,不會阻塞在那裡等待第一家商店返回請求的結果。最後,如果所有有意義的工作都已經完成,所有要執行的工作都依賴於商品價格時,再呼叫Future的get方法。執行了這個操作後,要麼獲得Future中封裝的值(如果非同步任務已經完成),要麼發生阻塞,直到該非同步任務完成,期望的值能夠訪問。同時,如果某個商品價格計算髮生異常,會將當前執行緒殺死,從而導致等待get方法返回結果的客戶端永久地被阻塞。客戶端可以使用過載版本的get方法,設定超時引數來避免。為了讓客戶端能瞭解無法提供請求商品價格的原因,你需要使用CompletableFuture的completeExceptionally方法將導致CompletableFuture內發生問題的異常丟擲。

2.1.3 使用工廠方法supplyAsync建立CompletableFuture物件

supplyAsync方法接受一個生產者(Supplier)作為引數,返回一個CompletableFuture物件,該物件完成非同步執行後會讀取呼叫生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行執行緒(Executor)執行,但是你也可以使用supplyAsync方法的過載版本,傳遞第二個引數指定不同的執行執行緒執行生產者方法。

3. 消除程式碼阻塞問題

3.1 順序同步請求

3.2 使用並行流對請求進行並行操作

3.3 使用CompletableFuture發起非同步請求

CompletableFuture版本的程式似乎比並行流版本的程式還快那麼一點兒。但是最後這個版本也不太令人滿意。它們看起來不相伯仲,究其原因都一樣:它們內部採用的是同樣的通用執行緒池,預設都使用固定數目的執行緒,具體執行緒數取決於Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的優勢,因為它允許你對執行器(Executor)進行配置,尤其是執行緒池的大小,讓它以更適合應用需求的方式進行配置,滿足程式的要求,而這是並行流API無法提供的。
順序執行和並行執行的原理對比:

圖11-4的上半部分展示了使用單一流水線處理流的過程,我們看到,執行的流程(以虛線標識)是順序的。事實上,新的CompletableFuture物件只有在前一個操作完全結束之後,才能建立。與此相反,圖的下半部分展示瞭如何先將CompletableFutures物件聚集到一個列表中(即圖中以橢圓表示的部分),讓物件們可以在等待其他物件完成操作之前就能啟動。

3.4 使用CompletableFuture發起非同步請求WithExecutor

3.5 呼叫結果:

3.6 並行——使用流還是CompletableFutures

目前為止,你已經知道對集合進行平行計算有兩種方式:要麼將其轉化為並行流,利用map這樣的操作開展工作,要麼枚舉出集合中的每一個元素,建立新的執行緒,在CompletableFuture內對其進行操作。後者提供了更多的靈活性,你可以調整執行緒池的大小,而這能幫助你確保整體的計算不會因為執行緒都在等待I/O而發生阻塞。

  • 如果你進行的是計算密集型的操作,並且沒有I/O,那麼推薦使用Stream介面,因為實現簡單,同時效率也可能是最高的(如果所有的執行緒都是計算密集型的,那就沒有必要建立比處理器核數更多的執行緒)。
  • 如果你並行的工作單元還涉及等待I/O的操作(包括網路連線等待),那麼使用CompletableFuture靈活性更好,你可以像前文討論的那樣,依據等待/計算,或者 W/C的比率設定需要使用的執行緒數。這種情況不使用並行流的另一個原因是,處理流的流水線中如果發生I/O等待,流的延遲性會讓我們很難判斷到底什麼時候觸發了等待。

4. 對多個非同步任務進行流水線操作

4.1 案例

通過在shop構成的流上採用流水線方式執行三次map操作,我們得到了結果。

  • 第一個操作將每個shop物件轉換成了一個字串,該字串包含了該 shop中指定商品的價格和折扣程式碼。
  • 第二個操作對這些字串進行了解析,在Quote物件中對它們進行轉換。
  • 第三個map會操作聯絡遠端的Discount服務,計算出最終的折扣價格,並返回該價格及提供該價格商品的shop。

程式碼如圖:



原理圖:

Java 8的CompletableFuture API提供了名為thenCompose的方法,它就是專門為這一目的而設計的,thenCompose方法允許你對兩個非同步操作進行流水線,第一個操作完成時,將其結果作為引數傳遞給第二個操作。換句話說,你可以建立兩個CompletableFutures物件,對第一個CompletableFuture物件呼叫thenCompose,並向其傳遞一個函式。當第一個 CompletableFuture執行完畢後,它的結果將作為該函式的引數,這個函式的返回值是以第一 個CompletableFuture的返回做輸入計算出的第二個CompletableFuture物件。thenCompose方法像CompletableFuture類中的其他方法一樣,也提供了一個以Async字尾結尾的版本thenComposeAsync。通常而言,名稱中不帶Async的方法和它的前一個任務一樣,在同一個執行緒中執行;而名稱以Async結尾的方法會將後續的任務提交到一個執行緒池,所以每個任務是由不同的執行緒處理的。

4.2 thenCombine方法

將兩個CompletableFuture物件結合起來,無論他們是否存在依賴。thenCombine方法,它接收名為BiFunction的第二引數,這個引數 定義了當兩個CompletableFuture物件完成計算後,結果如何合併。同thenCompose方法一樣, thenCombine方法也提供有一個Async的版本。這裡,如果使用thenCombineAsync會導致BiFunction中定義的合併操作被提交到執行緒池中,由另一個任務以非同步的方式執行。

程式碼圖:

原理圖:

4.3 響應CompletableFuture的completion事件

Java 8的CompletableFuture通過thenAccept方法提供了這一功能,它接收 CompletableFuture執行完畢後的返回值做引數。thenAccept方法也提供 了一個非同步版本,名為thenAcceptAsync。非同步版本的方法會對處理結果的消費者進行排程, 從執行緒池中選擇一個新的執行緒繼續執行,不再由同一個執行緒完成CompletableFuture的所有任 務。因為你想要避免不必要的上下文切換,更重要的是你希望避免在等待執行緒上浪費時間,儘快響應CompletableFuture的completion事件,所以這裡沒有采用非同步版本。

4.3.1 實戰

5. 小結

  • 執行比較操作時,尤其是那些依賴一個或多個遠端服務的操作,使用非同步任務可以改善程式的效能,加快程式的響應速度。
  • 你應該儘可能地為客戶提供非同步API。使用CompletableFuture類提供的特性,你能夠輕鬆地實現這一目標。
  • CompletableFuture類還提供了異常管理的機制,讓你有機會丟擲/管理非同步任務執行中發生的異常。
  • 將同步API的呼叫封裝到一個CompletableFuture中,你能夠以非同步的方式使用其結果。
  • 如果非同步任務之間相互獨立,或者它們之間某一些的結果是另一些的輸入,你可以將這些非同步任務構造或者合併成一個。
  • 你可以為CompletableFuture註冊一個回撥函式,在Future執行完畢或者它們計算的結果可用時,針對性地執行一些程式。
  • 你可以決定在什麼時候結束程式的執行,是等待由CompletableFuture物件構成的列表中所有的物件都執行完畢,還是隻要其中任何一個首先完成就中止程式的執行。

資源獲取

  • 公眾號回覆 : Java8 即可獲取《Java 8 in Action》中英文版!

Tips

  • 歡迎收藏和轉發,感謝你的支援!(๑•̀ㅂ•́)و✧
  • 歡迎關注我的公眾號:後端小哥,專注後端開發,希望和你一起進步!

相關推薦

Java 8 in Action》Chapter 1為什麼要關心Java 8

自1998年 JDK 1.0(Java 1.0) 釋出以來,Java 已經受到了學生、專案經理和程式設計師等一大批活躍使用者的歡迎。這一語言極富活力,不斷被用在大大小小的專案裡。從 Java 1.1(1997年) 一直到 Java 7(2011年),Java 通過增加新功能,不斷得到良好的升級。Java 8

Java 8 in Action》Chapter 3Lambda表示式

1. Lambda簡介 可以把Lambda表示式理解為簡潔地表示可傳遞的匿名函式的一種方式:它沒有名稱,但它有引數列表、函式主體、返回型別,可能還有一個可以丟擲的異常列表。 匿名——我們說匿名,是因為它不像普通的方法那樣有一個明確的名稱:寫得少而想得多! 函式——我們說它是函式,是因為Lambda函式不像方

Java 8 in Action》Chapter 4引入流

1. 流簡介 流是Java API的新成員,它允許你以宣告性方式處理資料集合(通過查詢語句來表達,而不是臨時編寫一個實現)。就現在來說,你可以把它們看成遍歷資料集的高階迭代器。此外,流還可以透明地並行處理。讓我們來看一個例項返回低熱量(<400)的菜餚名稱: Java7版本: List<Dish&

Java 8 in Action》Chapter 6用流收集資料

1. 收集器簡介 collect() 接收一個型別為 Collector 的引數,這個引數決定了如何把流中的元素聚合到其它資料結構中。Collectors 類包含了大量常用收集器的工廠方法,toList() 和 toSet() 就是其中最常見的兩個,除了它們還有很多收集器,用來對資料進行對複雜的轉換。 指令式

Java 8 in Action》Chapter 7並行資料處理與效能

在Java 7之前,並行處理資料集合非常麻煩。第一,你得明確地把包含資料的資料結構分成若干子部分。第二,你要給每個子部分分配一個獨立的執行緒。第三,你需要在恰當的時候對它們進行同步來避免不希望出現的競爭條件,等待所有執行緒完成,最後把這些部分結果合併起來。Java 7引入了一個叫作分支/合併的框架,讓這些操

Java 8 in Action》Chapter 9預設方法

傳統上,Java程式的介面是將相關方法按照約定組合到一起的方式。實現介面的類必須為介面中定義的每個方法提供一個實現,或者從父類中

Java 8 in Action》Chapter 10用Optional取代null

1965年,英國一位名為Tony Hoare的電腦科學家在設計ALGOL W語言時提出了null引用的想法。ALGOL W是第一批在堆上分配記錄的型別語言之一。Hoare選擇null引用這種方式,“只是因為這種方法實現起來非常容易”。雖然他的設計初衷就是要“通過編譯器的自動檢測機制,確保所有使用引用的地方都

Java 8 in Action》Chapter 11CompletableFuture組合式非同步程式設計

某個網站的資料來自Facebook、Twitter和Google,這就需要網站與網際網路上的多個Web服務通訊。可是,你並不希望因為等待某些服務的響應,阻塞應用程式的執行,浪費數十億寶貴的CPU時鐘週期。比如,不要因為等待Facebook的資料,暫停對來自Twitter的資料處理。 第7章中介紹的分支/合

Java 8 in Action》Chapter 8重構、測試和除錯

我們會介紹幾種方法,幫助你重構程式碼,以適配使用Lambda表示式,讓你的程式碼具備更好的可讀性和靈活性。除此之外,我們還會討論目前比較流行的幾種面向物件的設計模式, 包括策略模式、模板方法模式、觀察者模式、責任鏈模式,以及工廠模式,在結合Lambda表示式之後變得更簡潔的情況。最後,我們會介紹如何測試和除

Java 8 Lambda表示式實現設計模式命令模式

在這篇部落格裡,我將說明如何在使用 Java 8 Lambda表示式 的函數語言程式設計方式 時實現 命令 設計模式 。命令模式的目標是將請求封裝成一個物件,從對客戶端的不同型別請求,例如佇列或日誌請求引數化,並提供相應的操作。命令模式是一種通用程式設計方式,該方式基於執行

《Java8實戰》-第十一章筆記(CompletableFuture組合式非同步程式設計

CompletableFuture:組合式非同步程式設計 最近這些年,兩種趨勢不斷地推動我們反思我們設計軟體的方式。第一種趨勢和應用執行的硬體平臺相關,第二種趨勢與應用程式的架構相關,尤其是它們之間如何互動。我們在第7章中已經討論過硬體平臺的影響。我們注意到隨著多核處理器的出現,提升應用程式處理速度最有效的

Java 8新特性之CompletableFuture:組合式非同步程式設計

隨著多核處理器的出現,提升應用程式的處理速度最有效的方式就是可以編寫出發揮多核能力的軟體,我們已經可以通過切分大型的任務,讓每個子任務並行執行,使用執行緒的方式,分支/合併框架(Java 7) 和並行流(Java 8)來實現。 現在很多大型的網際網路公司都對外提供了API服務,比如百度的地圖,微博的新聞,天

Java 8實戰(Java 8 in action)學習總結(三)

Streams API可以表達複雜的資料處理查詢。常用的流操作如下表: 你可以使用filter、distinct、skip和limit對流做篩選和切片。 你可以使用map和flatMap提取或轉換流中的元素。 你可以使用findFirst和findAny方法查詢流中的元素。你可以allMatch、none

Ask HN: Have you moved beyond Java 8 in production?

Have you moved to Java 9 , 10 or 11 in production ? Yes / No ? How big is your source code ? Is it a legacy code ? Does it have automated tests ? How much

Java 8 In Action之引用特定型別的任意物件的例項方法

此種引用型別名稱原文為:reference to an instance method of an arbitrary object of a particular type 今天在和同學討論另外一個問題的時候(直接導致這個問題只有明天再解決了),突然爭論到能不能用類來呼叫

java8學習CompetableFuture組合式非同步程式設計

內容來自《 java8實戰 》,本篇文章內容均為非盈利,旨為方便自己查詢、總結備份、開源分享。如有侵權請告知,馬上刪除。書籍購買地址:java8實戰 如果你的意圖是實現併發,而非並行,或者你的主要目標是在同一個CPU上執行集合鬆耦合的任務,充分利用CPU的核,讓其足夠忙碌,從而最大化程式

JavaScript是如何工作的事件迴圈和非同步程式設計的崛起 + 5種使用 async/await 更好地編碼方式!

摘要: 深度理解JS事件迴圈!!! 原文:JavaScript是如何工作的:事件迴圈和非同步程式設計的崛起+ 5種使用 async/await 更好地編碼方式! 作者:前端小智 Fundebug經授權轉載,版權歸原作者所有。 此篇是 JavaScript是如何工作的第四篇,其它三篇可以看這

【譯】JavaScript的工作原理事件迴圈及非同步程式設計的出現和 5 種更好的 async/await 程式設計方式

此篇是JavaScript的工作原理的第四篇,其它三篇可以看這裡: 【譯】JavaScript的工作原理:引擎,執行時和呼叫堆疊的概述 【譯】JavaScript的工作原理:V8引擎內部+關於如何編寫優化程式碼的5個技巧 【譯】JavaScript的工作原理:記憶體管理和4種常見的記憶體洩漏

nodejs學習筆記二閉包和非同步程式設計

閉包到底是什麼鬼 閉包就是函式��,但是它可以繼承並訪問它自身被宣告的那個作用域裡的變數。當你將一個回撥函式作為引數傳遞給另外一個進行i/o操作的函式時,回撥函式稍後會被呼叫,神奇的是,在被呼叫時,回撥函式會記住它自身宣告時所在的上下文,並且可以訪問該上下文及

有了 CompletableFuture,使得非同步程式設計沒有那麼難了!

本文導讀: 業務需求場景介紹 技術設計方案思考 Future 設計模式實戰 CompletableFuture 模式實戰 CompletableFuture 生產建議 CompletableFuture 效能測試 CompletableFuture 使用擴充套件 1、業務需求場景介紹 不變的東西就是一