(13)Reactor的backpressure策略——響應式Spring的道法術器
2.3 不同的回壓策略
許多地方也叫做“背壓”、“負壓”,我在《Reactor參考文件》中是翻譯為“背壓”的,後來在看到有“回壓”的翻譯,忽然感覺從文字上似乎更加符合。
這一節討論回壓的問題,有兩個前提:
- 釋出者與訂閱者不在同一個執行緒中,因為在同一個執行緒中的話,通常使用傳統的邏輯就可以,不需要進行回壓處理;
- 釋出者發出資料的速度高於訂閱者處理資料的速度,也就是處於“PUSH”狀態下,如果相反,那就是“PUll”狀態,不需要處理回壓。
本節測試原始碼
2.3.1 回壓策略
回壓的處理有以下幾種策略:
- ERROR: 當下遊跟不上節奏的時候發出一個錯誤訊號。
- DROP:當下遊沒有準備好接收新的元素的時候拋棄這個元素。
- LATEST:讓下游只得到上游最新的元素。
- BUFFER:快取下游沒有來得及處理的元素(如果快取不限大小的可能導致OutOfMemoryError)。
這幾種策略定義在列舉型別OverflowStrategy
中,不過還有一個IGNORE型別,即完全忽略下游背壓請求,這可能會在下游佇列積滿的時候導致 IllegalStateException。
2.3.2 使用create宣告回壓策略
上一節中,用於生成資料流的方法create
和push
可以用於非同步的場景,而且它們也支援回壓,我們可以通過提供一個 OverflowStrategy 來定義背壓行為。方法簽名:
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)
預設(沒有第二個引數的方法)是快取策略的,我們來試一下別的策略,比如DROP的策略。
我們繼續使用2.2節的那個測試例子,下邊是用create
建立的“快的釋出者”,不過方便起見拆放到兩個私有方法裡供呼叫:
public class Test_2_3 {
/**
* 使用create方法生成“快的釋出者”。
* @param strategy 回壓策略
* @return Flux
*/
private Flux<MyEventSource.MyEvent> createFlux(FluxSink.OverflowStrategy strategy) {
return Flux.create(sink -> eventSource.register(new MyEventListener() {
@Override
public void onNewEvent(MyEventSource.MyEvent event) {
System.out.println("publish >>> " + event.getMessage());
sink.next(event);
}
@Override
public void onEventStopped() {
sink.complete();
}
}), strategy); // 1
}
/**
* 生成MyEvent。
* @param count 生成MyEvent的個數。
* @param millis 每個MyEvent之間的時間間隔。
*/
private void generateEvent(int times, int millis) {
// 迴圈生成MyEvent,每個MyEvent間隔millis毫秒
for (int i = 0; i < times; i++) {
try {
TimeUnit.MILLISECONDS.sleep(millis);
} catch (InterruptedException e) {
}
eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i));
}
eventSource.eventStopped();
}
}
有了“快的釋出者”,下面是“慢的訂閱者”,以及一些測試準備工作:
public class Test_2_3 {
private final int EVENT_DURATION = 10; // 生成的事件間隔時間,單位毫秒
private final int EVENT_COUNT = 20; // 生成的事件個數
private final int PROCESS_DURATION = 30; // 訂閱者處理每個元素的時間,單位毫秒
private Flux<MyEventSource.MyEvent> fastPublisher;
private SlowSubscriber slowSubscriber;
private MyEventSource eventSource;
private CountDownLatch countDownLatch;
/**
* 準備工作。
*/
@Before
public void setup() {
countDownLatch = new CountDownLatch(1);
slowSubscriber = new SlowSubscriber();
eventSource = new MyEventSource();
}
/**
* 觸發訂閱,使用CountDownLatch等待訂閱者處理完成。
*/
@After
public void subscribe() throws InterruptedException {
fastPublisher.subscribe(slowSubscriber);
generateEvent(EVENT_COUNT, EVENT_DURATION);
countDownLatch.await(1, TimeUnit.MINUTES);
}
/**
* 內部類,“慢的訂閱者”。
*/
class SlowSubscriber extends BaseSubscriber<MyEventSource.MyEvent> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1); // 訂閱時請求1個數據
}
@Override
protected void hookOnNext(MyEventSource.MyEvent event) {
System.out.println(" receive <<< " + event.getMessage());
try {
TimeUnit.MILLISECONDS.sleep(PROCESS_DURATION);
} catch (InterruptedException e) {
}
request(1); // 每處理完1個數據,就再請求1個
}
@Override
protected void hookOnError(Throwable throwable) {
System.err.println(" receive <<< " + throwable);
}
@Override
protected void hookOnComplete() {
countDownLatch.countDown();
}
}
}
下面是測試方法:
/**
* 測試create方法的不同OverflowStrategy的效果。
*/
@Test
public void testCreateBackPressureStratety() {
fastPublisher =
createFlux(FluxSink.OverflowStrategy.BUFFER) // 1
.doOnRequest(n -> System.out.println(" === request: " + n + " ===")) // 2
.publishOn(Schedulers.newSingle("newSingle"), 1); // 3
}
- 調整不同的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)觀察效果,create方法預設為BUFFER;
- 打印出每次的請求(也就是後邊
.publishOn
的請求); - 使用
publishOn
讓後續的操作符和訂閱者執行在一個單獨的名為newSingle
的執行緒上,第二個引數1是預取個數,也就是.publishOn
作為訂閱者每次向上遊request的個數,預設為256,所以一定程度上也起到了快取的效果,為了測試,設定為1。
通常情況下,釋出者於訂閱者並不在同一個執行緒上,這裡使用
publishOn
來模擬這種情況。
BUFFER
策略的輸出如下(來不及處理的資料會快取下來,這是通常情況下的預設策略):
=== request: 1 ===
publish >>> Event-0
receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
=== request: 1 ===
publish >>> Event-3
receive <<< Event-1
publish >>> Event-4
publish >>> Event-5
publish >>> Event-6
=== request: 1 ===
receive <<< Event-2
publish >>> Event-7
publish >>> Event-8
...
DROP
策略的輸出如下(有新資料就緒的時候,看是否有request,有的話就發出,沒有就丟棄):
=== request: 1 ===
publish >>> Event-0
receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
=== request: 1 ===
publish >>> Event-4
receive <<< Event-4
publish >>> Event-5
publish >>> Event-6
publish >>> Event-7
=== request: 1 ===
publish >>> Event-8
receive <<< Event-8
...
可以看到,第1/2/3/5/6/7/…的資料被丟棄了,當有request之後的資料會被髮出。調整一下publishOn
方法的第二個引數(預取個數)為2,輸出如下:
=== request: 2 ===
publish >>> Event-0
receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
receive <<< Event-1
publish >>> Event-4
publish >>> Event-5
publish >>> Event-6
=== request: 2 ===
publish >>> Event-7
receive <<< Event-7
publish >>> Event-8
publish >>> Event-9
publish >>> Event-10
receive <<< Event-8
publish >>> Event-11
publish >>> Event-12
可見,每次request(請求2個數據)之後的2個數據發出,更多就緒的資料由於沒有request就丟棄了。
LATEST
的輸出如下(request到來的時候,將最新的資料發出):
=== request: 1 ===
publish >>> Event-0
receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
=== request: 1 ===
receive <<< Event-3
publish >>> Event-4
publish >>> Event-5
=== request: 1 ===
receive <<< Event-5
publish >>> Event-6
publish >>> Event-7
publish >>> Event-8
=== request: 1 ===
receive <<< Event-8
ERROR
的輸出如下(當訂閱者來不及處理時候發出一個錯誤訊號):
=== request: 1 ===
publish >>> Event-0
receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
=== request: 1 ===
receive <<< reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
IGNORE
的輸出如下:
...
=== request: 2 ===
receive <<< Event-10
receive <<< Event-11
=== request: 2 ===
receive <<< Event-12
receive <<< reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
2.3.3 調整回壓策略的操作符
Reactor提供了響應的onBackpressureXxx
操作符,調整回壓策略。測試方法如下:
/**
* 測試不同的onBackpressureXxx方法的效果。
*/
@Test
public void testOnBackPressureXxx() {
fastPublisher = createFlux(FluxSink.OverflowStrategy.BUFFER)
.onBackpressureBuffer() // BUFFER
// .onBackpressureDrop() // DROP
// .onBackpressureLatest() // LATEST
// .onBackpressureError() // ERROR
.doOnRequest(n -> System.out.println(" === request: " + n + " ==="))
.publishOn(Schedulers.newSingle("newSingle"), 1);
}
通過開啟某一個操作符的註釋可以觀察輸出。這裡就不貼輸出內容了,Reactor文件的示意圖更加直觀:
onBackpressureBuffer,對於來自其下游的request採取“快取”策略。
onBackpressureDrop,元素就緒時,根據下游是否有未滿足的request來判斷是否發出當前元素。
onBackpressureLatest,當有新的request到來的時候,將最新的元素髮出。
onBackpressureError,當有多餘元素就緒時,發出錯誤訊號。
真是一圖勝千言啊,上邊的這些圖片都是來自Reactor官方文件。
當進行非同步程式設計時,通常會面臨相互協作的各個元件不在同一個執行緒的情況,比如一個生產者不斷生成訊息,而一個消費者不斷處理這些產生的訊息,二者通常不在一個執行緒甚至是兩個不同的元件。當有人不小心採用了無界資源(比如無上限的彈性執行緒池、無界佇列等),那麼在高併發或任務繁重時就有可能造成執行緒數爆炸增長,或佇列堆積,因此backpressure這種協調機制對於維持系統穩定具有重要作用。