Rxjava2(三)合併操作符
1.concat
private void concat() { final Integer[] items={1,2,3,4}; Observable.concat(Observable.just(1,2,3),Observable.just(4,5)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("======accept==="+integer); } }); }
作用:組合多個被觀察者一起傳送資料,合併後 按傳送順序序列執行 注意:concat()
組合被觀察者數量≤4個
2.concatArray
private void concatArray() { final Integer[] items={1,2,3,4}; Observable.concatArray(Observable.just(1,2,3), Observable.just(4,5), Observable.just(6,7), Observable.just(8,9), Observable.just(10,11), Observable.just(12,13)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("======accept==="+integer); } }); }
- 作用
組合多個被觀察者一起傳送資料,合併後 按傳送順序序列執行 注意:concatArray()
則可>4個
3. merge
private void merge() { Observable.merge(Observable.intervalRange(1,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(4,3,2,1, TimeUnit.SECONDS)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { LogUtils.syso("====aLong===="+aLong); } }); }
- 作用
組合多個被觀察者一起傳送資料,合併後 按時間線並行執行
組合被觀察者的數量,即merge()
組合被觀察者數量≤4個
4.mergeArray
private void mergeArray() { Observable.mergeArray(Observable.intervalRange(1,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(4,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(7,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(10,3,2,1, TimeUnit.SECONDS), Observable.intervalRange(13,3,2,1, TimeUnit.SECONDS)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { LogUtils.syso("====aLong===="+aLong); } }); }
- 二者區別:組合被觀察者的數量,即
merge()
組合被觀察者數量≤4個,而mergeArray()
則可>4個 - 區別上述
concat()
操作符:同樣是組合多個被觀察者一起傳送資料,但concat()
操作符合並後是按傳送順序序列執行
5.concatDelayError
測試結果:第1個被觀察者傳送Error事件後,第2個被觀察者則不會繼續傳送事件
那麼如果希望onError事件推遲到其他觀察者傳送事件結束
private void concatArrayDelayError() { Observable.concatArrayDelayError( Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onError(new NullPointerException()); e.onComplete(); } }),Observable.just(4,5,6)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====accept=====" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { LogUtils.syso("=====accept=throwable====" + throwable.getMessage()); } }); }
達到預期的效果
小結:
6.zip (合併多個事件)
該型別的操作符主要是對多個被觀察者中的事件進行合併處理
作用:
合併 多個被觀察者(Observable
)傳送的事件,生成一個新的事件序列(即組合過後的事件序列),並最終傳送
假設這樣一種場景,我們利用github api開發一個app,在user介面,我既要請求user基本資訊,又要列舉user下的event資料,為此,我準備使用Retrofit來做網路請求。
雖然在後臺有兩次請求,但是在前臺,我們希望使用者開啟這個頁面,然後等待載入,然後顯示。使用者只有一次等待載入的過程。所以說,我們需要等待這兩個請求都返回結果了,再開始顯示資料。
怎麼辦?自己寫判斷兩個都載入已完成的程式碼嗎?邏輯好像也不是很複雜,但是程式碼看起來就沒有那麼高大上了啊。
其實既然你都用過了還有,那麼直覺上你應該意識到也許RxJava可以解決這個問題。沒錯,就是RxJava,使用zip操作符。
private void zip() { Observable.zip(Observable.just(1, 2, 3), Observable.just("one", "two", "three"), new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer+s; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { LogUtils.syso("======返回的結果======="+s); } }); }
7.conbineLatest
- 作用
當兩個Observables
中的任何一個傳送了資料後,將先發送了資料的Observables
的最新(最後)一個數據 與 另外一個Observable
傳送的每個資料結合,最終基於該函式的結果傳送資料 - 與
Zip()
的區別:Zip()
= 按個數合併,即1對1合併;CombineLatest()
= 按時間合併,即在同一個時間點上合併
private void combineLatest() { Observable.combineLatest(Observable.just(1, 2, 3), Observable.just("one", "two", "three"), new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer+s; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { LogUtils.syso("======返回的結果======="+s); } }); } }
8.reduce
- 作用
把被觀察者需要傳送的事件聚合成1個事件 & 傳送
private void reduce() { Observable.just(1, 2, 3) .reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { Log.e("REDUCE", "本次計算的資料是: "+integer +" 加"+ integer2); return integer+integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=====result===="+integer); } }); }
9.collect
作用
將被觀察者Observable
傳送的資料事件收集到一個數據結構裡
private void collect() { Observable.just(1,2,3,4) .collect( // 1. 建立資料結構(容器),用於收集被觀察者傳送的資料 new Callable<List<Integer>>() { @Override public List<Integer> call() throws Exception { return new ArrayList<>(); } },// 2. 對傳送的資料進行收集 new BiConsumer<List<Integer>, Integer>() { /** * mList 容器, * integer 後者資料 * * */ @Override public void accept(List<Integer> mList, Integer integer) throws Exception { mList.add(integer); } }).subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> integers) throws Exception { LogUtils.syso("====result======"+integers); } }); }
10.startWith/startWithArray
作用
在一個被觀察者傳送事件前,追加發送一些資料 / 一個新的被觀察者
Observable.just(1,2,3,4) .startWith(5) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("=======result======="+integer); } });
Observable.just(1,2,3,4) .startWithArray(7,8,9) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("======resutl======="+integer); } });
11.count
作用
統計被觀察者傳送事件的數量
private void count() { Observable.just(1,2,3,4) .count() .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { LogUtils.syso("========傳送事件的個數======"+aLong); } }); }