1. 程式人生 > >(19)Reactor Processors——響應式Spring的道法術器

(19)Reactor Processors——響應式Spring的道法術器

Spring WebFlux 響應式編程

本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範

2.9 Processor

Processor既是一種特別的發布者(Publisher)又是一種訂閱者(Subscriber)。 所以你能夠訂閱一個Processor,也可以調用它們提供的方法來手動插入數據到序列,或終止序列。

前面一直在聊響應式流的四個接口中的三個:PublisherSubscriberSubscription,唯獨Processor遲遲沒有提及。原因在於想用好它們不太容易,多數情況下,我們應該進行避免使用Processor,通常來說僅用於一些特殊場景。

2.9.1 使用 Sink 來線程安全地生成流

比起直接使用Processor,更好的方式是通過調用sink()來得到它的Sink。這個Sink是線程安全的,可以用於在應用程序中多線程並發地生成數據。例如,通過UnicastProcessor得到一個線程安全的 sink:

    UnicastProcessor<Integer> processor = UnicastProcessor.create();
    FluxSink<Integer> sink = processor.sink(overflowStrategy);

多個線程可以並發地通過下邊的方法生成數據到sink。

    sink.next(n);

看到這裏是不是感覺跟generate生成數據流的方式很像?所以Reactor官方建議,當你想要使用Processor的時候,首先看看能否用generate實現同樣的功能,或者看看是否有相應的操作符可以達到你想要的效果

2.9.2 Reactor 內置的 Processor

Reactor Core 內置多種 Processor。這些 processor 具有不同的語法,大概分為三類。

  • 直接的(direct)(DirectProcessor 和 UnicastProcessor):這些 processors 只能通過直接 調用 Sink 的方法來推送數據。
  • 同步的(synchronous)
    (EmitterProcessor 和 ReplayProcessor):這些 processors 既可以直接調用 Sink 方法來推送數據,也可以通過訂閱到一個上遊的發布者來同步地產生數據。
  • 異步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):這些 processors 可以將從多個上遊發布者得到的數據推送下去。由於使用了 RingBuffer 的數據結構來緩存多個來自上遊的數據,因此更加有健壯性。

異步的 processor 在實例化的時候最復雜,因為有許多不同的選項。因此它們暴露出一個 Builder 接口。 而簡單的 processors 有靜態的工廠方法。

1)DirectProcessor

DirectProcessor可以將信號分發給零到多個訂閱者(Subscriber)。它是最容易實例化的,使用靜態方法 create() 即可。另一方面,它的不足是無法處理背壓。所以,當DirectProcessor推送的是 N 個元素,而至少有一個訂閱者的請求個數少於 N 的時候,就會發出一個IllegalStateException

一旦 Processor 結束(通常通過調用它的 Sink 的 error(Throwable) 或 complete() 方法), 雖然它允許更多的訂閱者訂閱它,但是會立即向它們重新發送終止信號。

2)UnicastProcessor

UnicastProcessor可以使用一個內置的緩存來處理背壓。代價就是它最多只能有一個訂閱者(上一節的例子通過publish轉換成了ConnectableFlux,所以可以接入兩個訂閱者)。

UnicastProcessor有多種選項,因此提供多種不同的create靜態方法。例如,它默認是 無限的(unbounded) :如果你在在訂閱者還沒有請求數據的情況下讓它推送數據,它會緩存所有數據。

可以通過提供一個自定義的 Queue 的具體實現傳遞給 create 工廠方法來改變默認行為。如果給出的隊列是有限的(bounded), 並且緩存已滿,而且未收到下遊的請求,processor 會拒絕推送數據。

在上邊“有限的”例子中,還可以在構造 processor 的時候提供一個回調方法,這個回調方法可以在每一個 被拒絕推送的元素上調用,從而讓開發者有機會清理這些元素。

3)EmitterProcessor

