1. 程式人生 > >(14)Reactor調度器與線程模型——響應式Spring的道法術器

(14)Reactor調度器與線程模型——響應式Spring的道法術器

響應式編程 Spring WebFlux

本系列文章索引《響應式Spring的道法術器》
前情提要 Spring WebFlux快速上手 | Spring WebFlux性能測試
前情提要:Reactor 3快速上手 | 響應式流規範 | 自定義數據流
本文測試源碼

2.4 調度器與線程模型

在1.3.2節簡單介紹了不同類型的調度器Scheduler,以及如何使用publishOnsubscribeOn切換不同的線程執行環境。

下邊使用一個簡單的例子再回憶一下:

    @Test
    public void testScheduling() {
        Flux.range(0, 10)
//                .log()
                .publishOn(Schedulers.newParallel("myParallel"))
//                .log()
                .subscribeOn(Schedulers.newElastic("myElastic"))
                .log()
                .blockLast();
    }
  1. 只保留這個log()的話,可以看到,源頭數據流是執行在myElastic-x線程上的;
  2. 只保留這個log()的話,可以看到,publishOn之後數據流是執行在myParallel-x線程上的;
  3. 只保留這個log()的話,可以看到,subscribeOn之後數據流依然是執行在myParallel-x線程上的。

通過以上三個log()的輸出,可以發現,對於如下圖所示的操作鏈:

技術分享圖片

  • publishOn會影響鏈中其後的操作符,比如第一個publishOn調整調度器為elastic,則filter的處理操作是在彈性線程池中執行的;同理,flatMap是執行在固定大小的parallel線程池中的;
  • subscribeOn無論出現在什麽位置,都只影響源頭的執行環境,也就是range方法是執行在單線程中的,直至被第一個publishOn切換調度器之前,所以range後的map也在單線程中執行。

這一節我們了解一下它的實現機制。

2.4.1 調度器

調度器相當於Reactor中的ExecutorService,不同的調度器定義不同的線程執行環境。Schedulers工具類提供的靜態方法可搭建不同的線程執行環境。

Schedulers類已經預先創建了幾種常用的不同線程池模型的調度器:使用single()elastic()parallel()方法創建的調度器可以分別使用內置的單線程、彈性線程池和固定大小線程池。如果想創建新的調度器,可以使用newSingle()

newElastic()newParallel()方法。這些方法都是返回一個Scheduler的具體實現。

看一下Scheduler都有哪些行為:

public interface Scheduler extends Disposable {
    // 調度執行Runnable任務task。
    Disposable schedule(Runnable task);
    // 延遲一段指定的時間後執行。
    Disposable schedule(Runnable task, long delay, TimeUnit unit);
    // 周期性地執行任務。
    Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
    // 創建一個工作線程。
    Worker createWorker();
    // 啟動調度器
    void start();
    // 以下兩個方法可以暫時忽略
    void dispose();
    long now(TimeUnit unit)

    // 一個Worker代表調度器可調度的一個工作線程,在一個Worker內,遵循FIFO(先進先出)的任務執行策略
    interface Worker extends Disposable {
        // 調度執行Runnable任務task。
        Disposable schedule(Runnable task);
        // 延遲一段指定的時間後執行。
        Disposable schedule(Runnable task, long delay, TimeUnit unit);
        // 周期性地執行任務。
        Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
    }
}

技術分享圖片

如圖所示,Scheduler是領導,Worker是員工,每個Scheduler手中有若幹Worker。接到任務後,Scheduler負責分派,Worker負責幹活。

Scheduler中,每個Worker都是一個ScheduledExecutorService,或一個包裝了ScheduledExecutorService的對象。所以,Scheduler擁有的並不是線程池,而是一個自行維護的ScheduledExecutorService池。

所謂“自行維護”,主要有三點:

  1. 可供調遣的Worker。比如Schedulers.newParallel()返回的ParallelScheduler,其內維護的是一個固定大小的ScheduledExecutorService[]數組;而ElasticScheduler由一個ExecutorService的Queue來維護。
  2. 任務分派策略。ElasticSchedulerParallelScheduler都有一個pick()方法,用來選出合適的Worker
  3. 對於要處理的任務,包裝為Callable,從而可以異步地返回一個Future給調用者。

2.4.2 切換執行環境

再回到publishOnsubscribeOn方法。

在Reactor中,對於數據流的處理,實際上是一系列方法調用和基於事件的回調,包括subscribeonSubscriberequest,以及onNextonErroronComplete。拿出2.1節的圖幫助理解:

技術分享圖片

當調用.subscribe()方法時,會形成從上遊向下遊的數據流,數據流中的元素通過onNext* (onError|onComplete)攜帶“順流而下”。同時,Reactor使用者看不到的是,還有一條從下遊向上遊的“訂閱鏈”,request就是沿著這個鏈向上反饋需求的。

