1. 程式人生 > >(13)Reactor的backpressure策略——響應式Spring的道法術器

(13)Reactor的backpressure策略——響應式Spring的道法術器

2.3 不同的回壓策略

許多地方也叫做“背壓”、“負壓”,我在《Reactor參考文件》中是翻譯為“背壓”的,後來在看到有“回壓”的翻譯,忽然感覺從文字上似乎更加符合。

這一節討論回壓的問題,有兩個前提:

  1. 釋出者與訂閱者不在同一個執行緒中,因為在同一個執行緒中的話,通常使用傳統的邏輯就可以,不需要進行回壓處理;
  2. 釋出者發出資料的速度高於訂閱者處理資料的速度,也就是處於“PUSH”狀態下,如果相反,那就是“PUll”狀態,不需要處理回壓。

本節測試原始碼

2.3.1 回壓策略

回壓的處理有以下幾種策略:

  1. ERROR: 當下遊跟不上節奏的時候發出一個錯誤訊號。
  2. DROP:當下遊沒有準備好接收新的元素的時候拋棄這個元素。
  3. LATEST:讓下游只得到上游最新的元素。
  4. BUFFER:快取下游沒有來得及處理的元素(如果快取不限大小的可能導致OutOfMemoryError)。

這幾種策略定義在列舉型別OverflowStrategy中,不過還有一個IGNORE型別,即完全忽略下游背壓請求,這可能會在下游佇列積滿的時候導致 IllegalStateException。

2.3.2 使用create宣告回壓策略

上一節中,用於生成資料流的方法createpush可以用於非同步的場景,而且它們也支援回壓,我們可以通過提供一個 OverflowStrategy 來定義背壓行為。方法簽名:

    public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) 

預設(沒有第二個引數的方法)是快取策略的,我們來試一下別的策略,比如DROP的策略。

我們繼續使用2.2節的那個測試例子,下邊是用create建立的“快的釋出者”,不過方便起見拆放到兩個私有方法裡供呼叫:

    public class Test_2_3 {
        /**
         * 使用create方法生成“快的釋出者”。
         * @param
strategy 回壓策略 * @return Flux */
private Flux<MyEventSource.MyEvent> createFlux(FluxSink.OverflowStrategy strategy) { return Flux.create(sink -> eventSource.register(new MyEventListener() { @Override public void onNewEvent(MyEventSource.MyEvent event) { System.out.println("publish >>> " + event.getMessage()); sink.next(event); } @Override public void onEventStopped() { sink.complete(); } }), strategy); // 1 } /** * 生成MyEvent。 * @param count 生成MyEvent的個數。 * @param millis 每個MyEvent之間的時間間隔。 */ private void generateEvent(int times, int millis) { // 迴圈生成MyEvent,每個MyEvent間隔millis毫秒 for (int i = 0; i < times; i++) { try { TimeUnit.MILLISECONDS.sleep(millis); } catch (InterruptedException e) { } eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i)); } eventSource.eventStopped(); } }

有了“快的釋出者”,下面是“慢的訂閱者”,以及一些測試準備工作:

    public class Test_2_3 {
        private final int EVENT_DURATION   = 10;    // 生成的事件間隔時間,單位毫秒
        private final int EVENT_COUNT      = 20;    // 生成的事件個數
        private final int PROCESS_DURATION = 30;    // 訂閱者處理每個元素的時間,單位毫秒

        private Flux<MyEventSource.MyEvent> fastPublisher;
        private SlowSubscriber slowSubscriber;
        private MyEventSource eventSource;
        private CountDownLatch countDownLatch;

        /**
         * 準備工作。
         */
        @Before
        public void setup() {
            countDownLatch = new CountDownLatch(1);
            slowSubscriber = new SlowSubscriber();
            eventSource = new MyEventSource();
        }

        /**
         * 觸發訂閱,使用CountDownLatch等待訂閱者處理完成。
         */
        @After
        public void subscribe() throws InterruptedException {
            fastPublisher.subscribe(slowSubscriber);
            generateEvent(EVENT_COUNT, EVENT_DURATION);
            countDownLatch.await(1, TimeUnit.MINUTES);
        }

        /**
         * 內部類,“慢的訂閱者”。
         */
        class SlowSubscriber extends BaseSubscriber<MyEventSource.MyEvent> {

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);     // 訂閱時請求1個數據
            }

            @Override
            protected void hookOnNext(MyEventSource.MyEvent event) {
                System.out.println("                      receive <<< " + event.getMessage());
                try {
                    TimeUnit.MILLISECONDS.sleep(PROCESS_DURATION);
                } catch (InterruptedException e) {
                }
                request(1);     // 每處理完1個數據,就再請求1個
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                System.err.println("                      receive <<< " + throwable);
            }

            @Override
            protected void hookOnComplete() {
                countDownLatch.countDown();
            }
        }
    }

