1. 程式人生 > >Rxjava2(三)合併操作符

Rxjava2(三)合併操作符

1.concat

private void concat() {
    final Integer[] items={1,2,3,4};
Observable.concat(Observable.just(1,2,3),Observable.just(4,5))
          .subscribe(new Consumer<Integer>() {
              @Override
public void accept(Integer integer) throws Exception {
                  LogUtils.syso("======accept==="
+integer); } }); }

作用:組合多個被觀察者一起傳送資料,合併後 按傳送順序序列執行  注意:concat()組合被觀察者數量≤4個

2.concatArray

private void concatArray() {
    final Integer[] items={1,2,3,4};
Observable.concatArray(Observable.just(1,2,3),
Observable.just(4,5),
Observable.just(6,7),
Observable.just(8,9),
Observable.just(10,11),
Observable.just
(12,13)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("======accept==="+integer); } }); }

  • 作用
    組合多個被觀察者一起傳送資料,合併後 按傳送順序序列執行  注意:concatArray()則可>4個

3. merge

private void merge() {
    Observable.merge
(Observable.intervalRange(1,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(4,3,2,1, TimeUnit.SECONDS)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { LogUtils.syso("====aLong===="+aLong); } }); }

  • 作用
    組合多個被觀察者一起傳送資料,合併後 按時間線並行執行

組合被觀察者的數量,即merge()組合被觀察者數量≤4個

4.mergeArray

private void mergeArray() {
    Observable.mergeArray(Observable.intervalRange(1,3,2,1, TimeUnit.SECONDS),
Observable.intervalRange(4,3,2,1, TimeUnit.SECONDS),
Observable.intervalRange(7,3,2,1, TimeUnit.SECONDS),
Observable.intervalRange(10,3,2,1, TimeUnit.SECONDS),
Observable.intervalRange(13,3,2,1, TimeUnit.SECONDS))
            .subscribe(new Consumer<Long>() {
                @Override
public void accept(Long aLong) throws Exception {
                    LogUtils.syso("====aLong===="+aLong);
}
            });
}

  1. 二者區別:組合被觀察者的數量,即merge()組合被觀察者數量≤4個,而mergeArray()則可>4個
  2. 區別上述concat()操作符:同樣是組合多個被觀察者一起傳送資料,但concat()操作符合並後是按傳送順序序列執行

5.concatDelayError



測試結果:第1個被觀察者傳送Error事件後,第2個被觀察者則不會繼續傳送事件

那麼如果希望onError事件推遲到其他觀察者傳送事件結束

private void concatArrayDelayError() {

  Observable.concatArrayDelayError( Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
e.onComplete();
}
  }),Observable.just(4,5,6))
          .subscribe(new Consumer<Integer>() {
              @Override
public void accept(Integer integer) throws Exception {
                  LogUtils.syso("=====accept=====" + integer);
}
          }, new Consumer<Throwable>() {
              @Override
public void accept(Throwable throwable) throws Exception {
                  LogUtils.syso("=====accept=throwable====" + throwable.getMessage());
}
          });
}

達到預期的效果

小結:

6.zip (合併多個事件)

該型別的操作符主要是對多個被觀察者中的事件進行合併處理

作用:
合併 多個被觀察者(Observable)傳送的事件,生成一個新的事件序列(即組合過後的事件序列),並最終傳送


假設這樣一種場景,我們利用github api開發一個app,在user介面,我既要請求user基本資訊,又要列舉user下的event資料,為此,我準備使用Retrofit來做網路請求。

雖然在後臺有兩次請求,但是在前臺,我們希望使用者開啟這個頁面,然後等待載入,然後顯示。使用者只有一次等待載入的過程。所以說,我們需要等待這兩個請求都返回結果了,再開始顯示資料。

怎麼辦?自己寫判斷兩個都載入已完成的程式碼嗎?邏輯好像也不是很複雜,但是程式碼看起來就沒有那麼高大上了啊。

