Reactor 3 學習筆記(2)
接上篇繼續學習各種方法:
4.9、reduce/reduceWith
@Test public void reduceTest() { Flux.range(1, 10).reduce((x, y) -> x + y).subscribe(System.out::println); Flux.range(1, 10).reduceWith(() -> 10, (x, y) -> x + y).subscribe(System.out::println); }
輸出:
55 65
上面的程式碼,reduce相當於把1到10累加求和,reduceWith則是先指定一個起始值,然後在這個起始值基礎上再累加。(tips: 除了累加,還可以做階乘)
reduce示意圖:
reduceWith示意圖:
4.10、merge/mergeSequential/contact
@Test public void mergeTest() { Flux.merge(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5), Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5)) .toStream().forEach(System.out::println); System.out.println("-----------------------------"); Flux.mergeSequential(Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(5), Flux.interval(Duration.of(600, ChronoUnit.MILLIS), Duration.of(500, ChronoUnit.MILLIS)).take(5)) .toStream().forEach(System.out::println); }
merge就是將把多個Flux"按元素實際產生的順序"合併,而mergeSequential則是按多個Flux"被訂閱的順序"來合併,以上面的程式碼來說,二個Flux,從時間上看,元素是交替產生的,所以merge的輸出結果,是混在一起的,而mergeSequential則是能分出Flux整體的先後順序。
0 0 1 1 2 2 3 3 4 4 ----------------------------- 0 1 2 3 4 0 1 2 3 4
merge示意圖:
mergeSequential示意圖:
與mergeSequential類似的,還有一個contact方法,示意圖如下:
4.11、combineLatest
@Test public void combineLatestTest() { Flux.combineLatest( Arrays::toString, Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(3), Flux.just("A", "B")) .toStream().forEach(System.out::println); System.out.println("------------------"); Flux.combineLatest( Arrays::toString, Flux.just(0, 1), Flux.just("A", "B")) .toStream().forEach(System.out::println); System.out.println("------------------"); Flux.combineLatest( Arrays::toString, Flux.interval(Duration.of(1000, ChronoUnit.MILLIS)).take(2), Flux.interval(Duration.of(10000, ChronoUnit.MILLIS)).take(2)) .toStream().forEach(System.out::println); }
該操作會將所有流中的最新產生的元素合併成一個新的元素,作為返回結果流中的元素。只要其中任何一個流中產生了新的元素,合併操作就會被執行一次。
分析一下第一段輸出:
第1個Flux用了延時生成,第1個數字0,10秒後才產生,這時第2個Flux中的A,B早就生成完畢,所以此時二個Flux中最新生在的元素,就是[0,B],類似的,10秒後,第2個數字1依次產生,再執行1次合併,生成[1,B]...
輸出:
[0, B] [1, B] [2, B] ------------------ [1, A] [1, B] ------------------ [1, 0] [1, 1]
示意圖如下:
4.12、first
@Test public void firstTest() { Flux.first(Flux.fromArray(new String[]{"A", "B"}), Flux.just(1, 2, 3)) .subscribe(System.out::println); }
這個很簡單理解,多個Flux,只取第1個Flux的元素。輸出如下:
A B
示意圖:
4.13、 map
@Test public void mapTest() { Flux.just('A', 'B', 'C').map(a -> (int) (a)).subscribe(System.out::println); }
map相當於把一種型別的元素序列,轉換成另一種型別,輸出如下:
65 66 67
示意圖:
五、訊息處理
寫程式碼時,難免會遇到各種異常或錯誤,所謂訊息處理,就是指如何處理這些異常。
5.1 訂閱錯誤訊息
@Test public void subscribeTest1() { Flux.just("A", "B", "C") .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!"))) .subscribe(System.out::println, System.err::println); }
注意:這裡subscribe第2個引數,指定了System.err::println ,即錯誤訊息,輸出到異常控制檯上。
輸出效果:
示意圖:
5.2 onErrorReturn
即:遇到錯誤時,用其它指定值返回
@Test public void subscribeTest2() { Flux.just("A", "B", "C") .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!"))) .onErrorReturn("X") .subscribe(System.out::println, System.err::println); }
輸出:
A B C X
示意圖:
5.3 onErrorResume
跟onErrorReturn有點接近,但更靈活,可以根據異常的型別,有選擇性的處理返回值。
@Test public void subscribeTest3() { Flux.just("A", "B", "C") .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!"))) .onErrorResume(e -> { if (e instanceof IndexOutOfBoundsException) { return Flux.just("X", "Y", "Z"); } else { return Mono.empty(); } }) .subscribe(System.out::println, System.err::println); }
輸出:
A B C X Y Z
示意圖:
5.4 retry
即:遇到異常,就重試。
@Test public void subscribeTest4() { Flux.just("A", "B", "C") .concatWith(Flux.error(new IndexOutOfBoundsException("下標越界啦!"))) .retry(1) .subscribe(System.out::println, System.err::println); }
輸出:
示意圖:
六、(執行緒)排程器
reactor中到處充滿了非同步呼叫,內部必然有一堆執行緒排程,Schedulers提供瞭如下幾種呼叫策略:
6.1 Schedulers.immediate() - 使用當前執行緒
6.2 Schedulers.elastic() - 使用執行緒池
6.3 Schedulers.newElastic("test1") - 使用(新)執行緒池(可以指定名稱,更方便除錯)
6.4 Schedulers.single() - 單個執行緒
6.5 Schedulers.newSingle("test2") - (新)單個執行緒(可以指定名稱,更方便除錯)
6.6 Schedulers.parallel() - 使用並行處理的執行緒池(取決於CPU核數)
6.7 Schedulers.newParallel("test3") - 使用並行處理的執行緒池(取決於CPU核數,可以指定名稱,方便除錯)
6.8 Schedulers.fromExecutorService(Executors.newScheduledThreadPool(5)) - 使用Executor(這個最靈活)
示例程式碼:
@Test public void schedulesTest() { Flux.fromArray(new String[]{"A", "B", "C", "D"}) .publishOn(Schedulers.newSingle("TEST-SINGLE", true)) .map(x -> String.format("[%s]: %s", Thread.currentThread().getName(), x)) .toStream() .forEach(System.out::println); }
輸出:
[TEST-SINGLE-1]: A [TEST-SINGLE-1]: B [TEST-SINGLE-1]: C [TEST-SINGLE-1]: D
七、測試&除錯
非同步處理,通常是比較難測試的,reactor提供了StepVerifier工具來進行測試。
7.1 常規單元測試
@Test public void stepTest() { StepVerifier.create(Flux.just(1, 2) .concatWith(Mono.error(new IndexOutOfBoundsException("test"))) .onErrorReturn(3)) .expectNext(1) .expectNext(2) .expectNext(3) .verifyComplete(); }
上面的示例,Flux先生成1,2這兩個元素,然後拋了個錯誤,但馬上用onErrorReturn處理了異常,所以最終應該是期待1,2,3,complete這樣的序列。
7.2 模擬時間流逝
Flux.interval這類延時操作,如果延時較大,比如幾個小時之類的,要真實模擬的話,效率很低,StepVerifier提供了withVirtualTime方法,來模擬加快時間的流逝(是不是很體貼^_^)
@Test public void stepTest2() { StepVerifier.withVirtualTime(() -> Flux.interval(Duration.of(10, ChronoUnit.MINUTES), Duration.of(5, ChronoUnit.SECONDS)) .take(2)) .expectSubscription() .expectNoEvent(Duration.of(10, ChronoUnit.MINUTES)) .thenAwait(Duration.of(5, ChronoUnit.SECONDS)) .expectNext(0L) .thenAwait(Duration.of(5, ChronoUnit.SECONDS)) .expectNext(1L) .verifyComplete(); }
上面這個Flux先停10分鐘,然後每隔5秒生成一個數字,然後取前2個數字。程式碼先呼叫
expectSubscription 期待流被訂閱,然後 expectNoEvent(Duration.of(10, ChronoUnit.MINUTES)) 期望10分鐘內,無任何事件(即:驗證Flux先暫停10分鐘),然後 thenAwait(Duration.of(5, ChronoUnit.SECONDS)) 等5秒鐘,這時已經生成了數字0 expectNext(0L) 期待0L ... 後面的大家自行理解吧。
7.3 記錄日誌
@Test public void publisherTest() { Flux.just(1, 0) .map(c -> 1 / c) .log("MY-TEST") .subscribe(System.out::println); }
輸出:
示意圖:
7.4 checkpoint檢查點
可以在一些懷疑的地方,加上checkpoint檢查,參考下面的程式碼:
@Test public void publisherTest() { Flux.just(1, 0) .map(c -> 1 / c) .checkpoint("AAA") .subscribe(System.out::println); }
輸出: