5分鐘理解 SpringBoot 響應式的核心-Reactor
目錄
- 一、前言
- 二、 Mono 與 Flux
- 構造器
- 三、 流計算
- 1. 緩衝
- 2. 過濾/提取
- 3. 轉換
- 4. 合併
- 5. 合流
- 6. 累積
- 四、異常處理
- 五、執行緒排程
- 小結
- 參考閱讀
一、前言
關於 響應式 Reactive,前面的兩篇文章談了不少概念,基本都離不開下面兩點:
- 響應式程式設計是面向流的、非同步化的開發方式
- 響應式是非常通用的概念,無論在前端領域、還是實時流、離線處理場景中都是適用的。
有興趣的朋友可以看看這兩篇文章:
Reactive(1) 從響應式程式設計到“好萊塢”
Reactive(2) 響應式流與制奶廠業務
這次,我們把目光轉向 SpringBoot,在SpringBoot 2.0版本之後,提供了對響應式程式設計的全面支援。
因此在升級到 2.x版本之後,便能方便的實現事件驅動模型的後端程式設計,這其中離不開 webflux這個模組。
其同時也被 Spring 5 用作開發響應式 web 應用的核心基礎。 那麼, webflux 是一個怎樣的東西?
Webflux
Webflux 模組的名稱是 spring-webflux,名稱中的 Flux 來源於 Reactor 中的類 Flux。
該模組中包含了對 響應式 HTTP、伺服器推送 和 WebSocket 的支援。
Webflux 支援兩種不同的程式設計模型:
- 第一種是 Spring MVC 中使用的基於 Java 註解的方式,一個使用Reactive風格的Controller如下所示:
@RestController public class EchoController { @GetMapping("/echo") public Mono<String> sayHelloWorld() { return Mono.just("Echo!"); } }
- 第二種是 基於 Java 8 的 lambda 表示式的函數語言程式設計模型。
這兩種程式設計模型只是在程式碼編寫方式上存在不同,但底層的基礎模組仍然是一樣的。
除此之外,Webflux 可以執行在支援 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上,或是其他非同步執行時環境,如 Netty 和 Undertow。
關於Webflux 與 SpringMVC 的區別,可以參考下圖:
SpringBoot、Webflux、Reactor 可以說是層層包含的關係,其中,響應式能力的核心仍然是來自 Reactor元件。
由此可見,掌握Reactor的用法 必然是熟練進行 Spring 響應式程式設計的重點。
二、 Mono 與 Flux
在理解響應式Web程式設計之前,我們需要對Reactor 兩個核心概念做一些澄清,一個是Mono,另一個是Flux。
Flux 表示的是包含 0 到 N 個元素的非同步序列。在該序列中可以包含三種不同型別的訊息通知:
- 正常的包含元素的訊息
- 序列結束的訊息
- 序列出錯的訊息
當訊息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被呼叫。
Mono 表示的是包含 0 或者 1 個元素的非同步序列。該序列中同樣可以包含與 Flux 相同的三種類型的訊息通知。
Flux 和 Mono 之間可以進行轉換,比如對一個 Flux 序列進行計數操作,得到的結果是一個 Mono
構造器
Reactor提供了非常方便的API來建立 Flux、Mono 物件,如下:
使用靜態工廠類建立Flux
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
- just():可以指定序列中包含的全部元素。創建出來的 Flux 序列在釋出這些元素之後會自動結束。
- fromArray():可以從一個數組、Iterable 物件或 Stream 物件中建立 Flux 物件。
- empty():建立一個不包含任何元素,只發布結束訊息的序列。
- range(int start, int count):建立包含從 start 起始的 count 個數量的 Integer 物件的序列。
- interval(Duration period)和 interval(Duration delay, Duration period):建立一個包含了從 0 開始遞增的 Long 物件的序列。其中包含的元素按照指定的間隔來發布。除了間隔時間之外,還可以指定起始元素髮布之前的延遲時間。
除了上述的方式之外,還可以使用 generate()、create()方法來自定義流資料的產生過程:
generate()
Flux.generate(sink -> {
sink.next("Echo");
sink.complete();
}).subscribe(System.out::println);
generate 只提供序列中單個訊息的產生邏輯(同步通知),其中的 sink.next()最多隻能呼叫一次,比如上面的程式碼中,產生一個Echo訊息後就結束了。
create()
Flux.create(sink -> {
for (char i = 'a'; i <= 'z'; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::print);
create 提供的是整個序列的產生邏輯,sink.next()可以呼叫多次(非同步通知),如上面的程式碼將會產生a-z的小寫字母。
使用靜態工廠類建立Mono
Mono 的建立方式與 Flux 是很相似的。 除了Flux 所擁有的構造方式之外,還可以支援與Callable、Runnable、Supplier 等介面整合。
參考下面的程式碼:
Mono.fromSupplier(() -> "Mono1").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Mono2")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Mono3")).subscribe(System.out::println);
三、 流計算
1. 緩衝
在Reactive(1) 從響應式程式設計到“好萊塢” 一文中曾經提到過緩衝(buffer)的概念。
buffer 是流處理中非常常用的一種處理,意思就是將流的一段截停後再做處理。
比如下面的程式碼:
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.interval(Duration.of(0, ChronoUnit.SECONDS),
Duration.of(1, ChronoUnit.SECONDS))
.buffer(Duration.of(5, ChronoUnit.SECONDS)).
take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
第一個buffer(20)是指湊足20個數字後再進行處理,該語句會輸出5組資料(按20分組)
第二個buffer(Duration duration)是指湊足一段時間後的資料再近些處理,這裡是5秒鐘做一次處理
第三個bufferUtil(Predicate p)是指等到某個元素滿足斷言(條件)時進行收集處理,這裡將會輸出[1,2],[3,4]..這樣的奇偶數字對
第四個bufferWhile(Predicate p)則僅僅是收集滿足斷言(條件)的元素,這裡將會輸出2,4,6..這樣的偶數
與 buffer 類似的是window函式,後者的不同在於其在緩衝截停後並不會輸出一些元素列表,而是直接轉換為Flux物件,如下:
Flux.range(1, 100).window(20)
.subscribe(flux ->
flux.buffer(5).subscribe(System.out::println));
window(20)返回的結果是一個Flux
因此上面的程式碼會按5個一組輸出:
[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]
[11, 12, 13, 14, 15]
...
2. 過濾/提取
上面的bufferWhile 其實充當了過濾的作用,當然,對於流元素的過濾也可以使用filter函式來處理:
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
take 函式可以用來提取想要的元素,這與filter 過濾動作是恰恰相反的,來看看take的用法:
Flux.range(1, 10).take(2).subscribe(System.out::println);
Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);
Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);
第一個take(2)指提取前面的兩個元素;
第二個takeLast(2)指提取最後的兩個元素;
第三個takeWhile(Predicate p)指提取滿足條件的元素,這裡是1-4
第四個takeUtil(Predicate p)指一直提取直到滿足條件的元素出現為止,這裡是1-6
3. 轉換
使用map函式可以將流中的元素進行個體轉換,如下:
Flux.range(1, 10).map(x -> x*x).subscribe(System.out::println);
這裡的map使用的JDK8 所定義的 Function介面
4. 合併
某些情況下我們需要對兩個流中的元素進行合併處理,這與合併兩個陣列有點相似,但結合流的特點又會有不同的需求。
使用zipWith函式可以實現簡單的流元素合併處理:
Flux.just("I", "You")
.zipWith(Flux.just("Win", "Lose"))
.subscribe(System.out::println);
Flux.just("I", "You")
.zipWith(Flux.just("Win", "Lose"),
(s1, s2) -> String.format("%s!%s!", s1, s2))
.subscribe(System.out::println);
上面的程式碼輸出為:
[I,Win]
[You,Lose]
I!Win!
You!Lose!
第一個zipWith輸出的是Tuple物件(不可變的元祖),第二個zipWith增加了一個BiFunction來實現合併計算,輸出的是字串。
注意到zipWith是分別按照元素在流中的順序進行兩兩合併的,合併後的流長度則最短的流為準,遵循最短對齊原則。
用於實現合併的還有 combineLastest函式,combinLastest 會動態的將流中新產生元素(末位)進行合併,注意是隻要產生新元素都會觸發合併動作併產生一個結果元素,如下面的程式碼:
Flux.combineLatest(
Arrays::toString,
Flux.interval(Duration.of(0, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2),
Flux.interval(Duration.of(50, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2)
).toStream().forEach(System.out::println);
輸出為:
[0, 0]
[1, 0]
[1, 1]
5. 合流
與合併比較類似的處理概念是合流,合流的不同之處就在於元素之間不會產生合併,最終流的元素個數(長度)是兩個源的個數之和。
合流的計算可以使用 merge或mergeSequential 函式,這兩者的區別在於:
merge後的元素是按產生時間排序的,而mergeSequential 則是按整個流被訂閱的時間來排序,如下面的程式碼:
Flux.merge(Flux.interval(
Duration.of(0, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2),
Flux.interval(
Duration.of(50, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2))
.toStream()
.forEach(System.out::println);
System.out.println("---");
Flux.mergeSequential(Flux.interval(
Duration.of(0, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2),
Flux.interval(
Duration.of(50, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2))
.toStream()
.forEach(System.out::println);
輸出為:
0
0
1
1
---
0
1
0
1
merge 是直接將Flux 元素進行合流之外,而flatMap則提供了更加高階的處理:
flatMap 函式會先將Flux中的元素轉換為 Flux(流),然後再新產生的Flux進行合流處理,如下:
Flux.just(1, 2)
.flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS),
Duration.of(10, ChronoUnit.MILLIS)).take(x))
.toStream()
.forEach(System.out::println);
flatMap也存在flatMapSequential的一個兄弟版本,後者決定了合併流元素的順序是與流的訂閱順序一致的。
6. 累積
reduce 和 reduceWith 操作符對流中包含的所有元素進行累積操作,得到一個包含計算結果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的。reduceWith 允許在在操作時指定一個起始值(與第一個元素進行運算)
如下面的程式碼:
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
這裡通過reduce計算出1-100的累加結果(1+2+3+...100),結果輸出為:
5050
5150
四、異常處理
在前面所提及的這些功能基本都屬於正常的流處理,然而對於異常的捕獲以及採取一些修正手段也是同樣重要的。
利用Flux/Mono 框架可以很方便的做到這點。
將正常訊息和錯誤訊息分別列印
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);
當產生錯誤時預設返回0
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn(0)
.subscribe(System.out::println);
自定義異常時的處理
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalArgumentException()))
.onErrorResume(e -> {
if (e instanceof IllegalStateException) {
return Mono.just(0);
} else if (e instanceof IllegalArgumentException) {
return Mono.just(-1);
}
return Mono.empty();
})
.subscribe(System.out::println);
當產生錯誤時重試
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.retry(1)
.subscribe(System.out::println);
這裡的retry(1)表示最多重試1次,而且重試將從訂閱的位置開始重新發送流事件
五、執行緒排程
我們說過,響應式是非同步化的,那麼就會涉及到多執行緒的排程。
Reactor 提供了非常方便的排程器(Scheduler)工具方法,可以指定流的產生以及轉換(計算)釋出所採用的執行緒排程方式。
這些方式包括:
類別 | 描述 |
---|---|
immediate | 採用當前執行緒 |
single | 單一可複用的執行緒 |
elastic | 彈性可複用的執行緒池(IO型) |
parallel | 並行操作優化的執行緒池(CPU計算型) |
timer | 支援任務排程的執行緒池 |
fromExecutorService | 自定義執行緒池 |
下面,以一個簡單的例項來演示不同的執行緒排程:
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
在這段程式碼中,使用publishOn指定了流釋出的排程器,subscribeOn則指定的是流產生的排程器。
首先是parallel排程器進行流資料的生成,接著使用一個single單執行緒排程器進行釋出,此時經過第一個map轉換為另一個Flux流,其中的訊息疊加了當前執行緒的名稱。最後進入的是一個elastic彈性排程器,再次進行一次同樣的map轉換。
最終,經過多層轉換後的輸出如下:
[elastic-2] [single-1] parallel-1
小結
SpringBoot 2.x、Spring 5 對於響應式的Web程式設計(基於Reactor)都提供了全面的支援,藉助於框架的能力可以快速的完成一些簡單的響應式程式碼開發。
本文提供了較多 Reactor API的程式碼樣例,旨在幫助讀者能快速的理解 響應式程式設計的概念及方式。
對於習慣了傳統程式設計正規化的開發人員來說,熟練使用 Reactor 仍然需要一些思維上的轉變。
就筆者的自身感覺來看,Reactor 存在一些學習和適應的成本,但一旦熟悉使用之後便能體會它的先進之處。 就如 JDK8 引入的Stream API之後,許多開發者則漸漸拋棄forEach的方式..
本身這就是一種生產效率的提升機會,何樂而不為? 更何況,從應用框架的發展前景來看,響應式的前景是明朗的。
參考閱讀
使用 Reactor 進行反應式程式設計
https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html
Spring 5 的 WebFlux 開發介紹
https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.