其實既然你都用過了還有,那麼直覺上你應該意識到也許RxJava可以解決這個問題。沒錯,就是RxJava,使用zip操作符。

private void zip() {
    Observable.zip(Observable.just(1, 2, 3),
Observable.just("one", "two", "three"),
            new BiFunction<Integer, String, String>() {
                @Override
public String apply(Integer integer, String s) throws Exception {
                    return integer+s;
}
            }).subscribe(new Consumer<String>() {
        @Override
public void accept(String s) throws Exception {
            LogUtils.syso("======返回的結果======="+s);
}
    });
}

7.conbineLatest

  • 作用
    當兩個Observables中的任何一個傳送了資料後,將先發送了資料的Observables 的最新(最後)一個數據 與 另外一個Observable傳送的每個資料結合,最終基於該函式的結果傳送資料
  • Zip()的區別:Zip() = 按個數合併,即1對1合併;CombineLatest() = 按時間合併,即在同一個時間點上合併
   private void combineLatest() {
        Observable.combineLatest(Observable.just(1, 2, 3),
Observable.just("one", "two", "three"),
                new BiFunction<Integer, String, String>() {
                    @Override
public String apply(Integer integer, String s) throws Exception {
                        return integer+s;
}
                }).subscribe(new Consumer<String>() {
            @Override
public void accept(String s) throws Exception {
                LogUtils.syso("======返回的結果======="+s);
}
        });
}
}

8.reduce

  • 作用
    把被觀察者需要傳送的事件聚合成1個事件 & 傳送
聚合的邏輯根據需求撰寫,但本質都是前2個數據聚合,然後與後1個數據繼續進行聚合,依次類推
private void reduce() {
    Observable.just(1, 2, 3)
            .reduce(new BiFunction<Integer, Integer, Integer>() {
                @Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
                    Log.e("REDUCE", "本次計算的資料是: "+integer +" "+ integer2);
                    return integer+integer2;
}
            }).subscribe(new Consumer<Integer>() {
        @Override
public void accept(Integer integer) throws Exception {
            LogUtils.syso("=====result===="+integer);
}
    });
}

9.collect

作用
將被觀察者Observable傳送的資料事件收集到一個數據結構裡

private void collect() {
   Observable.just(1,2,3,4)

           .collect(
                   // 1. 建立資料結構(容器),用於收集被觀察者傳送的資料
new Callable<List<Integer>>() {
               @Override
public List<Integer> call() throws Exception {
                   return new ArrayList<>();
}
           },// 2. 對傳送的資料進行收集
new BiConsumer<List<Integer>, Integer>() {
                   /**
                    *  mList 容器,
*  integer 後者資料
*
                    * */
@Override
public void accept(List<Integer> mList, Integer integer) throws Exception {
                   mList.add(integer);
}
           }).subscribe(new Consumer<List<Integer>>() {
       @Override
public void accept(List<Integer> integers) throws Exception {
            LogUtils.syso("====result======"+integers);
}
   });
}

10.startWith/startWithArray

  • 作用
    在一個被觀察者傳送事件前,追加發送一些資料 / 一個新的被觀察者


Observable.just(1,2,3,4)
        .startWith(5)
        .subscribe(new Consumer<Integer>() {
            @Override
public void accept(Integer integer) throws Exception {
                 LogUtils.syso("=======result======="+integer);
}
        });

Observable.just(1,2,3,4)
        .startWithArray(7,8,9)
        .subscribe(new Consumer<Integer>() {
            @Override
public void accept(Integer integer) throws Exception {
                 LogUtils.syso("======resutl======="+integer);
}
        });

11.count

  • 作用
    統計被觀察者傳送事件的數量

private void count() {
    Observable.just(1,2,3,4)
            .count()
            .subscribe(new Consumer<Long>() {
                @Override
public void accept(Long aLong) throws Exception {
                    LogUtils.syso("========傳送事件的個數======"+aLong);
}
            });
}