1. 程式人生 > 實用技巧 >聽說你還不知道CompletableFuture?

聽說你還不知道CompletableFuture?

java8已經在日常開發編碼中非常普遍了,掌握運用好它可以在開發中運用幾行精簡程式碼就可以完成所需功能。
今天將介紹CompletableFuture的在生產環境如何使用實踐。CompletableFuture類作為Java 8 Concurrency API改進而引入,熟悉的同學應該瞭解在Java 9 也有對CompletableFuture有一些改進,橘子之後再進入講解。
閱讀這篇文章需要知道的前置知識點有,函式語言程式設計,執行緒池原理等。還不熟悉的同學可以看看之前的文章,話不多說,開始吧。

為了更好的表達,我們結合例子講解,假設今天小橘收到TL任務,要求完成實時拉取資料的功能,完成後告知拉取完成。假設拉取資料需要從A,B,C三個服務中獲取,拉取完成推送需要呼叫D服務。

需求變更1:拉取資料需要從E服務獲取,但是會依賴從A服務獲取的結果。
需求變更2:從A服務一次能拉去一萬+資料,但是E服務的效能支撐不了大呼叫,在Provider端有限流兜底。
需求變更3:拉取資料過程中需要保證資料完整性,不能出現統計錯誤。

為什麼使用CompletableFuture

橘友們說了,這個可以用jdk5.0提供的Future來實現,我們將拉取資料需要用到的從A,B ,C三個服務介面放到FutureTask中,非同步的去執行獲取資料結果,然後再同步呼叫D服務。
OK,簡單實現這個功能沒有問題,但是有什麼缺陷,需要怎麼可以改進嘛?
我們通過原始碼註釋可以看到Future類返回的結果需要阻塞等待get方法返回結果,它提供了isDone()方法檢測非同步計算是否已經結束,get()方法等待非同步操作結束,以及獲取計算的結果。等到所有Future任務完成,通知執行緒獲取結果併合並。

從效能上,需要等待 Future 集合中的所有任務都完成(此需求沒問題,接著往下看), 從健壯性上,Futrue介面沒有方法去進行計算組合或者處理可能出現的錯誤。從功能擴充套件上,Future介面無法進行多個非同步計算之間相互獨立,同時第二個又依賴於第一個的結果。而今天的主角CompletableFuture都可以滿足上述功能,具有大約50種不同的構成,結合,執行非同步計算步驟和處理錯誤。(全部學習完所有方法是不現實的,掌握靈魂和核心方法即可依法炮製)

CompletableFuture API 使用

API太多,簡單列舉。讀者自行學習即可,本文重點不在介紹api

/**
任務A執行完執行B,執行B不需要依賴A的結果同時B不返回結果。
*/
CompletableFuture.supplyAsync(()->"resultA").thenRun(()->{});
/**
任務A執行完執行B,B執行依賴A結果同時B不返回結果
*/
CompletableFuture.supplyAsync(()->"resultA").thenAccept(resultA->{});
/**
任務A執行完執行B,B執行依賴A結果同時B返回結果
*/
CompletableFuture.supplyAsync(()->"resultA").thenApply(resultA->resultA+"resultB");
CompletableFuture<String>completableFuture
=CompletableFuture.supplyAsync(()->"orange")
.thenCompose(s->CompletableFuture.supplyAsync(()->s+"csong"));
//true
assertEquals("orangecsong",completableFuture.get());

你的疑問:該thenCompose方法不和thenApply一樣實現結果合併計算嘛?

剛學習時候確實有點迷惑,其實他們的內部形式是不一樣的,它們與Java 8中可用的Stream和Optional類的map和flatMap方法是有著類似的設計思路在裡面的。都是接收一個CompletableFuture並將其應用於計算結果,但thenCompose(flatMap)方法接收一個函式,該函式返回相同型別的另一個CompletableFuture物件。

CompletableFuture<String>completableFuture
=CompletableFuture.supplyAsync(()->"orange")
.thenCombine(CompletableFuture.supplyAsync(
()->"chizongzi"),(s1,s2)->s1+s2));

assertEquals("orangechizongzi",completableFuture.get());

thenCombine方法旨在當你想要使用多個計算結果時,而後續的處理同時需要依賴返回值,第一個計算結果返回 "orange",第二個計算結果返回 "chizongzi",對結果進行拼接,那麼結果就是"orange chizongzi" 啦。你可能會問如果結果無需處理呢?thenAcceptBoth將可以實現你的功能。那麼它和thenApply的區別又是啥呢?
thenCompose()方法是使用前一個Future作為引數。它會直接使結果變新的Future,而不是我們在thenApply()中到的巢狀Future,而是用來連線兩個CompletableFuture,是生成一個新的CompletableFuture,因此,如果想要繼續巢狀連結CompletableFuture 方法,那麼最好使用thenCompose()。

publicstaticCompletableFuture<Void>allOf(CompletableFuture<?>...cfs){...}

當我們需要並行執行多個任務時,我們通常希望等待所有它們執行,然後處理它們的組合結果。CompletableFuture提供了allOf靜態方法允許等待所有的完成任務,但是它返回型別是CompletableFuture 。侷限性在於它不會返回所有任務的綜合結果。相反,你必須手動從Futures獲取結果。那麼怎麼解決呢,CompletableFuture提供了join()可以解決,這裡小橘用Stream實現同樣可以的。

StringmultiFutures=Stream.of(future1,future2,future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(""));

assertEquals("Todayissun",multiFutures);

那麼 CompletableFuture 針對異常是如何處理的呢?

publicCompletableFuture<T>exceptionally(Function<Throwable,?extendsT>fn);
public<U>CompletionStage<U>handle(BiFunction<?superT,Throwable,?extendsU>fn);
CompletableFuture.supplyAsync(()->"resultA")
.thenApply(resultA->resultA+"resultB")
.thenApply(resultB->resultB+"resultC")

//如果resultA,resultB,resultC在獲取中有異常

CompletableFuture<String>future=CompletableFuture.supplyAsync(()->{
thrownewRuntimeException();
}).exceptionally(ex->"errorResultA")
.thenApply(resultA->resultA+"resultB")
.thenApply(resultB->resultB+"resultC")

上面的程式碼中,任務 A 丟擲異常,然後通過exceptionally() 方法處理了異常,並返回新的結果,這個新的結果將傳遞給任務 B。如果inovke future.join方法結果將會輸出 "errorResultA resultB result C"

上述方法基本就是底層函式式api的使用,聰明的橘友們實踐起來吧!

CompletableFuture 例子

Talk is cheap , show me code。自從上篇 你還在擔心rpc介面超時嗎 文章末尾講述大批量呼叫,其中是順序invoke呼叫,其實我們分析,非同步呼叫利用CompletableFuture需要怎麼實現呢?

/**
*@Description:
*@author:orangeCs
*@create:2020-06-25
*/
publicclassAsyncInvokeUtil{

privateAsyncInvokeUtil(){}

/**
*@paramparamList源資料(需處理資料載體)
*@parambuildParam中轉函式(獲取的結果做一層trans,來滿足呼叫服務條件)
*@paramtransParam中轉函式(獲取的結果做一層trans,來滿足呼叫服務條件)
*@paramprocessFunction中轉處理函式
*@paramsize分批大小
*@paramexecutorService暴露外部自定義實現執行緒池(demo沒判空,可以做成非必傳)
*@param<R>
*@param<T>
*@param<P>
*@param<k>
*@return
*@throwsExecutionException
*@throwsInterruptedException
*/
publicstatic<R,T,P,k>List<R>partitionAsyncInvokeWithRes(List<T>paramList,
Function<List<T>,P>buildParam,
Function<P,List<k>>transParam,
Function<List<k>,List<R>>processFunction,
Integersize,
ExecutorServiceexecutorService)throwsExecutionException,InterruptedException{
List<CompletableFuture<List<R>>>completableFutures=Lists.partition(paramList,size).stream()
.map(buildParam)
.map(transParam)
.map(eachList->CompletableFuture.supplyAsync(()->
processFunction.apply(eachList),executorService))
.collect(Collectors.toList());
//get
CompletableFuture<Void>finishCompletableFuture=CompletableFuture.allOf(completableFutures.toArray(newCompletableFuture[0]));
finishCompletableFuture.get();
returncompletableFutures.stream().map(CompletableFuture::join)
.filter(Objects::nonNull).reduce(newArrayList<>(),(resList1,resList2)->{
resList1.addAll(resList2);
returnresList1;
});
}

}

僅僅這一篇文章是不夠的,任何知識都是長期積累,反覆思考才能變成自己的東西,在浮躁的社會,我們年輕人切勿浮躁,今天介紹到這裡了,喜歡博主的朋友們記得點個關注哦。

> 本文由部落格群發一文多發等運營工具平臺 [OpenWrite](https://openwrite.cn?from=article_bottom) 釋出