publishOn方法能夠將onNextonErroronComplete調度到給定的SchedulerWorker上執行。所以如上圖場景中,再.map.filter中間增加一個publisheOn(Schedulers.elastic())的話,.filter操作的onNext的過濾處理將會執行在ElasticScheduler的某個Worker上。

subscribeOn方法能夠將subscribe(會調用onSubscribe)、request調度到給定的SchedulerWorker上執行。所以在任何位置增加一個subscribeOn(Schedulers.elastic())的話,都會借助自下而上的訂閱鏈,通過subscribe()方法,將線程執行環境傳遞到“源頭”,從而Flux.just會執行在ElasticScheduler上。繼而影響到其後的操作符,直至遇到publishOn改變了執行環境。

此外,有些操作符本身會需要調度器來進行多線程的處理,當你不明確指定調度器的時候,那些操作符會自行使用內置的單例調度器來執行。例如,Flux.delayElements(Duration) 使用的是 Schedulers.parallel()調度器對象:

    @Test
    public void testDelayElements() {
        Flux.range(0, 10)
                .delayElements(Duration.ofMillis(10))
                .log()
                .blockLast();
    }

從輸出可以看到onNext運行在不同的線程上:

[ INFO] (main) onSubscribe(FluxConcatMap.ConcatMapImmediate)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
[ INFO] (parallel-2) onNext(1)
[ INFO] (parallel-3) onNext(2)
[ INFO] (parallel-4) onNext(3)
...

2.4.3 為數據流配置Context

在Reactor中,基於Scheduler的線程調度確實非常簡單好用,但是還有個問題需要解決。

我們以往在編寫多線程的代碼時,如果涉及到只在線程內部使用的值,可能會使用ThreadLocal進行包裝。

但是在響應式編程中,由於線程環境經常發生變化,這一用法就失去作用了,並且甚至帶來bug。比如,使用 Logback 的 MDC 來存儲日誌關聯的 ID 就屬於這種情況。

自從版本 3.1.0,Reactor 引入了一個類似於 ThreadLocal 的高級功能:Context。它作用於一個 Flux 或一個 Mono 上,而不是應用於一個線程(Thread)。也就是其生命周期伴隨整個數據流,而不是線程。

相對來說,用戶使用Context並不多,對此感興趣或有此需求的話,請看我翻譯的相關文檔,可以對Reactor內部實現尤其是Subscription有更深的理解。

2.4.4 並行執行

如今多核架構已然普及,能夠方便的進行並行處理是很重要的。

對於一些能夠在一個線程中順序處理的任務,即使調度到ParallelScheduler上,通常也只由一個Worker來執行,比如:

    @Test
    public void testParallelFlux() throws InterruptedException {
        Flux.range(1, 10)
                .publishOn(Schedulers.parallel())
                .log().subscribe();
        TimeUnit.MILLISECONDS.sleep(10);
    }

輸出如下:

[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (parallel-1) | onNext(1)
[ INFO] (parallel-1) | onNext(2)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-1) | onNext(4)
[ INFO] (parallel-1) | onNext(5)
[ INFO] (parallel-1) | onNext(6)
[ INFO] (parallel-1) | onNext(7)
[ INFO] (parallel-1) | onNext(8)
[ INFO] (parallel-1) | onNext(9)
[ INFO] (parallel-1) | onNext(10)
[ INFO] (parallel-1) | onComplete()

有時候,我們確實需要一些任務能夠“均勻”分布在不同的工作線程上執行,這時候就需要用到ParallelFlux

你可以對任何Flux使用parallel()操作符來得到一個ParallelFlux。不過這個操作符本身並不會進行並行處理,而只是將負載劃分到多個執行“軌道”上(默認情況下,軌道個數與CPU核數相等)。

為了配置ParallelFlux如何並行地執行每一個軌道,需要使用runOn(Scheduler),這裏,Schedulers.parallel() 是比較推薦的專門用於並行處理的調度器。

    @Test
    public void testParallelFlux() throws InterruptedException {
        Flux.range(1, 10)
                .parallel(2)
                .runOn(Schedulers.parallel())
//                .publishOn(Schedulers.parallel())
                .log()
                .subscribe();

        TimeUnit.MILLISECONDS.sleep(10);
    }

輸出如下:

[ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-2) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-2) onNext(4)
[ INFO] (parallel-1) onNext(5)
[ INFO] (parallel-2) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-2) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-2) onNext(10)
[ INFO] (parallel-1) onComplete()
[ INFO] (parallel-2) onComplete()

可以看到,各個元素的onNext “均勻”分布執行在兩個線程上,最後每個線程上有獨立的onComplete事件,這與publishOn調度到ParallelScheduler上的情況是不同的。

(14)Reactor調度器與線程模型——響應式Spring的道法術器