1. 程式人生 > >RX操作之輔助操作符二(doonunsubscribe、doOnCompleted、doOnError、doOnTerminate、finallyDo、delay、delaySubscription)

RX操作之輔助操作符二(doonunsubscribe、doOnCompleted、doOnError、doOnTerminate、finallyDo、delay、delaySubscription)

一、doOnUnSubscribe

取消訂閱時的監聽

 Observable<Integer> observable = Observable.just(1,2,3,4,5,6);
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "觀察者取消訂閱了它生成的Observable.....................");
                    }
                })
                .subscribe(subscriber);

執行結果:


二、doOnCompleted

Observable正常終止時的監聽

 Observable<Integer> observable = Observable.just(1,2,3,4,5,6);
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "Observable正常終止了.....................");

                    }
                })
                .subscribe(subscriber);

執行結果:


三、doOnError

出錯時的監聽

 Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i < 5; i++) {
                    if(i == 3){
                        subscriber.onError(new Throwable("EROOR"));
                    }else {
                        subscriber.onNext(i);
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                }
                subscriber.onCompleted();
            }
        });
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e(TAG, "出錯了....................."+throwable.toString());
                    }
                })
                .subscribe(subscriber);

執行結果:


四、doOnTerminate

訂閱即將被終止時的監聽,無論是正常終止還是異常終止

 Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i < 5; i++) {
                    if(i == 3){
                        subscriber.onError(new Throwable("EROOR"));
                    }else {
                        subscriber.onNext(i);
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                }
                subscriber.onCompleted();
            }
        });
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnTerminate(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "訂閱即將被終止.....................");
                    }
                })
                .subscribe(subscriber);
執行結果:



五、finallyDo

訂閱完成之後的監聽,無論是正常終止還是異常終止

 Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i < 5; i++) {
                    if(i == 3){
                        subscriber.onError(new Throwable("EROOR"));
                    }else {
                        subscriber.onNext(i);
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                }
                subscriber.onCompleted();
            }
        });
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .finallyDo(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "訂閱已經終止.....................");
                    }
                })
                .subscribe(subscriber);

執行結果:


六、delay

延遲一段指定的時間再發射來自Observable的發射物。Delay操作符讓原始Observable在發射每項資料之前都暫停一段指定的時間段。效果是Observable發射的資料項在時間上向前整體平移了一個增量。

5s之後資料才打印出來

Integer[]items = {1,2,3,4,5};
        Observable<Integer>observable = Observable.from(items).delay(5,TimeUnit.SECONDS).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread());
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable.subscribe(subscriber);
執行結果:



七、delaySubscription

延遲訂閱源Observable

 Integer[]items = {1,2,3,4,5};
        Observable<Integer>observable = Observable.from(items);
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .delaySubscription(2,TimeUnit.SECONDS)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "觀察者訂閱了它生成的Observable.....................");
                    }
                }).subscribe(subscriber);

訂閱後,延遲了2s後onNext函式才被呼叫

執行結果:


八、timeInterval

返回聯絡發射的observable的時間間隔

Observable<TimeInterval<Long>>observable = Observable.interval(1,TimeUnit.SECONDS).take(5).timeInterval();
        Subscriber<TimeInterval<Long>> subscriber = new Subscriber<TimeInterval<Long>>() {

            @Override
            public void onNext(TimeInterval<Long> v) {
                Log.e(TAG,"onNext................."+v.getValue()+"....................停了"+v.getIntervalInMilliseconds()+"毫s發射了一條資料");
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable.subscribe(subscriber);


執行結果: