(19)Reactor Processors——響應式Spring的道法術器
本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範
2.9 Processor
Processor
既是一種特別的發布者(Publisher
)又是一種訂閱者(Subscriber
)。 所以你能夠訂閱一個Processor
,也可以調用它們提供的方法來手動插入數據到序列,或終止序列。
前面一直在聊響應式流的四個接口中的三個:Publisher
、Subscriber
、Subscription
,唯獨Processor
遲遲沒有提及。原因在於想用好它們不太容易,多數情況下,我們應該進行避免使用Processor
,通常來說僅用於一些特殊場景。
2.9.1 使用 Sink 來線程安全地生成流
比起直接使用Processor,更好的方式是通過調用sink()
來得到它的Sink。這個Sink是線程安全的,可以用於在應用程序中多線程並發地生成數據。例如,通過UnicastProcessor
得到一個線程安全的 sink:
UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<Integer> sink = processor.sink(overflowStrategy);
多個線程可以並發地通過下邊的方法生成數據到sink。
sink.next(n);
看到這裏是不是感覺跟generate
生成數據流的方式很像?所以Reactor官方建議,當你想要使用Processor的時候,首先看看能否用generate
實現同樣的功能,或者看看是否有相應的操作符可以達到你想要的效果。
2.9.2 Reactor 內置的 Processor
Reactor Core 內置多種 Processor。這些 processor 具有不同的語法,大概分為三類。
- 直接的(direct)(DirectProcessor 和 UnicastProcessor):這些 processors 只能通過直接 調用 Sink 的方法來推送數據。
- 同步的(synchronous)
- 異步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):這些 processors 可以將從多個上遊發布者得到的數據推送下去。由於使用了 RingBuffer 的數據結構來緩存多個來自上遊的數據,因此更加有健壯性。
異步的 processor 在實例化的時候最復雜,因為有許多不同的選項。因此它們暴露出一個 Builder 接口。 而簡單的 processors 有靜態的工廠方法。
1)DirectProcessor
DirectProcessor
可以將信號分發給零到多個訂閱者(Subscriber)。它是最容易實例化的,使用靜態方法 create() 即可。另一方面,它的不足是無法處理背壓。所以,當DirectProcessor
推送的是 N 個元素,而至少有一個訂閱者的請求個數少於 N 的時候,就會發出一個IllegalStateException
。
一旦 Processor 結束(通常通過調用它的 Sink 的 error(Throwable) 或 complete() 方法), 雖然它允許更多的訂閱者訂閱它,但是會立即向它們重新發送終止信號。
2)UnicastProcessor
UnicastProcessor
可以使用一個內置的緩存來處理背壓。代價就是它最多只能有一個訂閱者(上一節的例子通過publish
轉換成了ConnectableFlux
,所以可以接入兩個訂閱者)。
UnicastProcessor
有多種選項,因此提供多種不同的create
靜態方法。例如,它默認是 無限的(unbounded) :如果你在在訂閱者還沒有請求數據的情況下讓它推送數據,它會緩存所有數據。
可以通過提供一個自定義的 Queue 的具體實現傳遞給 create 工廠方法來改變默認行為。如果給出的隊列是有限的(bounded), 並且緩存已滿,而且未收到下遊的請求,processor 會拒絕推送數據。
在上邊“有限的”例子中,還可以在構造 processor 的時候提供一個回調方法,這個回調方法可以在每一個 被拒絕推送的元素上調用,從而讓開發者有機會清理這些元素。
3)EmitterProcessor
EmitterProcessor
能夠向多個訂閱者發送數據,並且可以對每一個訂閱者進行背壓處理。它本身也可以訂閱一個發布者並同步獲得數據。
最初如果沒有訂閱者,它仍然允許推送一些數據到緩存,緩存大小由bufferSize
定義。 之後如果仍然沒有訂閱者訂閱它並消費數據,對onNext
的調用會阻塞,直到有訂閱者接入 (這時只能並發地訂閱了)。
因此第一個訂閱者會收到最多bufferSize
個元素。然而之後,後續接入的訂閱者只能獲取到它們開始訂閱之後推送的數據。這個內部的緩存會繼續用於背壓的目的。
默認情況下,如果所有的訂閱者都取消了訂閱,它會清空內部緩存,並且不再接受更多的訂閱者。這一點可以通過 create 靜態工廠方法的 autoCancel 參數來配置。
4)ReplayProcessor
ReplayProcessor
會緩存直接通過自身的 Sink 推送的元素,以及來自上遊發布者的元素, 並且後來的訂閱者也會收到重發(replay)的這些元素。
可以通過多種配置方式創建它:
- 緩存一個元素(cacheLast)。
- 緩存一定個數的歷史元素(create(int)),所有的歷史元素(create())。
- 緩存基於時間窗期間內的元素(createTimeout(Duration))。
- 緩存基於歷史個數和時間窗的元素(createSizeOrTimeout(int, Duration))。
5)TopicProcessor
TopicProcessor
是一個異步的 processor,它能夠重發來自多個上遊發布者的元素, 這需要在創建它的時候配置shared
(build() 的 share(boolean) 配置)。
如果你企圖在並發環境下通過並發的上遊發布者調用
TopicProcessor
的onNext
、onComplete
,或onError
方法,就必須配置shared
。否則,並發調用就是非法的,從而 processor 是完全兼容響應式流規範的。
TopicProcessor
能夠對多個訂閱者發送數據。它通過對每一個訂閱者關聯一個線程來實現這一點, 這個線程會一直執行直到 processor 發出onError
或onComplete
信號,或關聯的訂閱者被取消。 最多可以接受的訂閱者個數由構造者方法executor
指定,通過提供一個有限線程數的 ExecutorService
來限制這一個數。
這個 processor 基於一個RingBuffer
數據結構來存儲已發送的數據。每一個訂閱者線程 自行管理其相關的數據在RingBuffer
中的索引。
這個 processor 也有一個autoCancel
構造器方法:如果設置為true
(默認的),那麽當 所有的訂閱者取消之後,上遊發布者也就被取消了。
6)WorkQueueProcessor
WorkQueueProcessor
也是一個異步的 processor,也能夠重發來自多個上遊發布者的元素, 同樣在創建時需要配置shared
(它多數構造器配置與TopicProcessor
相同)。
它放松了對響應式流規範的兼容,但是好處就在於相對於TopicProcessor
來說需要更少的資源。 它仍然基於RingBuffer
,但是不再要求每一個訂閱者都關聯一個線程,因此相對於TopicProcessor
來說更具擴展性。
代價在於分發模式有些區別:來自訂閱者的請求會匯總在一起,並且這個 processor 每次只對一個 訂閱者發送數據,因此需要循環(round-robin)對訂閱者發送數據,而不是一次全部發出的模式(無法保證完全公平的循環分發)。
WorkQueueProcessor
多數構造器方法與TopicProcessor
相同,比如autoCancel
、share
, 以及waitStrategy
。下遊訂閱者的最大數目同樣由構造器executor
配置的ExecutorService
決定。
註意:最好不要有太多訂閱者訂閱
WorkQueueProcessor
,因為這會鎖住 processor。如果你需要限制訂閱者數量,最好使用一個ThreadPoolExecutor
或ForkJoinPool
。這個 processor 能夠檢測到(線程池)容量並在訂閱者過多時拋出異常。
本文的介紹並未給出示例,在下一章我們編寫“響應式Netty”的時候會介紹到Processor的使用。
(19)Reactor Processors——響應式Spring的道法術器