Android RxJava操作符的學習---組合 / 合併操作符
3.3 組合 / 合併操作符
3.3.1. 作用
組合 多個被觀察者(Observable
) & 合併需要傳送的事件
-
應用場景
- 組合多個被觀察者
- 合併多個事件
- 傳送事件前追加發送事件
- 統計傳送事件數量
3.3.2. 型別
根據上述應用場景,常見的組合 / 合併操作符 主要有:
3.3.3. 應用場景 & 對應操作符 介紹
注:在使用RxJava 2
操作符前,記得在專案的Gradle
中新增依賴:
dependencies { compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxjava:2.0.7' // 注:RxJava2 與 RxJava1 不能共存,即依賴不能同時存在 }
3.3.3.1 組合多個被觀察者
該型別的操作符的作用 = 組合多個被觀察者
concat() / concatArray()
- 作用
組合多個被觀察者一起傳送資料,合併後 按傳送順序序列執行
二者區別:組合被觀察者的數量,即
concat()
組合被觀察者數量≤4個,而concatArray()
則可>4個
- 具體使用
// concat():組合多個被觀察者(≤4個)一起傳送資料 // 注:序列執行 Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6), Observable.just(7, 8, 9), Observable.just(10, 11, 12)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "對Error事件作出響應"); } @Override public void onComplete() { Log.d(TAG, "對Complete事件作出響應"); } }); // concatArray():組合多個被觀察者一起傳送資料(可>4個) // 注:序列執行 Observable.concatArray(Observable.just(1, 2, 3), Observable.just(4, 5, 6), Observable.just(7, 8, 9), Observable.just(10, 11, 12), Observable.just(13, 14, 15)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "對Error事件作出響應"); } @Override public void onComplete() { Log.d(TAG, "對Complete事件作出響應"); } });
- 測試結果
merge() / mergeArray()
- 作用
組合多個被觀察者一起傳送資料,合併後 按時間線並行執行
- 二者區別:組合被觀察者的數量,即
merge()
組合被觀察者數量≤4個,而mergeArray()
則可>4個- 區別上述
concat()
操作符:同樣是組合多個被觀察者一起傳送資料,但concat()
操作符合並後是按傳送順序序列執行
- 具體使用
// merge():組合多個被觀察者(<4個)一起傳送資料 // 注:合併後按照時間線並行執行 Observable.merge( Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 從0開始傳送、共傳送3個數據、第1次事件延遲傳送時間 = 1s、間隔時間 = 1s Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)) // 從2開始傳送、共傳送3個數據、第1次事件延遲傳送時間 = 1s、間隔時間 = 1s .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "對Error事件作出響應"); } @Override public void onComplete() { Log.d(TAG, "對Complete事件作出響應"); } }); // mergeArray() = 組合4個以上的被觀察者一起傳送資料,此處不作過多演示,類似concatArray()
- 測試結果
兩個被觀察者傳送事件並行執行,輸出結果 = 0,2 -> 1,3 -> 2,4
concatDelayError() / mergeDelayError()
- 作用
- 具體使用
a. 無使用concatDelayError()的情況
Observable.concat(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException()); // 傳送Error事件,因為無使用concatDelayError,所以第2個Observable將不會發送事件
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
測試結果:第1個被觀察者傳送Error事件後,第2個被觀察者則不會繼續傳送事件
<-- 使用了concatDelayError()的情況 -->
Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException()); // 傳送Error事件,因為使用了concatDelayError,所以第2個Observable將會發送事件,等傳送完畢後,再發送錯誤事件
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
測試結果:第1個被觀察者的Error事件將在第2個被觀察者傳送完事件後再繼續傳送
mergeDelayError()
操作符同理,此處不作過多演示
3.3.3.2 合併多個事件
該型別的操作符主要是對多個被觀察者中的事件進行合併處理。
Zip()
-
作用
合併 多個被觀察者(Observable
)傳送的事件,生成一個新的事件序列(即組合過後的事件序列),並最終傳送 -
原理
具體請看下圖
- 特別注意:
- 事件組合方式 = 嚴格按照原先事件序列 進行對位合併
- 最終合併的事件數量 = 多個被觀察者(
Observable
)中數量最少的數量
即如下圖
- 具體使用
<-- 建立第1個被觀察者 -->
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "被觀察者1傳送了事件1");
emitter.onNext(1);
// 為了方便展示效果,所以在傳送事件後加入2s的延遲
Thread.sleep(1000);
Log.d(TAG, "被觀察者1傳送了事件2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "被觀察者1傳送了事件3");
emitter.onNext(3);
Thread.sleep(1000);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()); // 設定被觀察者1在工作執行緒1中工作
<-- 建立第2個被觀察者 -->
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "被觀察者2傳送了事件A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "被觀察者2傳送了事件B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "被觀察者2傳送了事件C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "被觀察者2傳送了事件D");
emitter.onNext("D");
Thread.sleep(1000);
emitter.onComplete();
}
}).subscribeOn(Schedulers.newThread());// 設定被觀察者2在工作執行緒2中工作
// 假設不作執行緒控制,則該兩個被觀察者會在同一個執行緒中工作,即傳送事件存在先後順序,而不是同時傳送
<-- 使用zip變換操作符進行事件合併 -->
// 注:建立BiFunction物件傳入的第3個引數 = 合併後資料的資料型別
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String string) throws Exception {
return integer + string;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "最終接收到的事件 = " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
- 測試結果
D事件有時沒出現,都報錯
2018-11-03 16:29:47.988 3767-3786/com.example.administrator.hello E/AndroidRuntime: FATAL EXCEPTION: RxNewThreadScheduler-1
Process: com.example.administrator.hello, PID: 3767
java.lang.InterruptedException
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:371)
at java.lang.Thread.sleep(Thread.java:313)
at com.example.administrator.hello.MainActivity$2.subscribe(MainActivity.java:797)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10179)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
at io.reactivex.Scheduler$1.run(Scheduler.java:134)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
把休眠時間延長也不行,執行緒休眠的問題,好像導致堵塞,去掉這2個休眠就好了
Log.d(TAG, "被觀察者2傳送了事件C");
emitter.onNext("C");
// Thread.sleep(2000);
Log.d(TAG, "被觀察者2傳送了事件D");
emitter.onNext("D");
// Thread.sleep(2000);
特別注意:
- 儘管被觀察者2的事件
D
沒有事件與其合併,但還是會繼續傳送 - 若在被觀察者1 & 被觀察者2的事件序列最後傳送
onComplete()
事件,則被觀察者2的事件D也不會發送,測試結果如下
- 因為
Zip()
操作符較為複雜 & 難理解,此處將用1張圖總結
combineLatest()
- 作用
當兩個Observables
中的任何一個傳送了資料後,將先發送了資料的Observables
的最新(最後)一個數據 與 另外一個Observable
傳送的每個資料結合,最終基於該函式的結果傳送資料
與
Zip()
的區別:Zip()
= 按個數合併,即1對1合併;CombineLatest()
= 按時間合併,即在同一個時間點上合併
- 具體使用
Observable.combineLatest(
Observable.just(1L, 2L, 3L), // 第1個傳送資料事件的Observable
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2個傳送資料事件的Observable:從0開始傳送、共傳送3個數據、第1次事件延遲傳送時間 = 1s、間隔時間 = 1s
new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long o1, Long o2) throws Exception {
// o1 = 第1個Observable傳送的最新(最後)1個數據
// o2 = 第2個Observable傳送的每1個數據
Log.e(TAG, "合併的資料是: "+ o1 + " "+ o2);
return o1 + o2;
// 合併的邏輯 = 相加
// 即第1個Observable傳送的最後1個數據 與 第2個Observable傳送的每1個數據進行相加
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long s) throws Exception {
Log.e(TAG, "合併的結果是: "+s);
}
});
- 測試結果
combineLatestDelayError()
作用類似於concatDelayError()
/ mergeDelayError()
,即錯誤處理,此處不作過多描述
reduce()
- 作用
把被觀察者需要傳送的事件聚合成1個事件 & 傳送
聚合的邏輯根據需求撰寫,但本質都是前2個數據聚合,然後與後1個數據繼續進行聚合,依次類推
- 具體使用
Observable.just(1,2,3,4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
// 在該複寫方法中複寫聚合的邏輯
@Override
public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
Log.e(TAG, "本次計算的資料是: "+s1 +" 乘 "+ s2);
return s1 * s2;
// 本次聚合的邏輯是:全部資料相乘起來
// 原理:第1次取前2個數據相乘,之後每次獲取到的資料 = 返回的資料x原始下1個數據每
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer s) throws Exception {
Log.e(TAG, "最終計算的結果是: "+s);
}
});
- 測試結果
collect()
-
作用
將被觀察者Observable
傳送的資料事件收集到一個數據結構裡 -
具體使用
Observable.just(1, 2, 3 ,4, 5, 6)
.collect(
// 1. 建立資料結構(容器),用於收集被觀察者傳送的資料
new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
// 2. 對傳送的資料進行收集
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer integer)
throws Exception {
// 引數說明:list = 容器,integer = 後者資料
list.add(integer);
// 對傳送的資料進行收集
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(@NonNull ArrayList<Integer> s) throws Exception {
Log.e(TAG, "本次傳送的資料是: "+s);
}
});
- 測試結果
3.3.3.3 傳送事件前追加發送事件
startWith() / startWithArray()
-
作用
在一個被觀察者傳送事件前,追加發送一些資料 / 一個新的被觀察者 -
具體使用
<-- 在一個被觀察者傳送事件前,追加發送一些資料 -->
// 注:追加資料順序 = 後呼叫先追加
Observable.just(4, 5, 6)
.startWith(0) // 追加單個數據 = startWith()
.startWithArray(1, 2, 3) // 追加多個數據 = startWithArray()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
<-- 在一個被觀察者傳送事件前,追加發送被觀察者 & 傳送資料 -->
// 注:追加資料順序 = 後呼叫先追加
Observable.just(4, 5, 6)
.startWith(Observable.just(1, 2, 3))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
3.3.3.4 統計傳送事件數量
count()
-
作用
統計被觀察者傳送事件的數量 -
具體使用
// 注:返回結果 = Long型別
Observable.just(1, 2, 3, 4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "傳送的事件數量 = "+aLong);
}
});
- 測試結果
總結