Android RxJava2(三)組合操作符
Rxjava由於其基於事件流的鏈式呼叫、邏輯簡潔 & 使用簡單的特點,深受各大 Android開發者的歡迎。因此在學習過程中全面的瞭解了下RxJava的組合操作符。
merge()
原理圖:
方法:
public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
作用:
將兩個Observable發射的事件序列組合併成一個時間序列,就想是一個Observable發射的一樣,合併後資料是無序的。
程式碼:
//這裡的ob1和ob2將在下述所有示例中使用,將不再特殊描述
final String[] str = new String[]{"a","b","c"};
final int[] ints = new int[]{1,2,3,4,5};
Observable ob1 = Observable.interval(500, TimeUnit.MILLISECONDS).map(new Function<Long,String>() {
@Override
public String apply (Long aLong) throws Exception {
return str[aLong.intValue()];
}
}).take(str.length);
Observable ob2 = Observable.interval(300,TimeUnit.MILLISECONDS)
.map(new Function<Long,Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return ints[aLong.intValue()];
}
}).take(ints.length);
Observable.merge(ob1,ob2).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上述程式碼中第一個Observable作用是每500毫秒從字串陣列中取一個元素,第二個Observable的作用是每300毫秒從整型陣列中取一個元素,列印結果為:
06-06 22:48:53.410 17668-17707/ E/—: 1
06-06 22:48:53.610 17668-17706/ E/—: a
06-06 22:48:53.710 17668-17707/ E/—: 2
06-06 22:48:54.010 17668-17707/ E/—: 3
06-06 22:48:54.110 17668-17706/ E/—: b
06-06 22:48:54.310 17668-17707/ E/—: 4
06-06 22:48:54.610 17668-17706/ E/—: c
06-06 22:48:54.610 17668-17707/ E/—: 5
mergeArray()
原理圖:
方法:
public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources)
作用:
作用和merge類似,只不過是組合多個Observable
concat()
原理圖:
方法:
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
...
public static <T> Observable<T> concat(
ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
作用:
功能和merge類似,也是用於將多個Observable合併,最多支援4個,但是concat是有序的,也就是說前一個Observable沒發射完是不會發射後一個Observable的資料的。
程式碼:
將上述程式碼裡面的merge直接換成concat
Observable.concat(ob1,ob2).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
程式碼執行結果如下:
06-06 23:01:16.189 27785-27822/ E/---: a
06-06 23:01:16.689 27785-27822/ E/---: b
06-06 23:01:17.189 27785-27822/ E/---: c
06-06 23:01:17.490 27785-27872/ E/---: 1
06-06 23:01:17.790 27785-27872/ E/---: 2
06-06 23:01:18.090 27785-27872/ E/---: 3
06-06 23:01:18.390 27785-27872/ E/---: 4
06-06 23:01:18.690 27785-27872/ E/---: 5
concatArray()
方法:
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
作用:
作用同concat類似,將多個Observable進行資料合併
mergeArrayDelayError() & concatArrayDelayError()
方法:
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
作用:
在mergeArray()和concatArray()兩個方法中,如果其中一個Observable傳送了一個Error事件,那麼就會停止傳送事件,如果想onError()事件延遲到所有Observable都發送完事件後再執行,就可以使用mergeArrayDelayError()和concatArrayDelayError()
程式碼:
下面通過程式碼測試下如果中途傳送onError,Observable是否會中斷髮送
Observable.mergeArray(ob1,Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
e.onError(new NumberFormatException());
}
})).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
Log.e("---","--onError");
}
@Override
public void onComplete() {
}
});
上述執行結果為如下:
06-06 23:18:58.930 8575-8575/ E/---: 1
06-06 23:18:58.930 8575-8575/ E/---: --onError
可以發現使用mergeArray()時如果中途傳送onError()會中斷資料的傳送,下面將mergeArray改成mergeArrayDelayError
Observable.mergeArrayDelayError(ob1,Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
e.onError(new NumberFormatException());
}
})).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
}
});
更改之後的執行結果如下:
06-06 23:14:18.191 5668-5668/ E/---: 1
06-06 23:14:18.691 5668-5700/ E/---: a
06-06 23:14:19.191 5668-5700/ E/---: b
06-06 23:14:19.691 5668-5700/ E/---: c
06-06 23:14:19.691 5668-5700/ E/---: --onError
startWith() & startWithArray()
原理圖:
方法:
public final Observable<T> startWith(ObservableSource<? extends T> other)
public final Observable<T> startWithArray(T... items)
作用:
用於在源Observable發射的資料前插入另一個Observable發射的資料
程式碼:
ob1.startWith(ob2).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
程式碼執行結果如下:
06-06 23:07:16.014 32379-32405/ E/---: 1
06-06 23:07:16.314 32379-32405/ E/---: 2
06-06 23:07:16.614 32379-32405/ E/---: 3
06-06 23:07:16.913 32379-32405/ E/---: 4
06-06 23:07:17.214 32379-32405/ E/---: 5
06-06 23:07:17.716 32379-32444/ E/---: a
06-06 23:07:18.215 32379-32444/ E/---: b
06-06 23:07:18.714 32379-32444/ E/---: c
zip()
原理圖:
方法:
public static <T1, T2, R> Observable<R> zip(
ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> zipper)
作用:
用來合併兩個Observable發射的事件,根據BiFunction函式生成一個新的值發射出去。當其中一個Observable傳送資料結束或者出現異常後,另一個Observable也將停止傳送資料。也就是說正常的情況下資料長度會與兩個Observable中最少事件的數量一樣。
程式碼:
簡單的將兩個Observable的資料進行拼接
Observable.zip(ob1, ob2, new BiFunction<String,Integer,String>() {
@Override
public String apply(String s, Integer integer) throws Exception {
return s+integer;
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
Log.e("---", String.valueOf(value));
}
@Override
public void onError(Throwable e) {
Log.e("---","--onError");
}
@Override
public void onComplete() {
}
});
執行結果如下:
06-06 23:29:56.547 14888-14926/ E/---: a1
06-06 23:29:57.049 14888-14926/ E/---: b2
06-06 23:29:57.547 14888-14926/ E/---: c3
combineLatest() & combineLatestDelayError()
原理圖:
可能上面這張圖不是太好理解,可以看下面這張圖
方法:
public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
作用:
用於將兩個Observable最近發射的資料經BiFunction函式的規則進行組合,combineLatest()傳送事件的序列是與傳送的時間線有關的。拿上圖解釋當傳送A之後會從上一個Observ拿最近傳送的1進行組合生成‘1A’,當傳送2時拿第二個Observable最近傳送的資料B組合成‘2B’,接下來到事件C時還是取第一個Observable最近傳送的時間2進行組合成‘2C’,以此類推。
程式碼:
Observable.combineLatest(ob1, ob2, new BiFunction<String,Integer,String>() {
@Override
public String apply(String s, Integer integer) throws Exception {
return s + integer;
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
}
@Override
public void onError(Throwable e) {
Log.e("---","--onError");
}
@Override
public void onComplete() {
}
});
執行結果如下:
06-06 23:39:58.087 22418-22447/ E/---: a1
06-06 23:39:58.190 22418-22448/ E/---: a2
06-06 23:39:58.488 22418-22448/ E/---: a3
06-06 23:39:58.589 22418-22447/ E/---: b3
06-06 23:39:58.788 22418-22448/ E/---: b4
06-06 23:39:59.088 22418-22447/ E/---: c4
06-06 23:39:59.088 22418-22448/ E/---: c5
reduce()
方法:
public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
作用:
與scan()操作符類似,作用是將資料以一定的邏輯聚合起來,這兩個的區別在於scan()沒處理一次資料將會發送一個事件給觀察者,但是reduce()會將所有資料聚合在一起之後才會傳送給觀察者,還有一點區別就是scan的返回值是Observable,而reduce的返回值是Maybe
程式碼:
Observable.just(1,2,3,4,5).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("---",integer+"");
}
});
上述程式碼的作用就是對陣列資料進行相加處理,最終輸出資料為15
count()
方法:
public final Single<Long> count()
作用:
統計要傳送事件的總數
程式碼:
Observable.just(1,1,2,2).count().subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("---",aLong+"");
}
});
執行結果為:
E/---: 4
collect()
方法:
public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
作用:
收集資料到一個可變的資料結構中
程式碼:
Observable.just("1","2","3","2")
.collect(new Callable<List<Integer>>() { //建立資料結構
@Override
public List<Integer> call() throws Exception {
return new ArrayList<Integer>();
}
}, new BiConsumer<List<Integer>, String>() {//收集器
@Override
public void accept(List<Integer> integers, String s) throws Exception {
integers.add(Integer.valueOf(s));
}
}).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.e("---",integers+"");
}
});
列印結果為:
E/---: [1, 2, 3, 2]