1. 程式人生 > >Android RxJava操作符的學習---組合 / 合併操作符

Android RxJava操作符的學習---組合 / 合併操作符

3.3 組合 / 合併操作符

3.3.1. 作用
組合 多個被觀察者(Observable) & 合併需要傳送的事件

  • 應用場景

    1. 組合多個被觀察者
    2. 合併多個事件
    3. 傳送事件前追加發送事件
    4. 統計傳送事件數量

3.3.2. 型別

根據上述應用場景,常見的組合 / 合併操作符 主要有: 

 

3.3.3. 應用場景 & 對應操作符 介紹

注:在使用RxJava 2操作符前,記得在專案的Gradle中新增依賴:

dependencies {
      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
      compile 'io.reactivex.rxjava2:rxjava:2.0.7'
      // 注:RxJava2 與 RxJava1 不能共存,即依賴不能同時存在
}

3.3.3.1 組合多個被觀察者

該型別的操作符的作用 = 組合多個被觀察者

concat() / concatArray()

  • 作用
    組合多個被觀察者一起傳送資料,合併後 按傳送順序序列執行

二者區別:組合被觀察者的數量,即concat()組合被觀察者數量≤4個,而concatArray()則可>4個

  • 具體使用
// concat():組合多個被觀察者(≤4個)一起傳送資料
        // 注:序列執行
        Observable.concat(Observable.just(1, 2, 3),
                           Observable.just(4, 5, 6),
                           Observable.just(7, 8, 9),
                           Observable.just(10, 11, 12))
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

// concatArray():組合多個被觀察者一起傳送資料(可>4個)
        // 注:序列執行
        Observable.concatArray(Observable.just(1, 2, 3),
                           Observable.just(4, 5, 6),
                           Observable.just(7, 8, 9),
                           Observable.just(10, 11, 12),
                           Observable.just(13, 14, 15))
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

  • 測試結果

 

 

merge() / mergeArray()

  • 作用
    組合多個被觀察者一起傳送資料,合併後 按時間線並行執行
  1. 二者區別:組合被觀察者的數量,即merge()組合被觀察者數量≤4個,而mergeArray()則可>4個
  2. 區別上述concat()操作符:同樣是組合多個被觀察者一起傳送資料,但concat()操作符合並後是按傳送順序序列執行
  • 具體使用
// merge():組合多個被觀察者(<4個)一起傳送資料
        // 注:合併後按照時間線並行執行
        Observable.merge(
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 從0開始傳送、共傳送3個數據、第1次事件延遲傳送時間 = 1s、間隔時間 = 1s
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)) // 從2開始傳送、共傳送3個數據、第1次事件延遲傳送時間 = 1s、間隔時間 = 1s
                  .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

// mergeArray() = 組合4個以上的被觀察者一起傳送資料,此處不作過多演示,類似concatArray()

  • 測試結果

兩個被觀察者傳送事件並行執行,輸出結果 = 0,2 -> 1,3 -> 2,4

concatDelayError() / mergeDelayError()

  • 作用

  • 具體使用

a. 無使用concatDelayError()的情況

Observable.concat(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 傳送Error事件,因為無使用concatDelayError,所以第2個Observable將不會發送事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

測試結果:第1個被觀察者傳送Error事件後,第2個被觀察者則不會繼續傳送事件

 

<-- 使用了concatDelayError()的情況 -->
Observable.concatArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 傳送Error事件,因為使用了concatDelayError,所以第2個Observable將會發送事件,等傳送完畢後,再發送錯誤事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

測試結果:第1個被觀察者的Error事件將在第2個被觀察者傳送完事件後再繼續傳送

mergeDelayError()操作符同理,此處不作過多演示


3.3.3.2 合併多個事件

該型別的操作符主要是對多個被觀察者中的事件進行合併處理。

Zip()

  • 作用
    合併 多個被觀察者(Observable)傳送的事件,生成一個新的事件序列(即組合過後的事件序列),並最終傳送

  • 原理
    具體請看下圖

  • 特別注意:
  1. 事件組合方式 = 嚴格按照原先事件序列 進行對位合併
  2. 最終合併的事件數量 = 多個被觀察者(Observable)中數量最少的數量

即如下圖

 

  • 具體使用
<-- 建立第1個被觀察者 -->
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "被觀察者1傳送了事件1");
                emitter.onNext(1);
                // 為了方便展示效果,所以在傳送事件後加入2s的延遲
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者1傳送了事件2");
                emitter.onNext(2);
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者1傳送了事件3");
                emitter.onNext(3);
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io()); // 設定被觀察者1在工作執行緒1中工作

<-- 建立第2個被觀察者 -->
        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "被觀察者2傳送了事件A");
                emitter.onNext("A");
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者2傳送了事件B");
                emitter.onNext("B");
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者2傳送了事件C");
                emitter.onNext("C");
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者2傳送了事件D");
                emitter.onNext("D");
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());// 設定被觀察者2在工作執行緒2中工作
        // 假設不作執行緒控制,則該兩個被觀察者會在同一個執行緒中工作,即傳送事件存在先後順序,而不是同時傳送

