1. 程式人生 > >Android RxJava操作符的學習---條件 / 布林操作符

Android RxJava操作符的學習---條件 / 布林操作符

3.6 條件 / 布林操作符

3.6.1. 作用

通過設定函式,判斷被觀察者(Observable)傳送的事件是否符合條件


3.6.2. 型別

RxJava2中,條件 / 布林操作符的型別包括:

  • 下面,我將對每個操作符進行詳細講解

3.6.3. 具體操作符詳解

  • 注:在使用RxJava 2操作符前,記得在專案的Gradle中新增依賴:
dependencies {
      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
      compile 'io.reactivex.rxjava2:rxjava:2.0.7'
      // 注:RxJava2 與 RxJava1 不能共存,即依賴不能同時存在
}

3.6.3.1 all()

  • 作用
    判斷髮送的每項資料是否都滿足 設定的函式條件

若滿足,返回 true;否則,返回 false

  • 具體程式碼

  Observable.just(1,2,3,4,5,6)
                .all(new Predicate<Integer>(){
                    @Override
                    public boolean test( Integer integer) throws Exception {
                        return (integer<=10);
                        // 該函式用於判斷Observable傳送的10個數據是否都滿足integer<=10
                    }
                }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.d(TAG,"result is "+ aBoolean);
                // 輸出返回結果
            }

        });


  • 測試結果

因為所有資料都滿足函式內條件 (每項資料<=10)

3.6.3.2 takeWhile()

  • 作用
    判斷髮送的每項資料是否滿足 設定函式條件

若傳送的資料滿足該條件,則傳送該項資料;否則不傳送

  • 具體程式碼
// 1. 每1s傳送1個數據 = 從0開始,遞增1,即0、1、2、3
        Observable.interval(1, TimeUnit.SECONDS)
                // 2. 通過takeWhile傳入一個判斷條件
                .takeWhile(new Predicate<Long>(){
                    @Override
                    public boolean test( Long integer) throws Exception {
                        return (integer<3);
                        // 當傳送的資料滿足<3時,才傳送Observable的資料
                    }
                }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long value) {
                Log.d(TAG,"傳送了事件 "+ value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });
  • 測試結果

3.6.3.3 skipWhile()

  • 作用
    判斷髮送的每項資料是否滿足 設定函式條件

直到該判斷條件 = false時,才開始傳送Observable的資料

  • 具體使用
// 1. 每隔1s傳送1個數據 = 從0開始,每次遞增1
        Observable.interval(1, TimeUnit.SECONDS)
                // 2. 通過skipWhile()設定判斷條件
                .skipWhile(new Predicate<Long>(){
                    @Override
                    public boolean test( Long aLong) throws Exception {
                        return (aLong<5);
                        // 直到判斷條件不成立 = false = 發射的資料≥5,才開始傳送資料
                    }
                }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long value) {
                Log.d(TAG,"傳送了事件 "+ value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });
  • 測試結果

3.6.3.4 takeUntil()

  • 作用
    執行到某個條件時,停止傳送事件
  • 具體使用
// 1. 每1s傳送1個數據 = 從0開始,遞增1,即0、1、2、3
        Observable.interval(1, TimeUnit.SECONDS)
                // 2. 通過takeUntil的Predicate傳入判斷條件
                .takeUntil(new Predicate<Long>(){
                    @Override
                    public boolean test( Long integer) throws Exception {
                        return (integer>3);
                        // 返回true時,就停止傳送事件
                        // 當傳送的資料滿足>3時,就停止傳送Observable的資料
                    }
                }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long value) {
                Log.d(TAG,"傳送了事件 "+ value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

  • 測試結果

該判斷條件也可以是Observable,即 等到 takeUntil() 傳入的Observable開始傳送資料,(原始)第1個Observable的資料停止傳送資料

// (原始)第1個Observable:每隔1s傳送1個數據 = 從0開始,每次遞增1
        Observable.interval(1, TimeUnit.SECONDS)
                // 第2個Observable:延遲5s後開始傳送1個Long型資料
                .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }

                });
  • 測試結果

當第 5s 時,第2個 Observable 開始傳送資料,於是(原始)第1個 Observable 停止傳送資料

3.6.3.5 skipUntil()

  • 作用
    等到 skipUntil() 傳入的Observable開始傳送資料,(原始)第1個Observable的資料才開始傳送資料

  • 具體使用

                // (原始)第1個Observable:每隔1s傳送1個數據 = 從0開始,每次遞增1
        Observable.interval(1, TimeUnit.SECONDS)
                // 第2個Observable:延遲5s後開始傳送1個Long型資料
                .skipUntil(Observable.timer(5, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }

                });
  • 測試結果

5s後( skipUntil() 傳入的Observable開始傳送資料),(原始)第1個Observable的資料才開始傳送

3.6.3.6 SequenceEqual()

  • 作用
    判定兩個Observables需要傳送的資料是否相同

若相同,返回 true;否則,返回 false

  • 具體使用
Observable.sequenceEqual(
                Observable.just(4,5,6),
                Observable.just(4,5,6)
        )
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept( Boolean aBoolean) throws Exception {
                        Log.d(TAG,"2個Observable是否相同:"+ aBoolean);
                        // 輸出返回結果
                    }
                });
  • 測試結果

3.6.3.7 contains()

  • 作用
    判斷髮送的資料中是否包含指定資料
  1. 若包含,返回 true;否則,返回 false
  2. 內部實現 = exists()
  • 具體程式碼
Observable.just(1,2,3,4,5,6)
                .contains(4)
                .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.d(TAG,"result is "+ aBoolean);
                // 輸出返回結果
            }

        });
  • 測試結果

因為傳送的資料中包含4

3.6.3.8 isEmpty()

  • 作用
    判斷髮送的資料是否為空

若為空,返回 true;否則,返回 false

  • 具體程式碼
Observable.just(1,2,3,4,5,6)
                .isEmpty() // 判斷髮送的資料中是否為空
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        Log.d(TAG,"result is "+ aBoolean);
                        // 輸出返回結果
                    }
            // 輸出返回結果

    });
  • 測試結果

因為傳送的資料不為空

3.6.3.9 amb()

  • 作用
    當需要傳送多個 Observable時,只發送 先發送資料的Observable的資料,而其餘 Observable則被丟棄。

  • 具體程式碼

       // 設定2個需要傳送的Observable & 放入到集合中
        List<ObservableSource<Integer>> list= new ArrayList <>();
        // 第1個Observable延遲1秒發射資料
        list.add( Observable.just(1,2,3).delay(1,TimeUnit.SECONDS));
        // 第2個Observable正常傳送資料
        list.add( Observable.just(4,5,6));

        // 一共需要傳送2個Observable的資料
        // 但由於使用了amba(),所以僅傳送先發送資料的Observable
        // 即第二個(因為第1個延時了)
        Observable.amb(list).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到了事件 "+integer);
            }
        });
  • 測試結果

即只發送了先發送資料的Observable的資料 = 4,5,6

3.6.3.10 defaultIfEmpty()

  • 作用
    在不傳送任何有效事件( Next事件)、僅傳送了 Complete 事件的前提下,傳送一個預設值

  • 具體使用

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 不傳送任何有效事件
                //  e.onNext(1);
                //  e.onNext(2);

                // 僅傳送Complete事件
                e.onComplete();
            }
        }).defaultIfEmpty(10) // 若僅傳送了Complete事件,預設傳送 值 = 10
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });
  • 測試結果

3.6.4. 總結

  • 下面,我將用一張圖總結 RxJava2 中常用的條件 / 布林操作符