1. 程式人生 > >Rxjava2 組合 / 合併操作符

Rxjava2 組合 / 合併操作符

  • 組合 多個被觀察者(Observable) & 合併需要傳送的事件

1 組合多個被觀察者

concat() / concatArray()

  • 組合多個被觀察者一起傳送資料,合併後 按傳送順序序列執行

二者區別:組合被觀察者的數量,即concat()組合被觀察者數量≤4個,而concatArray()則>0個

merge() / mergeArray()

  • 組合多個被觀察者一起傳送資料,合併後 按時間線並行執行

concatDelayError() / mergeDelayError()

concatArrayDelayError()/ mergeArrayDelayError()

2 合併多個事件   

Zip() 

  • 合併 多個被觀察者(Observable)傳送的事件,生成一個新的事件序列(即組合過後的事件序列),並最終傳送
  • 特別注意:
  1. 事件組合方式 = 嚴格按照原先事件序列 進行對位合併
  2. 最終合併的事件數量 = 多個被觀察者(Observable)中數量最少的數量

<-- 建立第1個被觀察者 -->
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "被觀察者1傳送了事件1");
                emitter.onNext(1);
                // 為了方便展示效果,所以在傳送事件後加入2s的延遲
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者1傳送了事件2");
                emitter.onNext(2);
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者1傳送了事件3");
                emitter.onNext(3);
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io()); // 設定被觀察者1在工作執行緒1中工作

<-- 建立第2個被觀察者 -->
        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "被觀察者2傳送了事件A");
                emitter.onNext("A");
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者2傳送了事件B");
                emitter.onNext("B");
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者2傳送了事件C");
                emitter.onNext("C");
                Thread.sleep(1000);

                Log.d(TAG, "被觀察者2傳送了事件D");
                emitter.onNext("D");
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());// 設定被觀察者2在工作執行緒2中工作
        // 假設不作執行緒控制,則該兩個被觀察者會在同一個執行緒中工作,即傳送事件存在先後順序,而不是同時傳送

<-- 使用zip變換操作符進行事件合併 -->
// 注:建立BiFunction物件傳入的第3個引數 = 合併後資料的資料型別
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String string) throws Exception {
                return  integer + string;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "最終接收到的事件 =  " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

特別注意:

  1. 儘管被觀察者2的事件D沒有事件與其合併,但還是會繼續傳送
  2. 若在被觀察者1 & 被觀察者2的事件序列最後傳送onComplete()事件,則被觀察者2的事件D也不會發送,測試結果如下

 

combineLatest()

當兩個Observables中的任何一個傳送了資料後,將先發送了資料的Observables 的最新(最後)一個數據 與 另外一個Observable傳送的每個資料結合,最終基於該函式的結果傳送資料

Zip()的區別:Zip() = 按個數合併,即1對1合併;

           CombineLatest() = 按時間合併,即在同一個時間點上合併

Observable.combineLatest(
                    Observable.just(1L, 2L, 3L), // 第1個傳送資料事件的Observable
                    Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2個傳送資料事件的Observable:從0開始傳送、共傳送3個數據、第1次事件延遲傳送時間 = 1s、間隔時間 = 1s
                    new BiFunction<Long, Long, Long>() {
                @Override
                public Long apply(Long o1, Long o2) throws Exception {
                    // o1 = 第1個Observable傳送的最新(最後)1個數據
                    // o2 = 第2個Observable傳送的每1個數據
                    Log.e(TAG, "資料是: "+ o1 + " "+ o2);
                    return o1 + o2;
                    // 合併的邏輯 = 相加
                    // 即第1個Observable傳送的最後1個數據 與 第2個Observable傳送的每1個數據進行相加
                }
            }).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long s) throws Exception {
                    Log.e(TAG, "結果是: "+s);
                }
            });

combineLatestDelayError() 

作用類似於concatDelayError() / mergeDelayError() ,即錯誤處理

reduce()

把被觀察者需要傳送的事件聚合成1個事件 & 傳送

本質都是前2個數據聚合,然後與後1個數據繼續進行聚合,依次類推

Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在該複寫方法中複寫聚合的邏輯
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        Log.e(TAG, "本次計算的資料是: "+s1 +" 乘 "+ s2);
                        return s1 * s2;
                        // 原理:第1次取前2個數據相乘,之後每次獲取到的資料 = 返回的資料x原始下1個數據每
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最終計算的結果是: "+s);

            }
        });

collect()

將被觀察者Observable傳送的資料事件收集到一個數據結構裡

3 傳送事件前追加發送事件

startWith() / startWithArray()

在一個被觀察者傳送事件前,追加發送一個數據 / 一些新的被觀察者

4 統計傳送事件數量

count()

統計被觀察者傳送事件的數量

Observable.just(1, 2, 3, 4)
                  .count()
                  .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "傳送的事件數量 =  "+aLong);

                    }
                });

傳送的事件數量 = 4