1. 程式人生 > >Android RxJava2(三)組合操作符

Android RxJava2(三)組合操作符

Rxjava由於其基於事件流的鏈式呼叫、邏輯簡潔 & 使用簡單的特點,深受各大 Android開發者的歡迎。因此在學習過程中全面的瞭解了下RxJava的組合操作符。

merge()

原理圖:
這裡寫圖片描述
方法:

public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)

作用:
將兩個Observable發射的事件序列組合併成一個時間序列,就想是一個Observable發射的一樣,合併後資料是無序的。
程式碼:

        //這裡的ob1和ob2將在下述所有示例中使用,將不再特殊描述
        final String[] str = new String[]{"a","b","c"};
        final int[] ints = new int[]{1,2,3,4,5};
        Observable ob1 = Observable.interval(500, TimeUnit.MILLISECONDS).map(new Function<Long,String>() {

            @Override
            public String apply
(Long aLong) throws Exception { return str[aLong.intValue()]; } }).take(str.length); Observable ob2 = Observable.interval(300,TimeUnit.MILLISECONDS) .map(new Function<Long,Integer>() { @Override public
Integer apply(Long aLong) throws Exception { return ints[aLong.intValue()]; } }).take(ints.length); Observable.merge(ob1,ob2).subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object value) { Log.e("---", String.valueOf(value)); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });

上述程式碼中第一個Observable作用是每500毫秒從字串陣列中取一個元素,第二個Observable的作用是每300毫秒從整型陣列中取一個元素,列印結果為:

06-06 22:48:53.410 17668-17707/ E/—: 1
06-06 22:48:53.610 17668-17706/ E/—: a
06-06 22:48:53.710 17668-17707/ E/—: 2
06-06 22:48:54.010 17668-17707/ E/—: 3
06-06 22:48:54.110 17668-17706/ E/—: b
06-06 22:48:54.310 17668-17707/ E/—: 4
06-06 22:48:54.610 17668-17706/ E/—: c
06-06 22:48:54.610 17668-17707/ E/—: 5

mergeArray()

原理圖:
這裡寫圖片描述
方法:

public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources) 

作用:
作用和merge類似,只不過是組合多個Observable

concat()

原理圖:
這裡寫圖片描述
方法:

public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
...
 public static <T> Observable<T> concat(
            ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
            ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)

作用:
功能和merge類似,也是用於將多個Observable合併,最多支援4個,但是concat是有序的,也就是說前一個Observable沒發射完是不會發射後一個Observable的資料的。
程式碼:
將上述程式碼裡面的merge直接換成concat

       Observable.concat(ob1,ob2).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

程式碼執行結果如下:

06-06 23:01:16.189 27785-27822/ E/---: a
06-06 23:01:16.689 27785-27822/ E/---: b
06-06 23:01:17.189 27785-27822/ E/---: c
06-06 23:01:17.490 27785-27872/ E/---: 1
06-06 23:01:17.790 27785-27872/ E/---: 2
06-06 23:01:18.090 27785-27872/ E/---: 3
06-06 23:01:18.390 27785-27872/ E/---: 4
06-06 23:01:18.690 27785-27872/ E/---: 5

concatArray()

方法:

 public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)

作用:
作用同concat類似,將多個Observable進行資料合併

mergeArrayDelayError() & concatArrayDelayError()

方法:

public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)

作用:
在mergeArray()和concatArray()兩個方法中,如果其中一個Observable傳送了一個Error事件,那麼就會停止傳送事件,如果想onError()事件延遲到所有Observable都發送完事件後再執行,就可以使用mergeArrayDelayError()和concatArrayDelayError()
程式碼:
下面通過程式碼測試下如果中途傳送onError,Observable是否會中斷髮送

       Observable.mergeArray(ob1,Observable.create(new ObservableOnSubscribe() {
           @Override
           public void subscribe(ObservableEmitter e) throws Exception {
               e.onNext(1);
               e.onError(new NumberFormatException());
           }
       })).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","--onError");
            }

            @Override
            public void onComplete() {
            }
        });

上述執行結果為如下:

06-06 23:18:58.930 8575-8575/ E/---: 1
06-06 23:18:58.930 8575-8575/ E/---: --onError

可以發現使用mergeArray()時如果中途傳送onError()會中斷資料的傳送,下面將mergeArray改成mergeArrayDelayError

       Observable.mergeArrayDelayError(ob1,Observable.create(new ObservableOnSubscribe() {
           @Override
           public void subscribe(ObservableEmitter e) throws Exception {
               e.onNext(1);
               e.onError(new NumberFormatException());
           }
       })).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
            }
        });

更改之後的執行結果如下:

06-06 23:14:18.191 5668-5668/ E/---: 1
06-06 23:14:18.691 5668-5700/ E/---: a
06-06 23:14:19.191 5668-5700/ E/---: b
06-06 23:14:19.691 5668-5700/ E/---: c
06-06 23:14:19.691 5668-5700/ E/---: --onError

