RxJava 2.x 之聚合操作符
阿新 • • 發佈:2018-11-01
聚合操作符
- startWith操作符
- startWithArray操作符
- concat/concatArray操作符
- merge/mergeArray操作符
- concatDelayError/mergeDelayError操作符
- zip操作符
- combineLatest操作符
- combineLatestDelayError操作符
- reduce操作符
- count操作符
- collect操作符
startWith
startWith操作符主要用來在傳送元素之前追加資料
Observable.just(1, 2, 3, 4, 5)
.startWith(Observable.just(7, 8, 9))
.subscribeWith(new CommonObserver<>());
輸出:
onNext 7
onNext 8
onNext 9
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onComplete
Process finished with exit code 0
startWithArray
startWithArray操作符主要用來發送元素之前追加多個元素
Observable. just(1, 2, 3, 4, 5)
.startWithArray(7, 8, 9)
.subscribeWith(new CommonObserver<>());
輸出:
onNext 7
onNext 8
onNext 9
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onComplete
Process finished with exit code 0
concat/concatArray
concat和concatArray操作符序列執行
Observable.concat (
Observable.just(1, 2),
Observable.just(3, 4, 5),
Observable.just(7, 8, 9))
.subscribeWith(new CommonObserver<>());
輸出:
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onNext 7
onNext 8
onNext 9
onComplete
Process finished with exit code 0
注意: concat最多連線4個,而concatArray可以連線多個
merge/mergeArray
merge和mergeArray操作符主要用於合併多個被觀察者並行執行
Observable.merge(
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
Observable.intervalRange(3, 3, 1, 1, TimeUnit.SECONDS)
).subscribeWith(new CommonObserver<>());
輸出:
onNext 0
onNext 3
onNext 1
onNext 4
onNext 2
onNext 5
onComplete
Process finished with exit code 0
注意: merge最多連線4個,而mergeArray可以連線多個
concatDelayError/mergeDelayError
使用concat和merge操作符時,如果遇到其中一個被觀察者發出onError事件則會馬上終止其他被觀察者的事件,如果希望onError事件推遲到其他被觀察者都結束後才觸發,可以使用對應的concatDelayError或者mergeDelayError操作符
zip
zip操作符主要用來將多個觀察者壓縮成一個單獨的操作
Observable.zip(
Observable.just(1, 2),
Observable.just(7, 9),
(int1, int2) -> int1 + int2)
.subscribeWith(new CommonObserver<>());
輸出:
onNext 8
onNext 11
onComplete
Process finished with exit code 0
combineLatest
combineLatest操作符主要用於對同一個時間線上合併最後的元素
Observable.combineLatest(
Observable.just(1, 2, 3),
Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS),
(int1, int2) -> int1 + int2)
.subscribeWith(new CommonObserver<>());
輸出:
onNext 3
onNext 4
onNext 5
onNext 6
onNext 7
onComplete
Process finished with exit code 0
combineLatestDelayError
combineLatestDelayError操作符主要處理delayError的問題
reduce
reduce操作符主要用來將所有元素聚合到一起
Observable.just(1, 2, 3)
.reduce((last, item) -> {
//先執行1+2,然後用1+2的結果和3相加,最後輸出6,相當於把三個元素聚合在一起
return last + item;
})
.subscribe(new CommonComsumer<>());
輸出:
accept 6
Process finished with exit code 0
count
count操作符主要用來統計被觀察者傳送了多少個元素
Observable.just(1, 2, 3)
.count()
.subscribe(new CommonComsumer<>());
輸出:
accept 3
Process finished with exit code 0
collect
collect和reduce操作符類似,不過它是需要自己定義收集的容器和收集邏輯
Observable.just(1, 2, 3)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer value) throws Exception {
System.out.println("add element " + value);
list.add(value);
}
})
.subscribe(new CommonComsumer<>());
輸出:
add element 1
add element 2
add element 3
accept [1, 2, 3]
Process finished with exit code 0