1. 程式人生 > >RxJava 2.x 之聚合操作符

RxJava 2.x 之聚合操作符

聚合操作符

startWith

startWith操作符主要用來在傳送元素之前追加資料

Observable.just(1,
2, 3, 4, 5) .startWith(Observable.just(7, 8, 9)) .subscribeWith(new CommonObserver<>());

輸出:

onNext 7
onNext 8
onNext 9
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onComplete

Process finished with exit code 0

startWithArray

startWithArray操作符主要用來發送元素之前追加多個元素

Observable.
just(1, 2, 3, 4, 5) .startWithArray(7, 8, 9) .subscribeWith(new CommonObserver<>());

輸出:

onNext 7
onNext 8
onNext 9
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onComplete

Process finished with exit code 0

concat/concatArray

concat和concatArray操作符序列執行

Observable.concat
( Observable.just(1, 2), Observable.just(3, 4, 5), Observable.just(7, 8, 9)) .subscribeWith(new CommonObserver<>());

輸出:

onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onNext 7
onNext 8
onNext 9
onComplete

Process finished with exit code 0

注意: concat最多連線4個,而concatArray可以連線多個

merge/mergeArray

merge和mergeArray操作符主要用於合併多個被觀察者並行執行

Observable.merge(
          Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
          Observable.intervalRange(3, 3, 1, 1, TimeUnit.SECONDS)
        ).subscribeWith(new CommonObserver<>());

輸出:

onNext 0
onNext 3
onNext 1
onNext 4
onNext 2
onNext 5
onComplete

Process finished with exit code 0

注意: merge最多連線4個,而mergeArray可以連線多個

concatDelayError/mergeDelayError

使用concat和merge操作符時,如果遇到其中一個被觀察者發出onError事件則會馬上終止其他被觀察者的事件,如果希望onError事件推遲到其他被觀察者都結束後才觸發,可以使用對應的concatDelayError或者mergeDelayError操作符

zip

zip操作符主要用來將多個觀察者壓縮成一個單獨的操作

Observable.zip(
          Observable.just(1, 2),
          Observable.just(7, 9),
          (int1, int2) -> int1 + int2)
          .subscribeWith(new CommonObserver<>());

輸出:

onNext 8
onNext 11
onComplete

Process finished with exit code 0

combineLatest

combineLatest操作符主要用於對同一個時間線上合併最後的元素

Observable.combineLatest(
          Observable.just(1, 2, 3),
          Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS),
          (int1, int2) -> int1 + int2)
          .subscribeWith(new CommonObserver<>());

輸出:

onNext 3
onNext 4
onNext 5
onNext 6
onNext 7
onComplete

Process finished with exit code 0

combineLatestDelayError

combineLatestDelayError操作符主要處理delayError的問題

reduce

reduce操作符主要用來將所有元素聚合到一起

Observable.just(1, 2, 3)
          .reduce((last, item) -> {
          //先執行1+2,然後用1+2的結果和3相加,最後輸出6,相當於把三個元素聚合在一起
          return last + item;
          })
          .subscribe(new CommonComsumer<>());

輸出:

accept 6

Process finished with exit code 0

count

count操作符主要用來統計被觀察者傳送了多少個元素

Observable.just(1, 2, 3)
          .count()
          .subscribe(new CommonComsumer<>());

輸出:

accept 3

Process finished with exit code 0

collect

collect和reduce操作符類似,不過它是需要自己定義收集的容器和收集邏輯

Observable.just(1, 2, 3)
          .collect(new Callable<ArrayList<Integer>>() {
                @Override
                public ArrayList<Integer> call() throws Exception {
                    return new ArrayList<>();
                }
          }, new BiConsumer<ArrayList<Integer>, Integer>() {
                @Override
                public void accept(ArrayList<Integer> list, Integer value) throws Exception {
                    System.out.println("add element " + value);
                    list.add(value);
                }
          })
          .subscribe(new CommonComsumer<>());

輸出:

add element 1
add element 2
add element 3
accept [1, 2, 3]

Process finished with exit code 0

複習文件
https://github.com/byhook/rxjava2-study

參考:
https://maxwell-nc.github.io/android/rxjava2-4.html