EmitterProcessor能夠向多個訂閱者發送數據,並且可以對每一個訂閱者進行背壓處理。它本身也可以訂閱一個發布者並同步獲得數據。

最初如果沒有訂閱者,它仍然允許推送一些數據到緩存,緩存大小由bufferSize定義。 之後如果仍然沒有訂閱者訂閱它並消費數據,對onNext的調用會阻塞,直到有訂閱者接入 (這時只能並發地訂閱了)。

因此第一個訂閱者會收到最多bufferSize個元素。然而之後,後續接入的訂閱者只能獲取到它們開始訂閱之後推送的數據。這個內部的緩存會繼續用於背壓的目的。

默認情況下,如果所有的訂閱者都取消了訂閱,它會清空內部緩存,並且不再接受更多的訂閱者。這一點可以通過 create 靜態工廠方法的 autoCancel 參數來配置。

4)ReplayProcessor

ReplayProcessor會緩存直接通過自身的 Sink 推送的元素,以及來自上遊發布者的元素, 並且後來的訂閱者也會收到重發(replay)的這些元素。

可以通過多種配置方式創建它:

  • 緩存一個元素(cacheLast)。
  • 緩存一定個數的歷史元素(create(int)),所有的歷史元素(create())。
  • 緩存基於時間窗期間內的元素(createTimeout(Duration))。
  • 緩存基於歷史個數和時間窗的元素(createSizeOrTimeout(int, Duration))。

5)TopicProcessor

TopicProcessor是一個異步的 processor,它能夠重發來自多個上遊發布者的元素, 這需要在創建它的時候配置shared(build() 的 share(boolean) 配置)。

如果你企圖在並發環境下通過並發的上遊發布者調用TopicProcessoronNextonComplete,或onError方法,就必須配置shared。否則,並發調用就是非法的,從而 processor 是完全兼容響應式流規範的。

TopicProcessor能夠對多個訂閱者發送數據。它通過對每一個訂閱者關聯一個線程來實現這一點, 這個線程會一直執行直到 processor 發出onErroronComplete信號,或關聯的訂閱者被取消。 最多可以接受的訂閱者個數由構造者方法executor指定,通過提供一個有限線程數的 ExecutorService來限制這一個數。

這個 processor 基於一個RingBuffer數據結構來存儲已發送的數據。每一個訂閱者線程 自行管理其相關的數據在RingBuffer中的索引。

這個 processor 也有一個autoCancel構造器方法:如果設置為true(默認的),那麽當 所有的訂閱者取消之後,上遊發布者也就被取消了。

6)WorkQueueProcessor

WorkQueueProcessor也是一個異步的 processor,也能夠重發來自多個上遊發布者的元素, 同樣在創建時需要配置shared(它多數構造器配置與TopicProcessor相同)。

它放松了對響應式流規範的兼容,但是好處就在於相對於TopicProcessor來說需要更少的資源。 它仍然基於RingBuffer,但是不再要求每一個訂閱者都關聯一個線程,因此相對於TopicProcessor來說更具擴展性。

代價在於分發模式有些區別:來自訂閱者的請求會匯總在一起,並且這個 processor 每次只對一個 訂閱者發送數據,因此需要循環(round-robin)對訂閱者發送數據,而不是一次全部發出的模式(無法保證完全公平的循環分發)。

WorkQueueProcessor多數構造器方法與TopicProcessor相同,比如autoCancelshare, 以及waitStrategy。下遊訂閱者的最大數目同樣由構造器executor配置的ExecutorService 決定。

註意:最好不要有太多訂閱者訂閱WorkQueueProcessor,因為這會鎖住 processor。如果你需要限制訂閱者數量,最好使用一個ThreadPoolExecutorForkJoinPool。這個 processor 能夠檢測到(線程池)容量並在訂閱者過多時拋出異常。


本文的介紹並未給出示例,在下一章我們編寫“響應式Netty”的時候會介紹到Processor的使用。

(19)Reactor Processors——響應式Spring的道法術器