startWith() & startWithArray()

原理圖:
這裡寫圖片描述
方法:

public final Observable<T> startWith(ObservableSource<? extends T> other)
public final Observable<T> startWithArray(T... items)

作用:
用於在源Observable發射的資料前插入另一個Observable發射的資料
程式碼:

       ob1.startWith(ob2).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

程式碼執行結果如下:

06-06 23:07:16.014 32379-32405/ E/---: 1
06-06 23:07:16.314 32379-32405/ E/---: 2
06-06 23:07:16.614 32379-32405/ E/---: 3
06-06 23:07:16.913 32379-32405/ E/---: 4
06-06 23:07:17.214 32379-32405/ E/---: 5
06-06 23:07:17.716 32379-32444/ E/---: a
06-06 23:07:18.215 32379-32444/ E/---: b
06-06 23:07:18.714 32379-32444/ E/---: c

zip()

原理圖:
這裡寫圖片描述
方法:

public static <T1, T2, R> Observable<R> zip(
            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
            BiFunction<? super T1, ? super T2, ? extends R> zipper)

作用:
用來合併兩個Observable發射的事件,根據BiFunction函式生成一個新的值發射出去。當其中一個Observable傳送資料結束或者出現異常後,另一個Observable也將停止傳送資料。也就是說正常的情況下資料長度會與兩個Observable中最少事件的數量一樣。
程式碼:
簡單的將兩個Observable的資料進行拼接

       Observable.zip(ob1, ob2, new BiFunction<String,Integer,String>() {

           @Override
           public String apply(String s, Integer integer) throws Exception {
               return s+integer;
           }
       }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","--onError");
            }

            @Override
            public void onComplete() {
            }
        });

執行結果如下:

06-06 23:29:56.547 14888-14926/ E/---: a1
06-06 23:29:57.049 14888-14926/ E/---: b2
06-06 23:29:57.547 14888-14926/ E/---: c3

combineLatest() & combineLatestDelayError()

原理圖:
這裡寫圖片描述
可能上面這張圖不是太好理解,可以看下面這張圖
這裡寫圖片描述
方法:

public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)

作用:
用於將兩個Observable最近發射的資料經BiFunction函式的規則進行組合,combineLatest()傳送事件的序列是與傳送的時間線有關的。拿上圖解釋當傳送A之後會從上一個Observ拿最近傳送的1進行組合生成‘1A’,當傳送2時拿第二個Observable最近傳送的資料B組合成‘2B’,接下來到事件C時還是取第一個Observable最近傳送的時間2進行組合成‘2C’,以此類推。
程式碼:

       Observable.combineLatest(ob1, ob2, new BiFunction<String,Integer,String>() {

           @Override
           public String apply(String s, Integer integer) throws Exception {
               return s + integer;
           }
       }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","--onError");
            }

            @Override
            public void onComplete() {
            }
        });

執行結果如下:

06-06 23:39:58.087 22418-22447/ E/---: a1
06-06 23:39:58.190 22418-22448/ E/---: a2
06-06 23:39:58.488 22418-22448/ E/---: a3
06-06 23:39:58.589 22418-22447/ E/---: b3
06-06 23:39:58.788 22418-22448/ E/---: b4
06-06 23:39:59.088 22418-22447/ E/---: c4
06-06 23:39:59.088 22418-22448/ E/---: c5

reduce()

方法:

public final Maybe<T> reduce(BiFunction<T, T, T> reducer)

作用:
與scan()操作符類似,作用是將資料以一定的邏輯聚合起來,這兩個的區別在於scan()沒處理一次資料將會發送一個事件給觀察者,但是reduce()會將所有資料聚合在一起之後才會傳送給觀察者,還有一點區別就是scan的返回值是Observable,而reduce的返回值是Maybe
程式碼:

        Observable.just(1,2,3,4,5).reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("---",integer+"");
            }
        });

上述程式碼的作用就是對陣列資料進行相加處理,最終輸出資料為15

count()

方法:

public final Single<Long> count() 

作用:
統計要傳送事件的總數
程式碼:

        Observable.just(1,1,2,2).count().subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e("---",aLong+"");
            }
        });

執行結果為:

E/---: 4

collect()

方法:

public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)

作用:
收集資料到一個可變的資料結構中
程式碼:

        Observable.just("1","2","3","2")
                .collect(new Callable<List<Integer>>() { //建立資料結構
                    @Override
                    public List<Integer> call() throws Exception {
                        return new ArrayList<Integer>();
                    }
                }, new BiConsumer<List<Integer>, String>() {//收集器
                    @Override
                    public void accept(List<Integer> integers, String s) throws Exception {
                        integers.add(Integer.valueOf(s));
                    }
                }).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.e("---",integers+"");
            }
        });

列印結果為:

E/---: [1, 2, 3, 2]