1. 程式人生 > 實用技巧 >Spring Reactive Development- flatMap, flatMapSequential and concatMap

Spring Reactive Development- flatMap, flatMapSequential and concatMap

Ractive 開發過程中如果遇到和Operator內操作元素返回一個 Mono或者Flux時我們知道都要使用flatMap進行抻平操作

其中有flatMap, flatMapSequential and concatMap 比較容易混淆 下面寫幾個例子進行下區分:

mock Reactive資料查詢服務:

//mock reactive data search service    
private Mono<String> getEmployeeName(String id) {
        Map<String, String> employees = new HashMap<>();
        employees.put("1", "aSha");
        employees.put("2", "billy");
        employees.put("3", "cindy");
        employees.put("4", "davide");
        employees.put("5", "ella");
        employees.put("6", "flee");
        employees.put("7", "galosh");
        employees.put("8", "hank");

        String name = employees.get(id);
        try {
//            Thread.sleep(new Random().nextInt(100)*100);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException("Thread sleep Error!!!");
        }
        if (StringUtils.isBlank(name)) {
            throw new RuntimeException("Not Found!!!");
        }
        return Mono.just(name);
    }

  

1. flatMap

    @Test
    public void testFlatMap() {
        List<String> employeeIds = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
        Flux<String> nameFlux = Flux.fromIterable(employeeIds)
                                    .flatMap(this::getEmployeeName)
                                    .log();
        StepVerifier.create(nameFlux)
                    .expectNextCount(8)
                    .verifyComplete();
    }

==========================================

22:12:53.173 [main] INFO reactor.Flux.FlatMap.1 - onNext(aSha)
22:12:54.178 [main] INFO reactor.Flux.FlatMap.1 - onNext(billy)
22:12:55.179 [main] INFO reactor.Flux.FlatMap.1 - onNext(cindy)
22:12:56.180 [main] INFO reactor.Flux.FlatMap.1 - onNext(davide)
22:12:57.185 [main] INFO reactor.Flux.FlatMap.1 - onNext(ella)
22:12:58.187 [main] INFO reactor.Flux.FlatMap.1 - onNext(flee)
22:12:59.189 [main] INFO reactor.Flux.FlatMap.1 - onNext(galosh)
22:13:00.190 [main] INFO reactor.Flux.FlatMap.1 - onNext(hank)
22:13:00.192 [main] INFO reactor.Flux.FlatMap.1 - onComplete()

 但是flatMap不確保順序的, 當內部處理複雜的時候可能會導致順序混亂

例如為了提高效率使用parallelScheduler:

    @Test
    public void testFlatMapParallelScheduler() {
        List<String> employeeIds = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
        Flux<String> nameFlux = Flux.fromIterable(employeeIds)
                                    .window(2)
                                    .flatMap(identities -> identities.flatMap(this::getEmployeeName).subscribeOn(parallel()))
                                    .log();
        StepVerifier.create(nameFlux)
                    .expectNextCount(8)
                    .verifyComplete();
    }
================================

22:12:16.987 [main] INFO reactor.Flux.FlatMap.1 - onNext(aSha)
22:12:17.988 [main] INFO reactor.Flux.FlatMap.1 - onNext(billy)
22:12:18.990 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(ella)
22:12:18.990 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(cindy)
22:12:18.990 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(galosh)
22:12:19.991 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(davide)
22:12:19.991 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(flee)
22:12:19.991 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(hank)
22:12:19.991 [parallel-2] INFO reactor.Flux.FlatMap.1 - onComplete()

2 concatMap

concatMap 他是深度優先,可以深度保持資料的處理順序,但是效率就沒有提高。

	@Test
	public void testConcatMapParallelScheduler() {
		List<String> employeeIds = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
		Flux<String> nameFlux = Flux.fromIterable(employeeIds)
									.window(2)
									.concatMap(identities -> identities.flatMap(this::getEmployeeName).subscribeOn(parallel()))
									.log();
		StepVerifier.create(nameFlux)
					.expectNextCount(8)
					.verifyComplete();
	}

=================================
22:15:41.513 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(aSha)
22:15:42.516 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(billy)
22:15:43.521 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(cindy)
22:15:44.525 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(davide)
22:15:45.528 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(ella)
22:15:46.530 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(flee)
22:15:47.532 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(galosh)
22:15:48.533 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(hank)
22:15:48.534 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onComplete()

3 flatMapSequential

flatMapSequential 是介於flatMap 與concatMap之間的,既能夠保證順序,又可以提高效率, 不過這個順序保證是最終結果順序一致

	@Test
	public void testFlatMapSequentialParallelScheduler() {
		List<String> employeeIds = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
		Flux<String> nameFlux = Flux.fromIterable(employeeIds)
									.window(2)
									.flatMapSequential(identities -> identities.flatMap(this::getEmployeeName).subscribeOn(parallel()))
									.log();
		StepVerifier.create(nameFlux)
					.expectNextCount(8)
					.verifyComplete();
	}

==========================================
22:18:45.966 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(aSha)
22:18:46.967 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(billy)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(cindy)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(davide)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(ella)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(flee)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(galosh)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onNext(hank)
22:18:46.968 [parallel-1] INFO reactor.Flux.MergeSequential.1 - onComplete()