1. 程式人生 > >Reactive程式設計(二):程式碼演示

Reactive程式設計(二):程式碼演示

書接上文 Reactive程式設計 ,我們繼續用真實的程式碼來解釋一些概念。我們會更進一步理解Reactive的與眾不同以及它的功能。這些例子很抽象,但能夠讓我們更進一步理解用到的API和程式設計風格,真實的感受它的與眾不同。我們將會看到Reactive的核心元素,學習如何控制資料流,如果需要的話還會用到後臺執行緒進行處理。

建立專案

我們用Reactor庫來進行演示。當然也可以用其它的工具。如果不想拷貝黏貼程式碼,可以直接使用 github 上的示例專案。

可以使用Maven:

<dependency>
  <groupId>io.projectreactor</groupId
>
<artifactId>reactor-core</artifactId> <version>3.0.0.RC2</version> </dependency>

也可以使用Gradle:

compile 'io.projectreactor:reactor-core:3.0.0.RC2'

工作原理

Reactive由一系列事件以及釋出和訂閱這些事件的2個參與方組成的一個序列。我們也可以稱之為stream。如果需要,我們使用streams這個名詞,但是java8有一個java.util.Stream庫,與我們在這兒要講的概念是不同的,不要將這2個概念混淆。我們儘量集中闡述publisher和subscriber(Reactive Streams的行為)。

我們會使用Reactor庫,把publisher稱為 Flux(實現了Reactive Streams的Publisher介面),在RxJava庫中的名稱是Observable ,代表的是類似的概念。(Reactor2.0中的名稱為Stream,很容易跟Java 8 的 Streams混淆,因此我們只使用Reactor 3.0中的新定義)。

建立

Flux 是POJO型別的事件序列的一個Publisher,例如 Flux<T> 是型別 T 的一個Publisher。Flux 有一系列從不同的來源建立示例的靜態方法。例如從陣列建立 Flux


Flux<String> flux = Flux.just("red"
, "white", "blue");

我們建立了一個Flux,現在我們開始用它做一些事情。實際上只有2件事可以做:操作(轉換或與其它序列組合)和訂閱。

單值序列

我們經常遇到的序列往往只有一個元素,或者是沒有元素,例如通過id查詢記錄。在Reactor中Mono表示單值Flux或空Flux。Mono的API與Flux類似,但是更簡潔,因為不是所有的操作對單值序列有意義。RxJava中類似的型別叫Single,空序列叫Completable。在Reactor中空序列是Mono<Void>

操作符

Flux的絕大部分方法都是操作。在這兒我們不會把所有的方法都講一遍(可以看javadoc),我們只需要弄明白操作是什麼,可以做什麼。
例如,可以用log()將Flux的內部事件顯示出來,或者用map()進行轉換:


Flux<String> flux = Flux.just("red", "white", "blue");

Flux<String> upper = flux
  .log()
  .map(String::toUpperCase);

這段程式碼將輸入的字串轉換成大寫,非常簡單明瞭。同時很有意思的一點是(時刻注意,雖然剛開始不太習慣),資料並沒有開始處理。什麼都不會顯示,因為什麼都沒有發生(可以自己執行一下程式碼),呼叫Flux的操作符僅僅是建立了一個執行計劃。操作符實現的邏輯只有當資料開始流動時才會執行,當某一方訂閱這個Flux的時候。

Java 8 的 Streams 也有類似的處理資料流的方式:


Stream<String> stream = Streams.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
    System.out.println(value);
    return value.toUpperCase();
});

但是Flux 和 Stream有非常大的差異,Stream的API不適用於Reactive。

訂閱

要讓資料流生效,我們需要用subscribe()方法來訂閱Flux,這些方法會回溯我們之前定義的操作鏈,請求publisher 產生資料。在我們的簡單示例中,字串集合會被遍歷進行處理。在更復雜的場景中,可能是從檔案系統讀取檔案,或者從資料庫中讀取資料,或者是呼叫一個http服務。

開始呼叫subscribe():


Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe();

輸出內容:


09:17:59.665 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

可以看到當subscribe()沒有引數時,會請求 publisher 傳送所有的資料 - 只有一個request並且是 “unbounded”。我們還可以看到釋出的每一項的回撥(onNext()),結束的回撥(onComplete()),以及原始訂閱的回撥(onSubscribe())。如果需要,我們還可以用Flux的doOn*()方法來監聽這些事件的回撥。

subscribe()方法是過載的,有很多變體。其中一個重要且常用的形式是帶回調引數。第一個引數是 Consumer ,用於每一個數據項的回撥,還可以增加一個可選的 Consumer 用於錯誤處理,以及一個序列完成後執行的 Runnable 。

例如,為每一個數據項增加回調:


Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
.subscribe(System.out::println);

輸出為:


09:56:12.680 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

我們可以通過多種方法控制資料流使它變成 “bounded” 。用於控制的內部介面是從 Subscriber 獲取到的 Subscription 。與前面簡單呼叫 subscribe() 等價的複雜形式是:


.subscribe(new Subscriber<String>() {

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(String t) {
        System.out.println(t);
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }

});

想要控制資料流為一次消費2個數據項,可以更加智慧的使用Subscription :


