1. 程式人生 > >5分鐘理解 SpringBoot 響應式的核心-Reactor

5分鐘理解 SpringBoot 響應式的核心-Reactor

目錄

  • 一、前言
  • 二、 Mono 與 Flux
    • 構造器
  • 三、 流計算
    • 1. 緩衝
    • 2. 過濾/提取
    • 3. 轉換
    • 4. 合併
    • 5. 合流
    • 6. 累積
  • 四、異常處理
  • 五、執行緒排程
  • 小結
  • 參考閱讀

一、前言

關於 響應式 Reactive,前面的兩篇文章談了不少概念,基本都離不開下面兩點:

  • 響應式程式設計是面向流的、非同步化的開發方式
  • 響應式是非常通用的概念,無論在前端領域、還是實時流、離線處理場景中都是適用的。

有興趣的朋友可以看看這兩篇文章:

Reactive(1) 從響應式程式設計到“好萊塢”
Reactive(2) 響應式流與制奶廠業務

這次,我們把目光轉向 SpringBoot,在SpringBoot 2.0版本之後,提供了對響應式程式設計的全面支援。
因此在升級到 2.x版本之後,便能方便的實現事件驅動模型的後端程式設計,這其中離不開 webflux這個模組。
其同時也被 Spring 5 用作開發響應式 web 應用的核心基礎。 那麼, webflux 是一個怎樣的東西?

Webflux

Webflux 模組的名稱是 spring-webflux,名稱中的 Flux 來源於 Reactor 中的類 Flux。
該模組中包含了對 響應式 HTTP、伺服器推送 和 WebSocket 的支援。

Webflux 支援兩種不同的程式設計模型:

  • 第一種是 Spring MVC 中使用的基於 Java 註解的方式,一個使用Reactive風格的Controller如下所示:
@RestController
public class EchoController {
    @GetMapping("/echo")
    public Mono<String> sayHelloWorld() {
        return Mono.just("Echo!");
    }
}
  • 第二種是 基於 Java 8 的 lambda 表示式的函數語言程式設計模型。

這兩種程式設計模型只是在程式碼編寫方式上存在不同,但底層的基礎模組仍然是一樣的。
除此之外,Webflux 可以執行在支援 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上,或是其他非同步執行時環境,如 Netty 和 Undertow。

關於Webflux 與 SpringMVC 的區別,可以參考下圖:

SpringBoot、Webflux、Reactor 可以說是層層包含的關係,其中,響應式能力的核心仍然是來自 Reactor元件。
由此可見,掌握Reactor的用法 必然是熟練進行 Spring 響應式程式設計的重點。

二、 Mono 與 Flux

在理解響應式Web程式設計之前,我們需要對Reactor 兩個核心概念做一些澄清,一個是Mono,另一個是Flux。

Flux 表示的是包含 0 到 N 個元素的非同步序列。在該序列中可以包含三種不同型別的訊息通知:

  • 正常的包含元素的訊息
  • 序列結束的訊息
  • 序列出錯的訊息

當訊息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被呼叫。

Mono 表示的是包含 0 或者 1 個元素的非同步序列。該序列中同樣可以包含與 Flux 相同的三種類型的訊息通知。
Flux 和 Mono 之間可以進行轉換,比如對一個 Flux 序列進行計數操作,得到的結果是一個 Mono物件,或者把兩個 Mono 序列合併在一起,得到的是一個 Flux 物件。

構造器

Reactor提供了非常方便的API來建立 Flux、Mono 物件,如下:

使用靜態工廠類建立Flux

Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
  • just():可以指定序列中包含的全部元素。創建出來的 Flux 序列在釋出這些元素之後會自動結束。
  • fromArray():可以從一個數組、Iterable 物件或 Stream 物件中建立 Flux 物件。
  • empty():建立一個不包含任何元素,只發布結束訊息的序列。
  • range(int start, int count):建立包含從 start 起始的 count 個數量的 Integer 物件的序列。
  • interval(Duration period)和 interval(Duration delay, Duration period):建立一個包含了從 0 開始遞增的 Long 物件的序列。其中包含的元素按照指定的間隔來發布。除了間隔時間之外,還可以指定起始元素髮布之前的延遲時間。

除了上述的方式之外,還可以使用 generate()、create()方法來自定義流資料的產生過程:

generate()

Flux.generate(sink -> {
    sink.next("Echo");
    sink.complete();
}).subscribe(System.out::println);

generate 只提供序列中單個訊息的產生邏輯(同步通知),其中的 sink.next()最多隻能呼叫一次,比如上面的程式碼中,產生一個Echo訊息後就結束了。

create()

Flux.create(sink -> {
    for (char i = 'a'; i <= 'z'; i++) {
        sink.next(i);
    }
    sink.complete();
}).subscribe(System.out::print);