<-- 使用zip變換操作符進行事件合併 -->
// 注:建立BiFunction物件傳入的第3個引數 = 合併後資料的資料型別
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String string) throws Exception {
                return  integer + string;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "最終接收到的事件 =  " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
  • 測試結果

 

 D事件有時沒出現,都報錯

2018-11-03 16:29:47.988 3767-3786/com.example.administrator.hello E/AndroidRuntime: FATAL EXCEPTION: RxNewThreadScheduler-1
    Process: com.example.administrator.hello, PID: 3767
    java.lang.InterruptedException
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:371)
        at java.lang.Thread.sleep(Thread.java:313)
        at com.example.administrator.hello.MainActivity$2.subscribe(MainActivity.java:797)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
        at io.reactivex.Observable.subscribe(Observable.java:10179)
        at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
        at io.reactivex.Scheduler$1.run(Scheduler.java:134)
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
        at java.util.concurrent.FutureTask.run(FutureTask.java:237)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
        at java.lang.Thread.run(Thread.java:761)

把休眠時間延長也不行,執行緒休眠的問題,好像導致堵塞,去掉這2個休眠就好了 

 Log.d(TAG, "被觀察者2傳送了事件C");
                emitter.onNext("C");
              //  Thread.sleep(2000);

                Log.d(TAG, "被觀察者2傳送了事件D");
                emitter.onNext("D");
            //   Thread.sleep(2000);

特別注意:

  1. 儘管被觀察者2的事件D沒有事件與其合併,但還是會繼續傳送
  2. 若在被觀察者1 & 被觀察者2的事件序列最後傳送onComplete()事件,則被觀察者2的事件D也不會發送,測試結果如下

 

  • 因為Zip()操作符較為複雜 & 難理解,此處將用1張圖總結

combineLatest()

  • 作用
    當兩個Observables中的任何一個傳送了資料後,將先發送了資料的Observables 的最新(最後)一個數據 與 另外一個Observable傳送的每個資料結合,最終基於該函式的結果傳送資料

Zip()的區別:Zip() = 按個數合併,即1對1合併;CombineLatest() = 按時間合併,即在同一個時間點上合併

  • 具體使用
Observable.combineLatest(
                    Observable.just(1L, 2L, 3L), // 第1個傳送資料事件的Observable
                    Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2個傳送資料事件的Observable:從0開始傳送、共傳送3個數據、第1次事件延遲傳送時間 = 1s、間隔時間 = 1s
                    new BiFunction<Long, Long, Long>() {
                @Override
                public Long apply(Long o1, Long o2) throws Exception {
                    // o1 = 第1個Observable傳送的最新(最後)1個數據
                    // o2 = 第2個Observable傳送的每1個數據
                    Log.e(TAG, "合併的資料是: "+ o1 + " "+ o2);
                    return o1 + o2;
                    // 合併的邏輯 = 相加
                    // 即第1個Observable傳送的最後1個數據 與 第2個Observable傳送的每1個數據進行相加
                }
            }).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long s) throws Exception {
                    Log.e(TAG, "合併的結果是: "+s);
                }
            });
  • 測試結果

combineLatestDelayError()

作用類似於concatDelayError() / mergeDelayError() ,即錯誤處理,此處不作過多描述

reduce()

  • 作用
    把被觀察者需要傳送的事件聚合成1個事件 & 傳送

聚合的邏輯根據需求撰寫,但本質都是前2個數據聚合,然後與後1個數據繼續進行聚合,依次類推

  • 具體使用
Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在該複寫方法中複寫聚合的邏輯
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        Log.e(TAG, "本次計算的資料是: "+s1 +" 乘 "+ s2);
                        return s1 * s2;
                        // 本次聚合的邏輯是:全部資料相乘起來
                        // 原理:第1次取前2個數據相乘,之後每次獲取到的資料 = 返回的資料x原始下1個數據每
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最終計算的結果是: "+s);

            }
        });
  • 測試結果

collect()

  • 作用
    將被觀察者Observable傳送的資料事件收集到一個數據結構裡

  • 具體使用

Observable.just(1, 2, 3 ,4, 5, 6)
                .collect(
                        // 1. 建立資料結構(容器),用於收集被觀察者傳送的資料
                        new Callable<ArrayList<Integer>>() {
                            @Override
                            public ArrayList<Integer> call() throws Exception {
                                return new ArrayList<>();
                            }
                            // 2. 對傳送的資料進行收集
                        }, new BiConsumer<ArrayList<Integer>, Integer>() {
                            @Override
                            public void accept(ArrayList<Integer> list, Integer integer)
                                    throws Exception {
                                // 引數說明:list = 容器,integer = 後者資料
                                list.add(integer);
                                // 對傳送的資料進行收集
                            }
                        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                Log.e(TAG, "本次傳送的資料是: "+s);

            }
        });
  • 測試結果

3.3.3.3 傳送事件前追加發送事件

startWith() / startWithArray()

  • 作用
    在一個被觀察者傳送事件前,追加發送一些資料 / 一個新的被觀察者

  • 具體使用

<-- 在一個被觀察者傳送事件前,追加發送一些資料 -->
        // 注:追加資料順序 = 後呼叫先追加
        Observable.just(4, 5, 6)
                  .startWith(0)  // 追加單個數據 = startWith()
                  .startWithArray(1, 2, 3) // 追加多個數據 = startWithArray()
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });


<-- 在一個被觀察者傳送事件前,追加發送被觀察者 & 傳送資料 -->
        // 注:追加資料順序 = 後呼叫先追加
        Observable.just(4, 5, 6)
                .startWith(Observable.just(1, 2, 3))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });

  • 測試結果

 

3.3.3.4 統計傳送事件數量

count()

  • 作用
    統計被觀察者傳送事件的數量

  • 具體使用

// 注:返回結果 = Long型別
        Observable.just(1, 2, 3, 4)
                  .count()
                  .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "傳送的事件數量 =  "+aLong);

                    }
                });
  • 測試結果

 總結