Spring Reactive Development: flatMap, flatMapSequential and concatMap
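In reactive development, when the function passed to an operator returns a Mono or a Flux for each element, we know that flatMap is needed to flatten the result.
flatMap, flatMapSequential and concatMap are easy to confuse, so the examples below show how they differ:
A mock reactive data lookup service: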
// Mock reactive data lookup service.
// Note: Thread.sleep blocks the calling thread before the Mono is returned.
private Mono<String> getEmployeeName(String id) {
    Map<String, String> employees = new HashMap<>();
    employees.put("1", "aSha");
    employees.put("2", "billy");
    employees.put("3", "cindy");
    employees.put("4", "davide");
    employees.put("5", "ella");
    employees.put("6", "flee");
    employees.put("7", "galosh");
    employees.put("8", "hank");
    String name = employees.get(id);
    try {
        // Thread.sleep(new Random().nextInt(100)*100);
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException("Thread sleep Error!!!", e);
    }
    if (StringUtils.isBlank(name)) {
        throw new RuntimeException("Not Found!!!");
    }
    return Mono.just(name);
}
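The mock above sleeps on the calling thread before it returns the Mono, so the delay happens when the method is called rather than when the Mono is subscribed. For comparison, here is a minimal sketch of a deferred, non-blocking variant. The class name EmployeeLookup, the method name getEmployeeNameDeferred, the shortened employee map and the 100 ms delay are all illustrative assumptions and are not part of the tests in this post.

```java
import java.time.Duration;
import java.util.Map;
import reactor.core.publisher.Mono;

public class EmployeeLookup {
    // Shortened, hypothetical data set used only for this sketch.
    private static final Map<String, String> EMPLOYEES =
            Map.of("1", "aSha", "2", "billy", "3", "cindy");

    // Mono.defer postpones the lookup until subscription time.
    // delayElement delays the value on Reactor's default parallel scheduler
    // instead of blocking the caller with Thread.sleep.
    static Mono<String> getEmployeeNameDeferred(String id) {
        return Mono.defer(() -> {
            String name = EMPLOYEES.get(id);
            return name == null
                    ? Mono.<String>error(new RuntimeException("Not Found!!!"))
                    : Mono.just(name).delayElement(Duration.ofMillis(100));
        });
    }

    public static void main(String[] args) {
        // block() is used only to print the result in this standalone demo.
        System.out.println(getEmployeeNameDeferred("1").block()); // prints aSha
    }
}
```

With a source like this, an unknown id surfaces as an onError signal on the returned Mono rather than as an exception thrown while the pipeline is being assembled.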
1. flatMap
@Test
public void testFlatMap() {
    List<String> employeeIds = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
    Flux<String> nameFlux = Flux.fromIterable(employeeIds)
            .flatMap(this::getEmployeeName)
            .log();
    StepVerifier.create(nameFlux)
            .expectNextCount(8)
            .verifyComplete();
}
==========================================
22:12:53.173 [main] INFO reactor.Flux.FlatMap.1 - onNext(aSha)
22:12:54.178 [main] INFO reactor.Flux.FlatMap.1 - onNext(billy)
22:12:55.179 [main] INFO reactor.Flux.FlatMap.1 - onNext(cindy)
22:12:56.180 [main] INFO reactor.Flux.FlatMap.1 - onNext(davide)
22:12:57.185 [main] INFO reactor.Flux.FlatMap.1 - onNext(ella)
22:12:58.187 [main] INFO reactor.Flux.FlatMap.1 - onNext(flee)
22:12:59.189 [main] INFO reactor.Flux.FlatMap.1 - onNext(galosh)
22:13:00.190 [main] INFO reactor.Flux.FlatMap.1 - onNext(hank)
22:13:00.192 [main] INFO reactor.Flux.FlatMap.1 - onComplete()
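However, flatMap does not guarantee ordering. The output above is in order only because every call ran on the main thread, as the log shows; once the inner processing becomes more complex, the results can arrive out of order.
For example, when the parallel scheduler is used to improve throughput: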
@Test
public void testFlatMapParallelScheduler() {
    List<String> employeeIds = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
    // parallel() is assumed to be a static import of reactor.core.scheduler.Schedulers.parallel
    Flux<String> nameFlux = Flux.fromIterable(employeeIds)
            .window(2)
            .flatMap(identities -> identities.flatMap(this::getEmployeeName).subscribeOn(parallel()))
            .log();
    StepVerifier.create(nameFlux)
            .expectNextCount(8)
            .verifyComplete();
}
================================
22:12:16.987 [main] INFO reactor.Flux.FlatMap.1 - onNext(aSha)
22:12:17.988 [main] INFO reactor.Flux.FlatMap.1 - onNext(billy)
22:12:18.990 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(ella)
22:12:18.990 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(cindy)
22:12:18.990 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(galosh)
22:12:19.991 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(davide)
22:12:19.991 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(flee)
22:12:19.991 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(hank)
22:12:19.991 [parallel-2] INFO reactor.Flux.FlatMap.1 - onComplete()
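The reordering is easier to see in isolation when the inner publishers are non-blocking and take different amounts of time. The following is a minimal sketch under stated assumptions: the class FlatMapOrderDemo, the helper delayed and the delay values are made up for illustration and do not come from the tests above.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapOrderDemo {
    // Hypothetical helper: emits the value after (value * 100) milliseconds.
    static Mono<Integer> delayed(int value) {
        return Mono.just(value).delayElement(Duration.ofMillis(value * 100L));
    }

    // flatMap subscribes to all inner Monos eagerly and merges their
    // results in the order in which they arrive.
    static List<Integer> viaFlatMap() {
        return Flux.just(3, 1, 2)
                .flatMap(FlatMapOrderDemo::delayed)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(viaFlatMap()); // prints [1, 2, 3]
    }
}
```

The source order is 3, 1, 2, but the result follows completion order because the inner Mono with the shortest delay finishes first.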
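2. concatMap
concatMap is depth-first: it subscribes to one inner publisher at a time and waits for it to complete before moving on to the next. This preserves the processing order of the data, but it brings no efficiency gain.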
@Test
public void testConcatMapParallelScheduler() {
    List<String> employeeIds = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
    Flux<String> nameFlux = Flux.fromIterable(employeeIds)
            .window(2)
            .concatMap(identities -> identities.flatMap(this::getEmployeeName).subscribeOn(parallel()))
            .log();
    StepVerifier.create(nameFlux)
            .expectNextCount(8)
            .verifyComplete();
}
=================================
22:15:41.513 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(aSha)
22:15:42.516 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(billy)
22:15:43.521 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(cindy)
22:15:44.525 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(davide)
22:15:45.528 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(ella)
22:15:46.530 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(flee)
22:15:47.532 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(galosh)
22:15:48.533 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(hank)
22:15:48.534 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onComplete()
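To see the one-at-a-time behaviour of concatMap without the windowing, here is a small standalone sketch. The class ConcatMapOrderDemo, the helper delayed and the delay values are assumptions chosen for illustration only.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatMapOrderDemo {
    // Hypothetical helper: emits the value after (value * 100) milliseconds.
    static Mono<Integer> delayed(int value) {
        return Mono.just(value).delayElement(Duration.ofMillis(value * 100L));
    }

    // concatMap subscribes to the next inner Mono only after the previous
    // one has completed, so the source order is kept.
    static List<Integer> viaConcatMap() {
        return Flux.just(3, 1, 2)
                .concatMap(ConcatMapOrderDemo::delayed)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Integer> result = viaConcatMap(); // [3, 1, 2]
        long elapsedMs = Duration.ofNanos(System.nanoTime() - start).toMillis();
        // The delays run back to back, so the elapsed time is roughly their sum.
        System.out.println(result + " in about " + elapsedMs + " ms");
    }
}
```

The price of the ordering guarantee is that the three delays (300, 100 and 200 ms in this sketch) add up instead of overlapping.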
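3. flatMapSequential
flatMapSequential sits between flatMap and concatMap: it preserves order while still improving efficiency. Note, however, that the ordering guarantee applies only to the final emitted results. The inner publishers are subscribed eagerly and may run concurrently, and their results are buffered and then emitted in source order.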
@Test
public void testFlatMapSequentialParallelScheduler() {
    List<String> employeeIds = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
    Flux<String> nameFlux = Flux.fromIterable(employeeIds)
            .window(2)
            .flatMapSequential(identities -> identities.flatMap(this::getEmployeeName).subscribeOn(parallel()))
            .log();
    StepVerifier.create(nameFlux)
            .expectNextCount(8)
            .verifyComplete();
}
==========================================
22:18:45.966 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(aSha)
22:18:46.967 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(billy)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(cindy)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(davide)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(ella)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(flee)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(galosh)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(hank)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onComplete()
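The log above shows most names arriving almost at once, which is what eager subscription combined with ordered emission looks like. A minimal standalone sketch of the same idea follows; the class FlatMapSequentialOrderDemo, the helper delayed and the delay values are illustrative assumptions.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapSequentialOrderDemo {
    // Hypothetical helper: emits the value after (value * 100) milliseconds.
    static Mono<Integer> delayed(int value) {
        return Mono.just(value).delayElement(Duration.ofMillis(value * 100L));
    }

    // flatMapSequential subscribes to all inner Monos eagerly, like flatMap,
    // but holds back later results until the earlier ones have been emitted.
    static List<Integer> viaFlatMapSequential() {
        return Flux.just(3, 1, 2)
                .flatMapSequential(FlatMapSequentialOrderDemo::delayed)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Integer> result = viaFlatMapSequential(); // [3, 1, 2]
        long elapsedMs = Duration.ofNanos(System.nanoTime() - start).toMillis();
        // The delays overlap, so the elapsed time is closer to the longest
        // single delay than to the sum of all delays.
        System.out.println(result + " in about " + elapsedMs + " ms");
    }
}
```

In this sketch the values 1 and 2 are ready before 3, but they are buffered until 3 has been emitted, so the output keeps the source order.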