create 提供的是整個序列的產生邏輯,sink.next()可以呼叫多次(非同步通知),如上面的程式碼將會產生a-z的小寫字母。

使用靜態工廠類建立Mono

Mono 的建立方式與 Flux 是很相似的。 除了Flux 所擁有的構造方式之外,還可以支援與Callable、Runnable、Supplier 等介面整合。

參考下面的程式碼:

Mono.fromSupplier(() -> "Mono1").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Mono2")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Mono3")).subscribe(System.out::println);

三、 流計算

1. 緩衝

在Reactive(1) 從響應式程式設計到“好萊塢” 一文中曾經提到過緩衝(buffer)的概念。
buffer 是流處理中非常常用的一種處理,意思就是將流的一段截停後再做處理。

比如下面的程式碼:

Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.interval(Duration.of(0, ChronoUnit.SECONDS),
              Duration.of(1, ChronoUnit.SECONDS))
        .buffer(Duration.of(5, ChronoUnit.SECONDS)).
        take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

第一個buffer(20)是指湊足20個數字後再進行處理,該語句會輸出5組資料(按20分組)
第二個buffer(Duration duration)是指湊足一段時間後的資料再近些處理,這裡是5秒鐘做一次處理
第三個bufferUtil(Predicate p)是指等到某個元素滿足斷言(條件)時進行收集處理,這裡將會輸出[1,2],[3,4]..這樣的奇偶數字對
第四個bufferWhile(Predicate p)則僅僅是收集滿足斷言(條件)的元素,這裡將會輸出2,4,6..這樣的偶數

與 buffer 類似的是window函式,後者的不同在於其在緩衝截停後並不會輸出一些元素列表,而是直接轉換為Flux物件,如下:

Flux.range(1, 100).window(20)
      .subscribe(flux -> 
           flux.buffer(5).subscribe(System.out::println));

window(20)返回的結果是一個Flux型別的物件,我們進而對其進行了緩衝處理。
因此上面的程式碼會按5個一組輸出:

[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]
[11, 12, 13, 14, 15]
...

2. 過濾/提取

上面的bufferWhile 其實充當了過濾的作用,當然,對於流元素的過濾也可以使用filter函式來處理:

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

take 函式可以用來提取想要的元素,這與filter 過濾動作是恰恰相反的,來看看take的用法:

Flux.range(1, 10).take(2).subscribe(System.out::println);
Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);
Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);

第一個take(2)指提取前面的兩個元素;
第二個takeLast(2)指提取最後的兩個元素;
第三個takeWhile(Predicate p)指提取滿足條件的元素,這裡是1-4
第四個takeUtil(Predicate p)指一直提取直到滿足條件的元素出現為止,這裡是1-6

3. 轉換

使用map函式可以將流中的元素進行個體轉換,如下:

Flux.range(1, 10).map(x -> x*x).subscribe(System.out::println);

這裡的map使用的JDK8 所定義的 Function介面

4. 合併

某些情況下我們需要對兩個流中的元素進行合併處理,這與合併兩個陣列有點相似,但結合流的特點又會有不同的需求。

使用zipWith函式可以實現簡單的流元素合併處理:

Flux.just("I", "You")
        .zipWith(Flux.just("Win", "Lose"))
        .subscribe(System.out::println);
Flux.just("I", "You")
        .zipWith(Flux.just("Win", "Lose"), 
        (s1, s2) -> String.format("%s!%s!", s1, s2))
        .subscribe(System.out::println);

上面的程式碼輸出為:

[I,Win]
[You,Lose]
I!Win!
You!Lose!

第一個zipWith輸出的是Tuple物件(不可變的元祖),第二個zipWith增加了一個BiFunction來實現合併計算,輸出的是字串。

注意到zipWith是分別按照元素在流中的順序進行兩兩合併的,合併後的流長度則最短的流為準,遵循最短對齊原則。

用於實現合併的還有 combineLastest函式,combinLastest 會動態的將流中新產生元素(末位)進行合併,注意是隻要產生新元素都會觸發合併動作併產生一個結果元素,如下面的程式碼:

