Rxjava2 組合 / 合併操作符
- 組合 多個被觀察者(
Observable
) & 合併需要傳送的事件
1 組合多個被觀察者
concat() / concatArray()
- 組合多個被觀察者一起傳送資料,合併後 按傳送順序序列執行
二者區別:組合被觀察者的數量,即concat()
組合被觀察者數量≤4個,而concatArray()
則>0個
merge() / mergeArray()
- 組合多個被觀察者一起傳送資料,合併後 按時間線並行執行
concatDelayError() / mergeDelayError()
concatArrayDelayError()/ mergeArrayDelayError()
2 合併多個事件
Zip()
- 合併 多個被觀察者(
Observable
)傳送的事件,生成一個新的事件序列(即組合過後的事件序列),並最終傳送
- 特別注意:
- 事件組合方式 = 嚴格按照原先事件序列 進行對位合併
- 最終合併的事件數量 = 多個被觀察者(
Observable
)中數量最少的數量
<-- 建立第1個被觀察者 -->
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "被觀察者1傳送了事件1");
emitter.onNext(1);
// 為了方便展示效果,所以在傳送事件後加入2s的延遲
Thread.sleep(1000);
Log.d(TAG, "被觀察者1傳送了事件2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "被觀察者1傳送了事件3");
emitter.onNext(3);
Thread.sleep(1000);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()); // 設定被觀察者1在工作執行緒1中工作
<-- 建立第2個被觀察者 -->
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "被觀察者2傳送了事件A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "被觀察者2傳送了事件B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "被觀察者2傳送了事件C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "被觀察者2傳送了事件D");
emitter.onNext("D");
Thread.sleep(1000);
emitter.onComplete();
}
}).subscribeOn(Schedulers.newThread());// 設定被觀察者2在工作執行緒2中工作
// 假設不作執行緒控制,則該兩個被觀察者會在同一個執行緒中工作,即傳送事件存在先後順序,而不是同時傳送
<-- 使用zip變換操作符進行事件合併 -->
// 注:建立BiFunction物件傳入的第3個引數 = 合併後資料的資料型別
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String string) throws Exception {
return integer + string;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "最終接收到的事件 = " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
特別注意:
- 儘管被觀察者2的事件
D
沒有事件與其合併,但還是會繼續傳送 - 若在被觀察者1 & 被觀察者2的事件序列最後傳送
onComplete()
事件,則被觀察者2的事件D也不會發送,測試結果如下
combineLatest()
當兩個Observables
中的任何一個傳送了資料後,將先發送了資料的Observables
的最新(最後)一個數據 與 另外一個Observable
傳送的每個資料結合,最終基於該函式的結果傳送資料
與Zip()
的區別:Zip()
= 按個數合併,即1對1合併;
CombineLatest()
= 按時間合併,即在同一個時間點上合併
Observable.combineLatest(
Observable.just(1L, 2L, 3L), // 第1個傳送資料事件的Observable
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2個傳送資料事件的Observable:從0開始傳送、共傳送3個數據、第1次事件延遲傳送時間 = 1s、間隔時間 = 1s
new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long o1, Long o2) throws Exception {
// o1 = 第1個Observable傳送的最新(最後)1個數據
// o2 = 第2個Observable傳送的每1個數據
Log.e(TAG, "資料是: "+ o1 + " "+ o2);
return o1 + o2;
// 合併的邏輯 = 相加
// 即第1個Observable傳送的最後1個數據 與 第2個Observable傳送的每1個數據進行相加
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long s) throws Exception {
Log.e(TAG, "結果是: "+s);
}
});
combineLatestDelayError()
作用類似於concatDelayError()
/ mergeDelayError()
,即錯誤處理
reduce()
把被觀察者需要傳送的事件聚合成1個事件 & 傳送
本質都是前2個數據聚合,然後與後1個數據繼續進行聚合,依次類推
Observable.just(1,2,3,4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
// 在該複寫方法中複寫聚合的邏輯
@Override
public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
Log.e(TAG, "本次計算的資料是: "+s1 +" 乘 "+ s2);
return s1 * s2;
// 原理:第1次取前2個數據相乘,之後每次獲取到的資料 = 返回的資料x原始下1個數據每
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer s) throws Exception {
Log.e(TAG, "最終計算的結果是: "+s);
}
});
collect()
將被觀察者Observable
傳送的資料事件收集到一個數據結構裡
3 傳送事件前追加發送事件
startWith() / startWithArray()
在一個被觀察者傳送事件前,追加發送一個數據 / 一些新的被觀察者
4 統計傳送事件數量
count()
統計被觀察者傳送事件的數量
Observable.just(1, 2, 3, 4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "傳送的事件數量 = "+aLong);
}
});
傳送的事件數量 = 4