Java 8原生API也可以開發響應式程式碼?
前段時間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實戰》的時候,瞭解到Java 8裡已經提供了一個非同步非阻塞的介面(CompletableFuture),可以實現簡單的響應式程式設計的模式,因此用這篇文章做個梳理。我是帶著下面這幾個問題去學習CompletableFuture這個介面的,
- CompletableFuture是為了解決什麼問題而設計的?
- 它的使用場景是什麼?開源軟體中有實戰使用案例嗎?
- CompletableFuture的常用API都有哪些?如何使用?
- CompletableFuture和RxJava有什麼不同?
這篇文章梳理下來,基本上可以回答前面四個問題,OK,我們進入正文。
基本概念
RPC(遠端方法呼叫)的四種方式有:oneway、sync、future和callback,在dubbo或bolt這類通訊框架中,預設使用的是sync模式(同步+阻塞),future和callback都屬於非同步模式,不過future模式在get的時候會阻塞,callback模式則不需要等待結果,有結果後服務端會回撥請求方。
非同步呼叫這類模式,比較適合的場景是IO密集型場景,要執行很多遠端呼叫的任務,並且這些呼叫耗時可能比較久。以openwrite中的一個case為例:我釋出一篇文章,需要給幾個不同的寫作平臺建立文章,這時候我不希望這個過程是順序的,就比較適合用非同步呼叫模式。
Future模式除了在get()呼叫的時候會阻塞外,還有其他的侷限性,例如:沒有使用Java Lambda表示式的優勢,對一連串的非同步呼叫可以支援,但是寫出來的程式碼會比較複雜。
CompletableFuture的常用API
閱讀CompletableFuture的API的時候,我有一個體會——CompletableFuture之於Future,除了增加了回撥這個最重要的特性,其他的特性有點像Stream對於集合迭代的增強。
使用CompletableFuture,我們可以像Stream一樣使用一部呼叫,可以處理一些級聯的非同步呼叫(類似於Stream裡的flatMap)、可以過濾一些無用的非同步呼叫(anyOf、allOf)。
下面這張圖是我按照自己的理解,梳理除了CompletableFuture常見的API,閱讀的時候需要注意下面幾個點:
- 把握幾個大的分類:建立CompletableFuture、獲取CompletableFuture的執行結果、主動結束CompletableFuture、非同步呼叫任務的組合處理;
- 看著方法多,但是有規律可循,例如apply字樣的介面,傳入的方法引數都是有返回值的;
- 帶either字樣的,都是多個非同步任務有一個滿足條件即可的;
- 帶executor方法的,都表示該方法可以用自定義的執行緒池來優化效能。
Dubbo專案中的使用案例
Dubbo對於非同步化的支援起始在2.6.x中就有提供,是在釋出bean的時候加個屬性配置——async=true,然後利用上下文將非同步標識一層層傳遞下去。在之前的公司中有一次排查dubbo(當時我們用的是dubbox)非同步呼叫的問題,最後查到的原因就是多個非同步呼叫,上下文裡的資訊串了。
Dubbo 2.7 中使用了 JDK1.8 提供的 CompletableFuture 原生介面對自身的非同步化做了改進。CompletableFuture 可以支援 future 和 callback 兩種呼叫方式。在Dubbo最新的master程式碼中,我知道了Dubbo的非同步結果的定義,它的類圖如下,可以看出AsyncRpcResult是一個CompletableFuture介面的實現。
實戰Demo
通過下面的例子,可以看出CompletableFuture的最大好處——callback特性。首先定義一個介面,其中包括同步介面和該介面的非同步版本。
public interface AsyncInterfaceExample {
String computeSomeThine();
CompletableFuture<String> computeSomeThingAsync();
}
然後定義該介面的實現類,可以看出,如果要講現有的同步介面非同步化,是比較容易的;
public class AsyncInterfaceExampleImpl implements AsyncInterfaceExample {
@Override
public String computeSomeThine() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "hello, world";
}
@Override
public CompletableFuture<String> computeSomeThingAsync() {
return CompletableFuture.supplyAsync(this::computeSomeThine);
}
}
然後看下我們的測試case,如下:
public class AsyncInterfaceExampleTest {
private static String getOtherThing() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "other";
}
public static void main(String[] args) {
AsyncInterfaceExample asyncInterfaceExample = new AsyncInterfaceExampleImpl();
//case1 同步呼叫
long start = System.currentTimeMillis();
String someThing = asyncInterfaceExample.computeSomeThine();
String other = getOtherThing();
System.out.println("cost:" + (System.currentTimeMillis() - start) + " result:" + someThing + other);
//case2 非同步呼叫,使用回撥
start = System.currentTimeMillis();
CompletableFuture<String> someThingFuture = asyncInterfaceExample.computeSomeThingAsync();
other = getOtherThing();
long finalStart = start;
String finalOther = other;
someThingFuture.whenComplete((returnValue, exception) -> {
if (exception == null) {
System.out.println(
"cost:" + (System.currentTimeMillis() - finalStart) + " result:" + returnValue + finalOther);
} else {
exception.printStackTrace();
}
});
}
}
上面這個案例的執行結果如下圖所示:
***
本號(javaadu)專注於後端技術、JVM問題排查和優化、Java面試題、個人成長和自我管理等主題,為讀者提供一線開發者的工作和成長經驗,期待你能在這裡有所收穫