.subscribe(new Subscriber<String>() {

    private long count = 0;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(2);
    }

    @Override
    public void onNext(String t) {
        count++;
        if (count>=2) {
            count = 0;
            subscription.request(2);
        }
     }
...

這個 Subscriber 每次會打包2個數據項。這個場景很普遍,因此我們會考慮把實現提取到一個專門的類中以方便使用。輸出如下:


09:47:13.562 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

事實上批量訂閱是一個非常普遍的場景,因此 Flux 已經包含了相關的方法。上面的例子可以實現為:


Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe(null, 2);

(注意subscribe方法帶了一個請求限制引數)輸出為:


10:25:43.739 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

執行緒、排程和後臺處理

上面的示例中有一個有趣的特點是所有的log方法都是在主執行緒中執行的,即 subscribe() 呼叫者的執行緒。這是一個關鍵點:Reactor以儘可能少的執行緒來實現高效能。過去5年我們習慣於使用多執行緒、執行緒池和非同步處理來提升系統性能。對於這種新的思路可能會比較詫異。但是事實是:即使是JVM這種專門對執行緒處理做過優化的技術,執行緒切換的成本也是很高的。在單個執行緒上進行計算總是要快的多。Reactor給了我們進行非同步程式設計的方法,並且假設我們知道我們在做什麼。

Flux提供了一些方法來控制執行緒的邊界。例如,可以使用 Flux.subscribeOn() 配置一個訂閱在後臺執行緒中進行處理:


Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.parallel())
.subscribe(null, 2);

輸出結果:


13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()

可以看到訂閱和所有的處理都在 “parallel-1-1” 這個後臺執行緒中。單執行緒對於CPU密集型的處理來說是沒問題的。然而如果是IO密集型的處理就可能會阻塞。在這個場景中,我們希望處理儘可能的完成不至於阻塞呼叫方。一個執行緒池仍會提供很大的幫助,我們可以用 Schedulers.parallel() 獲取執行緒池。將單個數據項的處理拆分到獨立的執行緒中進行處理,我們需要把它放到獨立的釋出方中,每個釋出方都在後臺執行緒中請求執行結果。一種方法是呼叫 flatMap() 操作,會把資料項對映到一個 Publisher 並返回一個新型別的序列:


Flux.just("red", "white", "blue")
  .log()
  .flatMap(value ->
     Mono.just(value.toUpperCase())
       .subscribeOn(Schedulers.parallel()),
     2)
.subscribe(value -> {
  log.info("Consumed: " + value);
})

注意 flatMap() 把資料項放入一個子 publisher ,這樣可以控制每個子項的訂閱而不是整個序列的訂閱。Reactor內部的預設行為可以儘可能長的掛起在一個執行緒上,因此如果需要特定的資料項在後臺執行緒中處理,必須要明確的指明。事實上這是一系列強制進行平行計算的方法中的一種。

輸出內容:


15:24:36.596 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE

現在是多個執行緒在進行處理,並且 flatMap() 中的批量引數保證只要可能每次都會處理2個數據項。Reactor會讓自己儘可能的聰明,預先從 Publisher 中提取資料項,並且估算訂閱方的等待時間。

Flux 還有一個 publishOn() 方法的作用類似,只不過控制的是釋出方的行為:


Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.newParallel("sub"))
  .publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
    log.info("Consumed: " + value);
});

輸出內容:


15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE

注意訂閱方的回撥(內容為 “Consumed: …​”)執行在釋出方執行緒 pub-1-1 上。如果把 subscribeOn() 方法去掉,會發現所有的資料項的處理都線上程 pub-1-1 上。這再一次說明 Reactor 使用盡可能少的執行緒 - 如果沒有明確的指定要切換執行緒,下一個呼叫會在當前呼叫的執行緒上執行。

提取器:有副作用的訂閱者

另一種訂閱序列的方式是呼叫 Mono.block()Mono.toFuture()Flux.toStream() (這些是提取器方法,將 Reactive 型別轉換為阻塞型別)。Flux 還有 collectList() 和 collectMap() 將 Flux 轉換成 Mono。他們並沒有真正的訂閱序列,但是他們會拋棄控制訂閱單個數據項的能力。

警告:
一個黃金規則是“永遠不要呼叫提取器”。當然有一些例外,例如在測試程式中需要能夠通過阻塞來彙總結果。

這些方法用於將 Reactive 轉換為阻塞模式,當我們需要適配一個老式的API,例如Spring MVC的時候。在呼叫 Mono.block() 的時候,我們放棄了 Reactive Streams 所有優勢。這是 Reactive Streams 和 Java 8 Streams 的關鍵區別 - Java Stream只有 “all or nothing” 的訂閱模式,等同於 Mono.block()。當然 subscribe() 也會阻塞呼叫執行緒,因此與轉換方法一樣危險,但是有足夠的控制手段 - 可以用 subscribeOn() 防止阻塞,也可以通過背壓來將資料項進行溢位並且定時的決定是否繼續處理。

總結

這篇文章我們講述了 Reactive Streams 和 Reactor API 的基本概念。可以通過 GitHub 上的示例程式碼或者是 Lite RX Hands On 實驗專案來進一步瞭解。在下一篇文章中我們會更深入的發掘 Reactive 模型中的阻塞、分發、非同步等方面,並且展示能夠真正受益的機會。