RxJava 併發之資料流發射太快如何辦(背壓(Backpressure))
RxJava系列教程:
Backpressure
Rx 中的資料流是從一個地方發射到另外一個地方。每個地方處理資料的速度是不一樣的。如果生產者發射資料的速度比消費者處理的快會出現什麼情況?在同步操作中,這不是個問題,例如:
// Produce
Observable<Integer> producer = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onCompleted();
});
// Consume
producer.subscribe(i -> {
try {
Thread.sleep (1000);
System.out.println(i);
} catch (Exception e) { }
});
雖然上面的消費者處理資料的速度慢,但是由於是同步呼叫的,所以當 o.onNext(1) 執行後,一直阻塞到消費者處理完才執行 o.onNext(2)。 但是生產者和消費者非同步處理的情況很常見。如果是在非同步的情況下會出現什麼情況呢?
在傳統的 pull 模型中,當消費者請求資料的時候,如果生產者比較慢,則消費者會阻塞等待。如果生產者比較快,則生產者會等待消費者處理完後再生產新的資料。
而 Rx 為 push 模型。 在 Rx 中,只要生產者資料好了就發射出去了。如果生產者比較慢,則消費者就會等待新的資料到來。如果生產者快,則就會有很多資料發射給消費者,而不管消費者當前有沒有能力處理資料。這樣會導致一個問題,例如:
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
結果:
0
1
rx.exceptions .MissingBackpressureException
上面的 MissingBackpressureException 告訴我們,生產者太快了,我們的操作函式無法處理這種情況。
消費者的補救措施
有些操作函式可以減少傳送給消費者的資料。
過濾資料
sample 操作函式可以指定生產者發射資料的最大速度,多餘的資料被丟棄了。
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.sample(100, TimeUnit.MILLISECONDS)
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
結果:
82
182
283
...
throttle 和 Debounce 也能實現類似的效果。
Collect
如果你不想丟棄資料,則當消費者忙的時候可以使用 buffer 和 window 操作函式來收集資料。如果批量處理資料速度比較快,則可以使用這種方式。
Observable.interval(10, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.buffer(100, TimeUnit.MILLISECONDS)
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
結果:
[0, 1, 2, 3, 4, 5, 6, 7]
[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27]
...
Reactive pull
上面的方式有時候可以解決問題,但是並不是 Rx 中最好的處理方式。有時候在 生產者這裡處理可能是最好的情況。Backpressure 是一種用來在生產者端降低發射速度的方式。
RxJava 實現了一種通過 Subscriber 來通知 Observable 發射資料的方式。Subscriber 有個函式 request(n),呼叫該函式用來通知 Observable 現在 Subscriber 準備接受下面 n 個數據了。在 Subscriber 的 onStart 函式裡面呼叫 request 函式則就開啟了reactive pull backpressure。這並不是傳統的 pull 模型,並不會阻塞呼叫。只是 Subscriber 通知 Observable 當前 Subscriber 的處理能力。 通過呼叫 request 可以發射更多的資料。
結構示意圖:
觀察者可以根據自身實際情況按需拉取資料,而不是被動接收(也就相當於告訴上游觀察者把速度慢下來),最終實現了上游被觀察者傳送事件的速度的控制,實現了背壓的策略。
示例程式碼如下:
class MySubscriber extends Subscriber<T> {
@Override
public void onStart() {
request(1); //要在onStart中通知被觀察者先發送一個事件
}
@Override
public void onCompleted() {
...
}
@Override
public void onError(Throwable e) {
...
}
@Override
public void onNext(T n) {
...
request(1); //處理完畢之後,在通知被觀察者傳送下一個事件
}
}
//被觀察者將產生100000個事件
Observable observable=Observable.range(1,100000);
observable.observeOn(Schedulers.newThread())
.subscribe(new MySubscriber());
在 onStart 函式中呼叫 request(1) 開啟了 backpressure 模式,告訴 Observable 一次只發射一個數據。在 onNext 裡面處理完該資料後,可以請求下一個資料。通過 quest(Long.MAX_VALUE) 可以取消 backpressure 模式。
實際上,在上面的程式碼中,你也可以不需要呼叫request(n)方法去拉取資料,程式依然能完美執行,這是因為range –> observeOn,這一段中間過程本身就是響應式拉取資料,observeOn這個操作符內部有一個緩衝區,Android環境下長度是16,它會告訴range最多傳送16個事件,充滿緩衝區即可。不過話說回來,在觀察者中使用request(n)這個方法可以使背壓的策略表現得更加直觀,更便於理解。
如果你足夠細心,會發現,在開頭展示異常情況的程式碼中,使用的是interval這個操作符,但是在這裡使用了range操作符,為什麼呢?
這是因為interval操作符本身並不支援背壓策略,它並不響應request(n),也就是說,它傳送事件的速度是不受控制的,而range這類操作符是支援背壓的,它傳送事件的速度可以被控制。
那麼到底什麼樣的Observable是支援背壓的呢?
Hot and Cold Observables
需要說明的時,Hot Observables 和cold Observables並不是嚴格的概念區分,它只是對於兩類Observable形象的描述.
- Cold Observables:指的是那些在訂閱之後才開始傳送事件的Observable(每個Subscriber都能接收到完整的事件)。
- Hot Observables:指的是那些在建立了Observable之後,(不管是否訂閱)就開始傳送事件的Observable
其實也有建立了Observable之後呼叫諸如publish()方法就可以開始傳送事件的,這裡咱們暫且忽略。
我們一般使用的都是Cold Observable,除非特殊需求,才會使用Hot Observable,在這裡,Hot Observable這一類是不支援背壓的,而是Cold Observable這一類中也有一部分並不支援背壓(比如interval,timer等操作符建立的Observable)。
說明: 都是Observable,結果有的支援背壓,有的不支援,這就是RxJava1.X的一個問題。在2.0中,這種問題已經解決了,以後談到2.0時再細說。
在那些不支援背壓策略的操作符中使用響應式拉取資料的話,還是會丟擲MissingBackpressureException。
doOnRequested
在副作用一節討論 doOn_ 函式的時候,我們沒有討論 doOnRequested 這個函式:
public final Observable<T> doOnRequest(Action1<java.lang.Long> onRequest)
當 Subscriber 請求更多的時候的時候, doOnRequest 就會被呼叫。引數中的值為請求的數量。
當前 doOnRequest 還是一個 beta 測試版本的 api。 所以在開發過程中儘量避免使用。下面來演示一下這個 api:
Observable.range(0, 3)
.doOnRequest(i -> System.out.println("Requested " + i))
.subscribe(System.out::println);
結果:
Requested 9223372036854775807
0
1
2
可以看到 subscriber 在開始的時候,請求了最大數量的資料。這意味著沒有使用 backpressure 模型。只有當一個 Subscriber 實現了 backpressure 的時候,Subscribe 才能使用該功能。下面是一個在外部實現 控制backpressure 的示例:
public class ControlledPullSubscriber<T> extends Subscriber<T> {
private final Action1<T> onNextAction;
private final Action1<Throwable> onErrorAction;
private final Action0 onCompletedAction;
public ControlledPullSubscriber(
Action1<T> onNextAction,
Action1<Throwable> onErrorAction,
Action0 onCompletedAction) {
this.onNextAction = onNextAction;
this.onErrorAction = onErrorAction;
this.onCompletedAction = onCompletedAction;
}
public ControlledPullSubscriber(
Action1<T> onNextAction,
Action1<Throwable> onErrorAction) {
this(onNextAction, onErrorAction, () -> {});
}
public ControlledPullSubscriber(Action1<T> onNextAction) {
this(onNextAction, e -> {}, () -> {});
}
@Override
public void onStart() {
request(0);
}
@Override
public void onCompleted() {
onCompletedAction.call();
}
@Override
public void onError(Throwable e) {
onErrorAction.call(e);
}
@Override
public void onNext(T t) {
onNextAction.call(t);
}
public void requestMore(int n) {
request(n);
}
}
上面的實現中,如果不主動呼叫 requestMore 函式,則 Observable 是不會發射資料的。
ControlledPullSubscriber<Integer> puller =
new ControlledPullSubscriber<Integer>(System.out::println);
Observable.range(0, 3)
.doOnRequest(i -> System.out.println("Requested " + i))
.subscribe(puller);
puller.requestMore(2);
puller.requestMore(1);
結果:
Requested 0
Requested 2
0
1
Requested 1
2
ControlledPullSubscriber 在onStart 中告訴 Observable 先不要發射資料。然後我們分別請求 2個數據和1 個數據。
Rx 操作函式內部使用佇列和緩衝來實現 backpressure ,從而避免儲存無限量的資料。大量資料的緩衝應該使用專門的操作函式來處理,例如:cache、buffer 等。 zip 函式就是一個示例,第一個 Observable 可能在第二個 Observable 發射資料之前就發射了一個或者多個數據。所以 zip 需要一個較小的緩衝來匹配兩個 Observable,從而避免操作失敗。因此, zip 內部使用了一個 128 個數據的小緩衝。
Observable.range(0, 300)
.doOnRequest(i -> System.out.println("Requested " + i))
.zipWith(
Observable.range(10, 300),
(i1, i2) -> i1 + " - " + i2)
.take(300)
.subscribe();
結果:
Requested 128
Requested 90
Requested 90
Requested 90
zip 操作函式一開始請求足夠(128)的資料來填充緩衝並處理這些資料。這裡 zip 操作函式具體緩衝的資料並不是主要的。讀者應該記住,在 Rx 中不管開發者有沒有主動啟用該功能,有些操作函式內部會使用該功能。這樣可以保證 Rx 資料流更加穩定可擴充套件。
Backpressure 策略
很多 Rx 操作函式內部都使用了 backpressure 從而避免過多的資料填滿內部的佇列。這樣處理慢的消費者就會把這種情況傳遞給前面的消費者,前面的消費者開始緩衝資料直到他也快取滿為止再告訴他前面的消費者。Backpressure 並沒有消除這種情況。只是讓錯誤延遲發生,我們還是需要處理這種情況。
Rx 中有操作函式可以用來處理這種消費者處理不過來的情況。
onBackpressureBuffer
onBackpressureBuffer 會快取所有當前無法消費的資料,直到 Observer 可以處理為止。
你可以指定緩衝的數量,如果緩衝滿了則會導致資料流失敗。
Observable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(1000)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println
);
結果:
0
1
2
3
4
5
6
7
8
9
10
11
rx.exceptions.MissingBackpressureException: Overflowed buffer of 1000
上面的示例,生產者比消費者快 100 倍。使用 1000個緩衝來處理這種消費者比較慢的情況。當消費者消費 11個數據的時候,緩衝區滿了,生產者生產了 1100個數據。資料流就丟擲異常了。
onBackpressureDrop
如果消費者無法處理資料,則 onBackpressureDrop 就把該資料丟棄了。
Observable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
結果:
0
1
2
...
126
127
12861
12862
...
這個示例中,前面 128 個數據正常的被處理的,這是應為 observeOn 在切換執行緒的時候, 使用了一個 128 個數據的小緩衝。
最後我們總結一下:
- 背壓是一種策略,具體措施是下游觀察者通知上游的被觀察者傳送事件
- 背壓策略很好的解決了非同步環境下被觀察者和觀察者速度不一致的問題
- 在RxJava1.X中,同樣是Observable,有的不支援背壓策略,導致某些情況下,顯得特別麻煩,出了問題也很難排查,使得RxJava的學習曲線變得十份陡峭。
這篇部落格並不是為了讓你學習在RxJava1.X中如何使用背壓(如果你之前不瞭解背壓的話),因為在RxJava1.X中,背壓的設計並不十分完美。而是希望你對背壓有一個全面清晰的認識,對於它在RxJava1.X中的設計缺陷有所瞭解即可。可喜的是,RxJava2.X中解決了背壓的問題,推出了Flowable(Observable在RxJava2.0中新的實現),而且其中的操作符全部都實現了背壓,是不是很期待呢?