Flux.combineLatest(
        Arrays::toString,
        Flux.interval(Duration.of(0, ChronoUnit.MILLIS), 
            Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(Duration.of(50, ChronoUnit.MILLIS), 
            Duration.of(100, ChronoUnit.MILLIS)).take(2)
).toStream().forEach(System.out::println);

輸出為:

[0, 0]
[1, 0]
[1, 1]

5. 合流

與合併比較類似的處理概念是合流,合流的不同之處就在於元素之間不會產生合併,最終流的元素個數(長度)是兩個源的個數之和。
合流的計算可以使用 merge或mergeSequential 函式,這兩者的區別在於:

merge後的元素是按產生時間排序的,而mergeSequential 則是按整個流被訂閱的時間來排序,如下面的程式碼:

Flux.merge(Flux.interval(
            Duration.of(0, ChronoUnit.MILLIS),
            Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(
                Duration.of(50, ChronoUnit.MILLIS),
                Duration.of(100, ChronoUnit.MILLIS)).take(2))
        .toStream()
        .forEach(System.out::println);
System.out.println("---");
Flux.mergeSequential(Flux.interval(
        Duration.of(0, ChronoUnit.MILLIS),
        Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(
                Duration.of(50, ChronoUnit.MILLIS),
                Duration.of(100, ChronoUnit.MILLIS)).take(2))
        .toStream()
        .forEach(System.out::println);

輸出為:

0
0
1
1
---
0
1
0
1

merge 是直接將Flux 元素進行合流之外,而flatMap則提供了更加高階的處理:
flatMap 函式會先將Flux中的元素轉換為 Flux(流),然後再新產生的Flux進行合流處理,如下:

Flux.just(1, 2)
        .flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS),
                Duration.of(10, ChronoUnit.MILLIS)).take(x))
        .toStream()
        .forEach(System.out::println);

flatMap也存在flatMapSequential的一個兄弟版本,後者決定了合併流元素的順序是與流的訂閱順序一致的。

6. 累積

reduce 和 reduceWith 操作符對流中包含的所有元素進行累積操作,得到一個包含計算結果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的。reduceWith 允許在在操作時指定一個起始值(與第一個元素進行運算)

如下面的程式碼:

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

這裡通過reduce計算出1-100的累加結果(1+2+3+...100),結果輸出為:

5050
5150

四、異常處理

在前面所提及的這些功能基本都屬於正常的流處理,然而對於異常的捕獲以及採取一些修正手段也是同樣重要的。

利用Flux/Mono 框架可以很方便的做到這點。

將正常訊息和錯誤訊息分別列印

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .subscribe(System.out::println, System.err::println);

當產生錯誤時預設返回0

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .onErrorReturn(0)
        .subscribe(System.out::println);

自定義異常時的處理

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalArgumentException()))
        .onErrorResume(e -> {
            if (e instanceof IllegalStateException) {
                return Mono.just(0);
            } else if (e instanceof IllegalArgumentException) {
                return Mono.just(-1);
            }
            return Mono.empty();
        })
        .subscribe(System.out::println);

當產生錯誤時重試

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .retry(1)
        .subscribe(System.out::println);

這裡的retry(1)表示最多重試1次,而且重試將從訂閱的位置開始重新發送流事件

五、執行緒排程

我們說過,響應式是非同步化的,那麼就會涉及到多執行緒的排程。

Reactor 提供了非常方便的排程器(Scheduler)工具方法,可以指定流的產生以及轉換(計算)釋出所採用的執行緒排程方式。
這些方式包括:

類別 描述
immediate 採用當前執行緒
single 單一可複用的執行緒
elastic 彈性可複用的執行緒池(IO型)
parallel 並行操作優化的執行緒池(CPU計算型)
timer 支援任務排程的執行緒池
fromExecutorService 自定義執行緒池

下面,以一個簡單的例項來演示不同的執行緒排程:

Flux.create(sink -> {
    sink.next(Thread.currentThread().getName());
    sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);

在這段程式碼中,使用publishOn指定了流釋出的排程器,subscribeOn則指定的是流產生的排程器。
首先是parallel排程器進行流資料的生成,接著使用一個single單執行緒排程器進行釋出,此時經過第一個map轉換為另一個Flux流,其中的訊息疊加了當前執行緒的名稱。最後進入的是一個elastic彈性排程器,再次進行一次同樣的map轉換。

最終,經過多層轉換後的輸出如下:

[elastic-2] [single-1] parallel-1

小結

SpringBoot 2.x、Spring 5 對於響應式的Web程式設計(基於Reactor)都提供了全面的支援,藉助於框架的能力可以快速的完成一些簡單的響應式程式碼開發。
本文提供了較多 Reactor API的程式碼樣例,旨在幫助讀者能快速的理解 響應式程式設計的概念及方式。

對於習慣了傳統程式設計正規化的開發人員來說,熟練使用 Reactor 仍然需要一些思維上的轉變。
就筆者的自身感覺來看,Reactor 存在一些學習和適應的成本,但一旦熟悉使用之後便能體會它的先進之處。 就如 JDK8 引入的Stream API之後,許多開發者則漸漸拋棄forEach的方式..

本身這就是一種生產效率的提升機會,何樂而不為? 更何況,從應用框架的發展前景來看,響應式的前景是明朗的。

參考閱讀

使用 Reactor 進行反應式程式設計
https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html

Spring 5 的 WebFlux 開發介紹
https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.