(14)Reactor調度器與線程模型——響應式Spring的道法術器
本系列文章索引《響應式Spring的道法術器》
前情提要 Spring WebFlux快速上手 | Spring WebFlux性能測試
前情提要:Reactor 3快速上手 | 響應式流規範 | 自定義數據流
本文測試源碼
2.4 調度器與線程模型
在1.3.2節簡單介紹了不同類型的調度器Scheduler
,以及如何使用publishOn
和subscribeOn
切換不同的線程執行環境。
下邊使用一個簡單的例子再回憶一下:
@Test public void testScheduling() { Flux.range(0, 10) // .log() .publishOn(Schedulers.newParallel("myParallel")) // .log() .subscribeOn(Schedulers.newElastic("myElastic")) .log() .blockLast(); }
- 只保留這個log()的話,可以看到,源頭數據流是執行在
myElastic-x
線程上的; - 只保留這個log()的話,可以看到,
publishOn
之後數據流是執行在myParallel-x
線程上的; - 只保留這個log()的話,可以看到,
subscribeOn
之後數據流依然是執行在myParallel-x
線程上的。
通過以上三個log()
的輸出,可以發現,對於如下圖所示的操作鏈:
publishOn
會影響鏈中其後的操作符,比如第一個publishOn調整調度器為elastic,則filter
的處理操作是在彈性線程池中執行的;同理,flatMap
是執行在固定大小的parallel線程池中的;subscribeOn
無論出現在什麽位置,都只影響源頭的執行環境,也就是range
方法是執行在單線程中的,直至被第一個publishOn
切換調度器之前,所以range
後的map
也在單線程中執行。
這一節我們了解一下它的實現機制。
2.4.1 調度器
調度器相當於Reactor中的ExecutorService,不同的調度器定義不同的線程執行環境。Schedulers
工具類提供的靜態方法可搭建不同的線程執行環境。
Schedulers
類已經預先創建了幾種常用的不同線程池模型的調度器:使用single()
、elastic()
和parallel()
方法創建的調度器可以分別使用內置的單線程、彈性線程池和固定大小線程池。如果想創建新的調度器,可以使用newSingle()
newElastic()
和newParallel()
方法。這些方法都是返回一個Scheduler
的具體實現。
看一下Scheduler
都有哪些行為:
public interface Scheduler extends Disposable {
// 調度執行Runnable任務task。
Disposable schedule(Runnable task);
// 延遲一段指定的時間後執行。
Disposable schedule(Runnable task, long delay, TimeUnit unit);
// 周期性地執行任務。
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
// 創建一個工作線程。
Worker createWorker();
// 啟動調度器
void start();
// 以下兩個方法可以暫時忽略
void dispose();
long now(TimeUnit unit)
// 一個Worker代表調度器可調度的一個工作線程,在一個Worker內,遵循FIFO(先進先出)的任務執行策略
interface Worker extends Disposable {
// 調度執行Runnable任務task。
Disposable schedule(Runnable task);
// 延遲一段指定的時間後執行。
Disposable schedule(Runnable task, long delay, TimeUnit unit);
// 周期性地執行任務。
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
}
}
如圖所示,Scheduler
是領導,Worker
是員工,每個Scheduler
手中有若幹Worker
。接到任務後,Scheduler
負責分派,Worker
負責幹活。
在Scheduler
中,每個Worker
都是一個ScheduledExecutorService
,或一個包裝了ScheduledExecutorService
的對象。所以,Scheduler
擁有的並不是線程池,而是一個自行維護的ScheduledExecutorService
池。
所謂“自行維護”,主要有三點:
- 可供調遣的
Worker
。比如Schedulers.newParallel()
返回的ParallelScheduler
,其內維護的是一個固定大小的ScheduledExecutorService[]
數組;而ElasticScheduler
由一個ExecutorService的Queue
來維護。 - 任務分派策略。
ElasticScheduler
和ParallelScheduler
都有一個pick()
方法,用來選出合適的Worker
。 - 對於要處理的任務,包裝為
Callable
,從而可以異步地返回一個Future
給調用者。
2.4.2 切換執行環境
再回到publishOn
和subscribeOn
方法。
在Reactor中,對於數據流的處理,實際上是一系列方法調用和基於事件的回調,包括subscribe
、onSubscribe
、request
,以及onNext
、onError
、onComplete
。拿出2.1節的圖幫助理解:
當調用.subscribe()
方法時,會形成從上遊向下遊的數據流,數據流中的元素通過onNext* (onError|onComplete)
攜帶“順流而下”。同時,Reactor使用者看不到的是,還有一條從下遊向上遊的“訂閱鏈”,request就是沿著這個鏈向上反饋需求的。
publishOn
方法能夠將onNext
、onError
、onComplete
調度到給定的Scheduler
的Worker
上執行。所以如上圖場景中,再.map
和.filter
中間增加一個publisheOn(Schedulers.elastic())
的話,.filter
操作的onNext
的過濾處理將會執行在ElasticScheduler
的某個Worker
上。
subscribeOn
方法能夠將subscribe
(會調用onSubscribe
)、request
調度到給定的Scheduler
的Worker
上執行。所以在任何位置增加一個subscribeOn(Schedulers.elastic())
的話,都會借助自下而上的訂閱鏈,通過subscribe()
方法,將線程執行環境傳遞到“源頭”,從而Flux.just
會執行在ElasticScheduler
上。繼而影響到其後的操作符,直至遇到publishOn
改變了執行環境。
此外,有些操作符本身會需要調度器來進行多線程的處理,當你不明確指定調度器的時候,那些操作符會自行使用內置的單例調度器來執行。例如,Flux.delayElements(Duration)
使用的是 Schedulers.parallel()
調度器對象:
@Test
public void testDelayElements() {
Flux.range(0, 10)
.delayElements(Duration.ofMillis(10))
.log()
.blockLast();
}
從輸出可以看到onNext
運行在不同的線程上:
[ INFO] (main) onSubscribe(FluxConcatMap.ConcatMapImmediate)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
[ INFO] (parallel-2) onNext(1)
[ INFO] (parallel-3) onNext(2)
[ INFO] (parallel-4) onNext(3)
...
2.4.3 為數據流配置Context
在Reactor中,基於Scheduler
的線程調度確實非常簡單好用,但是還有個問題需要解決。
我們以往在編寫多線程的代碼時,如果涉及到只在線程內部使用的值,可能會使用ThreadLocal
進行包裝。
但是在響應式編程中,由於線程環境經常發生變化,這一用法就失去作用了,並且甚至帶來bug。比如,使用 Logback 的 MDC 來存儲日誌關聯的 ID 就屬於這種情況。
自從版本 3.1.0,Reactor 引入了一個類似於 ThreadLocal 的高級功能:Context。它作用於一個 Flux 或一個 Mono 上,而不是應用於一個線程(Thread)。也就是其生命周期伴隨整個數據流,而不是線程。
相對來說,用戶使用Context並不多,對此感興趣或有此需求的話,請看我翻譯的相關文檔,可以對Reactor內部實現尤其是Subscription
有更深的理解。
2.4.4 並行執行
如今多核架構已然普及,能夠方便的進行並行處理是很重要的。
對於一些能夠在一個線程中順序處理的任務,即使調度到ParallelScheduler上,通常也只由一個Worker來執行,比如:
@Test
public void testParallelFlux() throws InterruptedException {
Flux.range(1, 10)
.publishOn(Schedulers.parallel())
.log().subscribe();
TimeUnit.MILLISECONDS.sleep(10);
}
輸出如下:
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (parallel-1) | onNext(1)
[ INFO] (parallel-1) | onNext(2)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-1) | onNext(4)
[ INFO] (parallel-1) | onNext(5)
[ INFO] (parallel-1) | onNext(6)
[ INFO] (parallel-1) | onNext(7)
[ INFO] (parallel-1) | onNext(8)
[ INFO] (parallel-1) | onNext(9)
[ INFO] (parallel-1) | onNext(10)
[ INFO] (parallel-1) | onComplete()
有時候,我們確實需要一些任務能夠“均勻”分布在不同的工作線程上執行,這時候就需要用到ParallelFlux
。
你可以對任何Flux使用parallel()
操作符來得到一個ParallelFlux
。不過這個操作符本身並不會進行並行處理,而只是將負載劃分到多個執行“軌道”上(默認情況下,軌道個數與CPU核數相等)。
為了配置ParallelFlux
如何並行地執行每一個軌道,需要使用runOn(Scheduler)
,這裏,Schedulers.parallel() 是比較推薦的專門用於並行處理的調度器。
@Test
public void testParallelFlux() throws InterruptedException {
Flux.range(1, 10)
.parallel(2)
.runOn(Schedulers.parallel())
// .publishOn(Schedulers.parallel())
.log()
.subscribe();
TimeUnit.MILLISECONDS.sleep(10);
}
輸出如下:
[ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-2) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-2) onNext(4)
[ INFO] (parallel-1) onNext(5)
[ INFO] (parallel-2) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-2) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-2) onNext(10)
[ INFO] (parallel-1) onComplete()
[ INFO] (parallel-2) onComplete()
可以看到,各個元素的onNext “均勻”分布執行在兩個線程上,最後每個線程上有獨立的onComplete
事件,這與publishOn
調度到ParallelScheduler上的情況是不同的。
(14)Reactor調度器與線程模型——響應式Spring的道法術器