下面是測試方法:

    /**
     * 測試create方法的不同OverflowStrategy的效果。
     */
    @Test
    public void testCreateBackPressureStratety() {
        fastPublisher =
                createFlux(FluxSink.OverflowStrategy.BUFFER)    // 1
                        .doOnRequest(n -> System.out.println("         ===  request: " + n + " ==="))    // 2
                        .publishOn(Schedulers.newSingle("newSingle"), 1);   // 3
    }
  1. 調整不同的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)觀察效果,create方法預設為BUFFER;
  2. 打印出每次的請求(也就是後邊.publishOn的請求);
  3. 使用publishOn讓後續的操作符和訂閱者執行在一個單獨的名為newSingle的執行緒上,第二個引數1是預取個數,也就是.publishOn作為訂閱者每次向上遊request的個數,預設為256,所以一定程度上也起到了快取的效果,為了測試,設定為1。

通常情況下,釋出者於訂閱者並不在同一個執行緒上,這裡使用publishOn來模擬這種情況。

BUFFER策略的輸出如下(來不及處理的資料會快取下來,這是通常情況下的預設策略):

         ===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
         ===  request: 1 ===
publish >>> Event-3
                      receive <<< Event-1
publish >>> Event-4
publish >>> Event-5
publish >>> Event-6
         ===  request: 1 ===
                      receive <<< Event-2
publish >>> Event-7
publish >>> Event-8
...

DROP策略的輸出如下(有新資料就緒的時候,看是否有request,有的話就發出,沒有就丟棄):

         ===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
         ===  request: 1 ===
publish >>> Event-4
                      receive <<< Event-4
publish >>> Event-5
publish >>> Event-6
publish >>> Event-7
         ===  request: 1 ===
publish >>> Event-8
                      receive <<< Event-8
...

可以看到,第1/2/3/5/6/7/…的資料被丟棄了,當有request之後的資料會被髮出。調整一下publishOn方法的第二個引數(預取個數)為2,輸出如下:

         ===  request: 2 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
                      receive <<< Event-1
publish >>> Event-4
publish >>> Event-5
publish >>> Event-6
         ===  request: 2 ===
publish >>> Event-7
                      receive <<< Event-7
publish >>> Event-8
publish >>> Event-9
publish >>> Event-10
                      receive <<< Event-8
publish >>> Event-11
publish >>> Event-12

可見,每次request(請求2個數據)之後的2個數據發出,更多就緒的資料由於沒有request就丟棄了。

LATEST的輸出如下(request到來的時候,將最新的資料發出):

         ===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
         ===  request: 1 ===
                      receive <<< Event-3
publish >>> Event-4
publish >>> Event-5
         ===  request: 1 ===
                      receive <<< Event-5
publish >>> Event-6
publish >>> Event-7
publish >>> Event-8
         ===  request: 1 ===
                      receive <<< Event-8

ERROR的輸出如下(當訂閱者來不及處理時候發出一個錯誤訊號):

         ===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
         ===  request: 1 ===
                      receive <<< reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)

IGNORE的輸出如下:

...
         ===  request: 2 ===
                      receive <<< Event-10
                      receive <<< Event-11
         ===  request: 2 ===
                      receive <<< Event-12
                      receive <<< reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure

2.3.3 調整回壓策略的操作符

Reactor提供了響應的onBackpressureXxx操作符,調整回壓策略。測試方法如下:

    /**
     * 測試不同的onBackpressureXxx方法的效果。
     */
    @Test
    public void testOnBackPressureXxx() {
        fastPublisher = createFlux(FluxSink.OverflowStrategy.BUFFER)
                .onBackpressureBuffer()     // BUFFER
//                .onBackpressureDrop()     // DROP
//                .onBackpressureLatest()   // LATEST
//                .onBackpressureError()    // ERROR
                .doOnRequest(n -> System.out.println("         ===  request: " + n + " ==="))
                .publishOn(Schedulers.newSingle("newSingle"), 1);
    }

通過開啟某一個操作符的註釋可以觀察輸出。這裡就不貼輸出內容了,Reactor文件的示意圖更加直觀:

onBackpressureBuffer,對於來自其下游的request採取“快取”策略。

onBackpressureBuffer

onBackpressureDrop,元素就緒時,根據下游是否有未滿足的request來判斷是否發出當前元素。

onBackpressureDrop

onBackpressureLatest,當有新的request到來的時候,將最新的元素髮出。

onBackpressureLatest

onBackpressureError,當有多餘元素就緒時,發出錯誤訊號。

onBackpressureError

真是一圖勝千言啊,上邊的這些圖片都是來自Reactor官方文件。

當進行非同步程式設計時,通常會面臨相互協作的各個元件不在同一個執行緒的情況,比如一個生產者不斷生成訊息,而一個消費者不斷處理這些產生的訊息,二者通常不在一個執行緒甚至是兩個不同的元件。當有人不小心採用了無界資源(比如無上限的彈性執行緒池、無界佇列等),那麼在高併發或任務繁重時就有可能造成執行緒數爆炸增長,或佇列堆積,因此backpressure這種協調機制對於維持系統穩定具有重要作用。