1. 程式人生 > >Java 8原生API也可以開發響應式程式碼?

Java 8原生API也可以開發響應式程式碼?

前段時間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實戰》的時候,瞭解到Java 8裡已經提供了一個非同步非阻塞的介面(CompletableFuture),可以實現簡單的響應式程式設計的模式,因此用這篇文章做個梳理。我是帶著下面這幾個問題去學習CompletableFuture這個介面的,

  1. CompletableFuture是為了解決什麼問題而設計的?
  2. 它的使用場景是什麼?開源軟體中有實戰使用案例嗎?
  3. CompletableFuture的常用API都有哪些?如何使用?
  4. CompletableFuture和RxJava有什麼不同?

這篇文章梳理下來,基本上可以回答前面四個問題,OK,我們進入正文。

基本概念


RPC(遠端方法呼叫)的四種方式有:oneway、sync、future和callback,在dubbo或bolt這類通訊框架中,預設使用的是sync模式(同步+阻塞),future和callback都屬於非同步模式,不過future模式在get的時候會阻塞,callback模式則不需要等待結果,有結果後服務端會回撥請求方。

非同步呼叫這類模式,比較適合的場景是IO密集型場景,要執行很多遠端呼叫的任務,並且這些呼叫耗時可能比較久。以openwrite中的一個case為例:我釋出一篇文章,需要給幾個不同的寫作平臺建立文章,這時候我不希望這個過程是順序的,就比較適合用非同步呼叫模式。

Future模式除了在get()呼叫的時候會阻塞外,還有其他的侷限性,例如:沒有使用Java Lambda表示式的優勢,對一連串的非同步呼叫可以支援,但是寫出來的程式碼會比較複雜。

CompletableFuture的常用API

閱讀CompletableFuture的API的時候,我有一個體會——CompletableFuture之於Future,除了增加了回撥這個最重要的特性,其他的特性有點像Stream對於集合迭代的增強。

使用CompletableFuture,我們可以像Stream一樣使用一部呼叫,可以處理一些級聯的非同步呼叫(類似於Stream裡的flatMap)、可以過濾一些無用的非同步呼叫(anyOf、allOf)。

下面這張圖是我按照自己的理解,梳理除了CompletableFuture常見的API,閱讀的時候需要注意下面幾個點:

  1. 把握幾個大的分類:建立CompletableFuture、獲取CompletableFuture的執行結果、主動結束CompletableFuture、非同步呼叫任務的組合處理;
  2. 看著方法多,但是有規律可循,例如apply字樣的介面,傳入的方法引數都是有返回值的;
  3. 帶either字樣的,都是多個非同步任務有一個滿足條件即可的;
  4. 帶executor方法的,都表示該方法可以用自定義的執行緒池來優化效能。

Dubbo專案中的使用案例

Dubbo對於非同步化的支援起始在2.6.x中就有提供,是在釋出bean的時候加個屬性配置——async=true,然後利用上下文將非同步標識一層層傳遞下去。在之前的公司中有一次排查dubbo(當時我們用的是dubbox)非同步呼叫的問題,最後查到的原因就是多個非同步呼叫,上下文裡的資訊串了。

Dubbo 2.7 中使用了 JDK1.8 提供的 CompletableFuture 原生介面對自身的非同步化做了改進。CompletableFuture 可以支援 future 和 callback 兩種呼叫方式。在Dubbo最新的master程式碼中,我知道了Dubbo的非同步結果的定義,它的類圖如下,可以看出AsyncRpcResult是一個CompletableFuture介面的實現。

實戰Demo

通過下面的例子,可以看出CompletableFuture的最大好處——callback特性。首先定義一個介面,其中包括同步介面和該介面的非同步版本。

public interface AsyncInterfaceExample {

    String computeSomeThine();

    CompletableFuture<String> computeSomeThingAsync();
}

然後定義該介面的實現類,可以看出,如果要講現有的同步介面非同步化,是比較容易的;

public class AsyncInterfaceExampleImpl implements AsyncInterfaceExample {

    @Override
    public String computeSomeThine() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "hello, world";
    }

    @Override
    public CompletableFuture<String> computeSomeThingAsync() {
        return CompletableFuture.supplyAsync(this::computeSomeThine);
    }
}

然後看下我們的測試case,如下:

public class AsyncInterfaceExampleTest {

    private static String getOtherThing() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "other";
    }

    public static void main(String[] args) {
        AsyncInterfaceExample asyncInterfaceExample = new AsyncInterfaceExampleImpl();

        //case1 同步呼叫
        long start = System.currentTimeMillis();
        String someThing = asyncInterfaceExample.computeSomeThine();
        String other = getOtherThing();
        System.out.println("cost:" + (System.currentTimeMillis() - start) + "  result:" + someThing + other);

        //case2 非同步呼叫,使用回撥
        start = System.currentTimeMillis();
        CompletableFuture<String> someThingFuture = asyncInterfaceExample.computeSomeThingAsync();
        other = getOtherThing();

        long finalStart = start;
        String finalOther = other;
        someThingFuture.whenComplete((returnValue, exception) -> {
            if (exception == null) {
                System.out.println(
                    "cost:" + (System.currentTimeMillis() - finalStart) + "  result:" + returnValue + finalOther);
            } else {
                exception.printStackTrace();
            }
        });
    }
}

上面這個案例的執行結果如下圖所示:
***
本號(javaadu)專注於後端技術、JVM問題排查和優化、Java面試題、個人成長和自我管理等主題,為讀者提供一線開發者的工作和成長經驗,期待你能在這裡有所收穫