1. 程式人生 > >對RxJava2理解的總結

對RxJava2理解的總結

前言:
如果你在看過諸多RxJava2相關的文章,仍然對RxJava2沒有一個整體的認識的話,可以看一下這篇文章。全篇,以問答的形式,由簡入繁,一步步帶你認識RxJava2。如果你還沒有學過RxJava2,那可以參考這篇 :RxJava 2.x 入門教程

1、什麼是RxJava2?
答:RxJava2是一個工具,也是一個庫。

2、什麼是工具或是庫?
答:工具,就是用來做事的。比如,剪刀可以用來剪紙。

3、那麼,RxJava2是用來做什麼的工具?
答:RxJava2是一個基於觀察者模式,以響應流的形式處理非同步請求的工具。

4、那它是怎麼處理的?
答:因為RxJava2基於觀察者模式,所以,得先建立兩個物件:被觀察者和觀察者。然後,再通過多種資料的處理方法,比如:過濾操作、組合操作、轉換操作、背壓策略等等,來實現對資料的非同步請求處理。

5、Observable和Flowable,有什麼區別?
答:Observable不支援背壓,Flowable支援背壓。

6、那什麼是背壓?
答:因為觀察者模式,是被觀察者(上游)通知觀察者(下游)事件的改變,所以,不免會出現這樣的問題:上游通知事件改變的速度,遠遠大於下游處理事件通知的速度。比方說:上游,從資料庫裡拿使用者資料。下游,對使用者資料進行處理。但上游,拿的速度過快,下游,處理的速度過慢時。上游的資料就會不斷的積壓,最後導致快取爆了。這個時候,就得有背壓支援處理。背壓,通過不同的策略,來控制上游的事件處理速度。。。這樣,如果下游在1秒內只能處理10條,他就告訴上游我只要10條。就不會出現快取爆了的情況。那剩下的其它堆積事件怎麼處理?有不同的背壓策略處理它們,比如丟棄,丟擲異常,加大快取。簡言之,背壓就是一種能夠控制上游事件通知的速度的策略。

7,聽起來好抽象,能用程式碼輔助解釋下?
答:那先看Observable的:

   // 模仿事件積壓
    private void doObservable() {

        Observable.create(new ObservableOnSubscribe<Double>() {
            @Override
            public void subscribe(ObservableEmitter<Double> emitter) throws Exception {
                for (double
i = 0; i < 1_000_000_000_000D; ++i) { emitter.onNext(1_000_000_000_000D); } emitter.onComplete(); } }).observeOn(Schedulers.newThread()) .subscribeOn(Schedulers.newThread()) .subscribe(new Observer<Double>() { @Override public void onSubscribe(Disposable d) { // d.dispose(); } @Override public void onNext(Double aDouble) { try { log("integer-1=" + aDouble + RxJava2Demo.getThreadName()); // 模仿事件積壓 Thread.sleep(100_000); }catch (Exception e){ log("onNextError=" + e.getMessage()); } log("integer-2=" + aDouble + RxJava2Demo.getThreadName()); } @Override public void onError(Throwable t) { log("onError=" + t.getMessage() + RxJava2Demo.getThreadName()); } @Override public void onComplete() { log("onComplete=" + RxJava2Demo.getThreadName()); } }); }

Observable會限制上游通知事件的速度為1,即每次都會處理完onNext裡面的程式碼。不管你的onNext執行了多久,都得等到onNext執行完,再呼叫subscribe(),繼續下一個事件的通知。這樣,就會有一個問題,當上遊的事件積累到一定程度的時候,就會爆發OOM。要解決這個問題,就需要用Flowable,這個支援背壓的API。注意,因為安卓系統會不斷的進行GC操作,而且這裡只是一個double,我的手機也有6G執行記憶體,所以,崩潰的概率是不可保證的,我的手機在執行好幾分鐘後,崩了。為了確保可以崩潰,你可以自己另行測試,做一些資料庫或檔案讀取的,載入大資料的操作。

再看Flowable的程式碼

 private void doFlowable() {

        Flowable.create((FlowableOnSubscribe<Double>) emitter -> {

            for (double i = 0; i < 128; ++i) {
                emitter.onNext(i);
            }

            emitter.onComplete();
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Double>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(128);
                    }

                    @Override
                    public void onNext(Double aDouble) {

                        log("integer=" + aDouble + RxJava2Demo.getThreadName());
                        try {
                            Thread.sleep(1_500);
                        } catch (Exception e) {
                            log("onNext=" + e.getMessage());
                        }

                    }

                    @Override
                    public void onError(Throwable t) {
                        log("onError=" + t.getMessage() + RxJava2Demo.getThreadName());
                    }

                    @Override
                    public void onComplete() {
                        log("onComplete=" + RxJava2Demo.getThreadName());
                    }
                });
    }

Flowable支援背壓,背壓有多種策略,這裡用的是ERROR。支援背壓,也即意味著,它有能力去處理事件的堆積。但是,Flowable的快取池只有128個。所以,一次性發送的事件也不能超過128個,否則會丟擲異常,觸發onError()方法。不同的策略,對於積壓的事件,有不同的處理。具體,可以參考一些其它的部落格:背壓詳解

明白了,謝謝你的分享。
答:不用謝,有問題再繼續說。