對RxJava2理解的總結
前言:
如果你在看過諸多RxJava2相關的文章,仍然對RxJava2沒有一個整體的認識的話,可以看一下這篇文章。全篇,以問答的形式,由簡入繁,一步步帶你認識RxJava2。如果你還沒有學過RxJava2,那可以參考這篇 :RxJava 2.x 入門教程
1、什麼是RxJava2?
答:RxJava2是一個工具,也是一個庫。
2、什麼是工具或是庫?
答:工具,就是用來做事的。比如,剪刀可以用來剪紙。
3、那麼,RxJava2是用來做什麼的工具?
答:RxJava2是一個基於觀察者模式,以響應流的形式處理非同步請求的工具。
4、那它是怎麼處理的?
答:因為RxJava2基於觀察者模式,所以,得先建立兩個物件:被觀察者和觀察者。然後,再通過多種資料的處理方法,比如:過濾操作、組合操作、轉換操作、背壓策略等等,來實現對資料的非同步請求處理。
5、Observable和Flowable,有什麼區別?
答:Observable不支援背壓,Flowable支援背壓。
6、那什麼是背壓?
答:因為觀察者模式,是被觀察者(上游)通知觀察者(下游)事件的改變,所以,不免會出現這樣的問題:上游通知事件改變的速度,遠遠大於下游處理事件通知的速度。比方說:上游,從資料庫裡拿使用者資料。下游,對使用者資料進行處理。但上游,拿的速度過快,下游,處理的速度過慢時。上游的資料就會不斷的積壓,最後導致快取爆了。這個時候,就得有背壓支援處理。背壓,通過不同的策略,來控制上游的事件處理速度。。。這樣,如果下游在1秒內只能處理10條,他就告訴上游我只要10條。就不會出現快取爆了的情況。那剩下的其它堆積事件怎麼處理?有不同的背壓策略處理它們,比如丟棄,丟擲異常,加大快取。簡言之,背壓就是一種能夠控制上游事件通知的速度的策略。
7,聽起來好抽象,能用程式碼輔助解釋下?
答:那先看Observable的:
// 模仿事件積壓
private void doObservable() {
Observable.create(new ObservableOnSubscribe<Double>() {
@Override
public void subscribe(ObservableEmitter<Double> emitter) throws Exception {
for (double i = 0; i < 1_000_000_000_000D; ++i) {
emitter.onNext(1_000_000_000_000D);
}
emitter.onComplete();
}
}).observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<Double>() {
@Override
public void onSubscribe(Disposable d) {
// d.dispose();
}
@Override
public void onNext(Double aDouble) {
try {
log("integer-1=" + aDouble + RxJava2Demo.getThreadName());
// 模仿事件積壓
Thread.sleep(100_000);
}catch (Exception e){
log("onNextError=" + e.getMessage());
}
log("integer-2=" + aDouble + RxJava2Demo.getThreadName());
}
@Override
public void onError(Throwable t) {
log("onError=" + t.getMessage() + RxJava2Demo.getThreadName());
}
@Override
public void onComplete() {
log("onComplete=" + RxJava2Demo.getThreadName());
}
});
}
Observable會限制上游通知事件的速度為1,即每次都會處理完onNext裡面的程式碼。不管你的onNext執行了多久,都得等到onNext執行完,再呼叫subscribe(),繼續下一個事件的通知。這樣,就會有一個問題,當上遊的事件積累到一定程度的時候,就會爆發OOM。要解決這個問題,就需要用Flowable,這個支援背壓的API。注意,因為安卓系統會不斷的進行GC操作,而且這裡只是一個double,我的手機也有6G執行記憶體,所以,崩潰的概率是不可保證的,我的手機在執行好幾分鐘後,崩了。為了確保可以崩潰,你可以自己另行測試,做一些資料庫或檔案讀取的,載入大資料的操作。
再看Flowable的程式碼
private void doFlowable() {
Flowable.create((FlowableOnSubscribe<Double>) emitter -> {
for (double i = 0; i < 128; ++i) {
emitter.onNext(i);
}
emitter.onComplete();
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Double>() {
@Override
public void onSubscribe(Subscription s) {
s.request(128);
}
@Override
public void onNext(Double aDouble) {
log("integer=" + aDouble + RxJava2Demo.getThreadName());
try {
Thread.sleep(1_500);
} catch (Exception e) {
log("onNext=" + e.getMessage());
}
}
@Override
public void onError(Throwable t) {
log("onError=" + t.getMessage() + RxJava2Demo.getThreadName());
}
@Override
public void onComplete() {
log("onComplete=" + RxJava2Demo.getThreadName());
}
});
}
Flowable支援背壓,背壓有多種策略,這裡用的是ERROR。支援背壓,也即意味著,它有能力去處理事件的堆積。但是,Flowable的快取池只有128個。所以,一次性發送的事件也不能超過128個,否則會丟擲異常,觸發onError()方法。不同的策略,對於積壓的事件,有不同的處理。具體,可以參考一些其它的部落格:背壓詳解。
明白了,謝謝你的分享。
答:不用謝,有問題再繼續說。