1. 程式人生 > >3章 RxJava操作符

3章 RxJava操作符

CSDN學院課程地址

3. RxJava操作符

RxJava操作符也是其精髓之一,可以通過一個簡單的操作符,實現複雜的業務邏輯,甚至還可以將操作符組合起來(即RxJava的組合過程),完成更為複雜的業務需求。比如我們前面用到的.create()

.subscribeOn().observeOn().subscribe()都是RxJava的操作符之一,下面我們將對RxJava的操作符進行分析

掌握RxJava操作符前,首先要學會看得懂RxJava的圖片,圖片是RxJava主導的精髓,下面我們通過例子說明

這張圖片我們先要分清楚概念上的東西,上下兩行橫向的直線區域代表著事件流,上面一行(上游)是我們的被觀察者Observable,下面一行(下游)是我們的觀察者Observer,事件流就是從上游的被觀察者傳送給下游的觀察者的。而中間一行的flatMap區域則是我們的操作符部分,它可以對我們的資料進行變換操作。最後,資料流則是圖片上的圓形、方形、菱形等區域,也是從上游流向下游的,不同的形狀代表著不同的資料型別

這張圖片並不是表示沒有被觀察者Observable,而是Create方法本身就是建立了被觀察者,所以可以將被觀察者的上游省略。在進行事件的onNext()分發後,執行onComplete()事件,這樣就表示事件流已經結束,後續如果上游繼續發事件,則下游表示不接收。當事件流的onCompleted()或者onError()正好被呼叫過一次後,此後就不能再呼叫觀察者的任何其它回撥方法

在理解RxJava操作符之前,需要將這幾個概念弄明白,整個操作符的章節都是圍繞這幾個概念進行的

  • 事件流:通過發射器發射的事件,從發射事件到結束事件的過程,這一過程稱為事件流
  • 資料流:通過發射器發射的資料,從資料輸入到資料輸出的過程,這一過程稱為資料流
  • 被觀察者:事件流的上游,即Observable,事件流開始的地方和資料流發射的地方
  • 觀察者:事件流的下游,即Observer,事件流結束的地方和資料流接收的地方

3.1 Creating Observables (建立操作符)

1、create

Observable最原始的建立方式,創建出一個最簡單的事件流,可以使用發射器發射特定的資料型別

public static void main(String[] args) {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    for (int i = 1; i < 5; i++) {
                        e.onNext(i);
                    }
                    e.onComplete();
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {

                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onComplete

2、from

建立一個事件流併發出特定型別的資料流,其發射的資料流型別有如下幾個操作符

public static void main(String[] args) {
    Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

3、just

just操作符和from操作符很像,只是方法的引數有所差別,它可以接受多個引數

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

4、defer

defer與just的區別是,just是直接將發射當前的資料流,而defer會等到訂閱的時候,才會去執行它的call()回撥,再去發射當前的資料流。複雜點的理解就是:defer操作符是將一組資料流在原有的事件流基礎上快取一個新的事件流,直到有人訂閱的時候,才會建立它快取的事件流

public static void main(String[] args) {

    i = 10;

    Observable<Integer> just = Observable.just(i, i);
    Observable<Object> defer = Observable.defer(new Callable<ObservableSource<?>>() {
        @Override
        public ObservableSource<?> call() throws Exception {
            //快取新的事件流
            return Observable.just(i, i);
        }
    });

    i = 15;

    just.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("onNext=" + integer);
        }
    });

    defer.subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            System.out.println("onNext=" + (int) o);
        }
    });

    i = 20;

    defer.subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            System.out.println("onNext=" + (int) o);
        }
    });
}

輸出

onNext=10
onNext=10
onNext=15
onNext=15
onNext=20
onNext=20

5、interval

interval操作符是按固定的時間間隔發射一個無限遞增的整數資料流,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行,interval預設在computation排程器上執行

public void interval() {
    Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
......

6、range

range操作符發射一個範圍內的有序整數資料流,你可以指定範圍的起始和長度

public static void main(String[] args) {
    Observable.range(1, 5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

7、repeat

repeat操作符可以重複傳送指定次數的某個事件流,repeat操作符預設在trampoline排程器上執行

public static void main(String[] args) {
    Observable.just(1).repeat(5)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=1
onNext=1
onNext=1
onNext=1
onNext=1

8、timer

timer操作符可以建立一個延時的事件流,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行,預設在computation排程器上執行

public void timer() {
    Observable.timer(5, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}

輸出

onNext=0

9、小結

  1. create():建立最簡單的事件流
  2. from():建立事件流,可傳送不同型別的資料流
  3. just():建立事件流,可傳送多個引數的資料流
  4. defer():建立事件流,可快取可啟用事件流
  5. interval():建立延時重複的事件流
  6. range():建立事件流,可傳送範圍內的資料流
  7. repeat():建立可重複次數的事件流
  8. timer():建立一次延時的事件流

補充:interval()、timer()、delay()的區別

  1. interval():用於建立事件流,週期性重複傳送
  2. timer():用於建立事件流,延時傳送一次
  3. delay():用於事件流中,可以延時某次事件流的傳送

3.2 Transforming Observables (轉換操作符)

1、map

map操作符可以將資料流進行型別轉換

public static void main(String[] args) {
    Observable.just(1).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "傳送過來的資料會被變成字串" + integer;
        }
    })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("onNext=" + s);
                }
            });
}

輸出

onNext=傳送過來的資料會被變成字串1

2、flatMap

flatMap操作符將資料流進行型別轉換,然後將新的資料流傳遞給新的事件流進行分發,這裡通過模擬請求登入的延時操作進行說明,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行

public void flatMap() {
    Observable.just(new UserParams("hensen", "123456")).flatMap(new Function<UserParams, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(UserParams userParams) throws Exception {
            return Observable.just(userParams.username + "登入成功").delay(2, TimeUnit.SECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });
}

public static class UserParams {

    public UserParams(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String username;
    public String password;
}

輸出

hensen登入成功

補充:

  • concatMap與flatMap功能一樣,唯一的區別就是concatMap是有序的,flatMap是亂序的

3、groupBy

groupBy操作符可以將發射出來的資料項進行分組,並將分組後的資料項儲存在具有key-value對映的事件流中。groupBy具體的分組規則由groupBy操作符傳遞進來的函式引數Function所決定的,它可以將key和value按照Function的返回值進行分組,返回一個具有分組規則的事件流GroupedObservable,注意這裡分組出來的事件流是按照原始事件流的順序輸出的,我們可以通過sorted()對資料項進行排序,然後輸出有序的資料流。

public static void main(String[] args) {
    Observable.just("java", "c++", "c", "c#", "javaScript", "Android")
            .groupBy(new Function<String, Character>() {
                @Override
                public Character apply(String s) throws Exception {
                    return s.charAt(0);//按首字母分組
                }
            })
            .subscribe(new Consumer<GroupedObservable<Character, String>>() {
                @Override
                public void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {
                    //排序後,直接訂閱輸出key和value
                    characterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println("onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);
                        }
                    });
                }
            });
}

輸出

onNext= key:A value:Android
onNext= key:c value:c
onNext= key:c value:c#
onNext= key:c value:c++
onNext= key:j value:java
onNext= key:j value:javaScript

4、scan

scan操作符會對發射的資料和上一輪發射的資料進行函式處理,並返回的資料供下一輪使用,持續這個過程來產生剩餘的資料流。其應用場景有簡單的累加計算,判斷所有資料的最小值等

public static void main(String[] args) {
    Observable.just(8, 2, 13, 1, 15).scan(new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            return integer < integer2 ? integer : integer2;
        }
    })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer item) throws Exception {
                    System.out.println("onNext=" + item);
                }
            });
}

輸出

onNext=8
onNext=2
onNext=2
onNext=1
onNext=1

5、buffer

buffer操作符可以將發射出來的資料流,在給定的快取池中進行快取,當快取池中的資料項溢滿時,則將快取池的資料項進行輸出,重複上述過程,直到將發射出來的資料全部發射出去。如果發射出來的資料不夠快取池的大小,則按照當前發射出來的數量進行輸出。如果對buffer操作符設定了skip引數,則buffer每次快取池溢滿時,會跳過指定的skip資料項,然後再進行快取和輸出。

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
        .buffer(5).subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        System.out.println("onNext=" + integers.toString());
    }
});

輸出

onNext=[1, 2, 3, 4, 5]
onNext=[6, 7, 8, 9]

6、window

window操作符和buffer操作符在功能上實現的效果是一樣的,但window操作符最大區別在於同樣是快取一定數量的資料項,window操作符最終發射出來的是新的事件流integerObservable,而buffer操作符發射出來的是新的資料流,也就是說,window操作符發射出來新的事件流中的資料項,還可以經過Rxjava其他操作符進行處理。

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .window(2, 1).subscribe(new Consumer<Observable<Integer>>() {
        @Override
        public void accept(Observable<Integer> integerObservable) throws Exception {
            integerObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
        }
    });
}

輸出

onNext=1
onNext=2
onNext=2
onNext=3
onNext=3
onNext=4
onNext=4
onNext=5
onNext=5
onNext=6
onNext=6
onNext=7
onNext=7
onNext=8
onNext=8
onNext=9
onNext=9

7、小結

  1. map():對資料流的型別進行轉換
  2. flatMap():對資料流的型別進行包裝成另一個數據流
  3. groupby():對所有的資料流進行分組
  4. scan():對上一輪處理過後的資料流進行函式處理
  5. buffer():快取發射的資料流到一定數量,隨後發射出資料流集合
  6. window():快取發射的資料流到一定數量,隨後發射出新的事件流

3.3 Filtering Observables (過濾操作符)

1、debounce

debounce操作符會去過濾掉髮射速率過快的資料項,下面的例子onNext事件可以想象成按鈕的點選事件,如果在2秒種內頻繁的點選,則其點選事件會被忽略,當i為3的除數的時候,發射的事件的時間會超過規定忽略事件的時間,那麼則允許觸發點選事件。這就有點像我們頻繁點選按鈕,但始終只會觸發一次點選事件,這樣就不會導致重複去響應點選事件

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 100; i++) {
                if (i % 3 == 0) {
                    Thread.sleep(3000);
                } else {
                    Thread.sleep(1000);
                }
                emitter.onNext(i);
            }
        }
    }).debounce(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=2
onNext=5
onNext=8
onNext=11
onNext=14
......

2、distinct

distinct操作符會過濾重複傳送的資料項

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3).distinct()
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4

3、elementAt

elementAt操作符只取指定的角標的事件

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3).elementAt(0)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=1

4、filter

filter操作符可以過濾指定函式的資料項

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer > 2;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=3
onNext=4
onNext=3

5、first

first操作符只發射第一項資料項

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .first(7)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=1

6、ignoreElements

ignoreElements操作符不發射任何資料,只發射事件流的終止通知

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .ignoreElements()
            .subscribe(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

輸出

onComplete

7、last

last操作符只發射最後一項資料

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 1, 2, 3)
            .last(7)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=3

8、sample

sample操作符會在指定的事件內從資料項中採集所需要的資料,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行

public void sample() {
    Observable.interval(1, TimeUnit.SECONDS)
            .sample(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            });
}

輸出

onNext=2
onNext=4
onNext=6
onNext=8

9、skip

skip操作符可以忽略事件流發射的前N項資料項,只保留之後的資料

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .skip(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}

輸出

onNext=4
onNext=5
onNext=6
onNext=7
onNext=8

10、skipLast

skipLast操作符可以抑制事件流發射的後N項資料

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .skipLast(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

11、take

take操作符可以在事件流中只發射前面的N項資料

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .take(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3

12、takeLast

takeLast操作符事件流只發射資料流的後N項資料項,忽略前面的資料項

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
            .takeLast(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer i) throws Exception {
                    System.out.println("onNext=" + i);
                }
            });
}

輸出

onNext=6
onNext=7
onNext=8

還有一個操作符叫takeLastBuffer,它和takeLast類似,,唯一的不同是它把所有的資料項收集到一個List再發射,而不是依次發射一個

13、小結

  1. debounce():事件流只發射規定範圍時間內的資料項
  2. distinct():事件流只發射不重複的資料項
  3. elementAt():事件流只發射第N個數據項
  4. filter():事件流只發射符合規定函式的資料項
  5. first():事件流只發射第一個資料項
  6. ignoreElements():忽略事件流的發射,只發射事件流的終止事件
  7. last():事件流只發射最後一項資料項
  8. sample():事件流對指定的時間間隔進行資料項的取樣
  9. skip():事件流忽略前N個數據項
  10. skipLast():事件流忽略後N個數據項
  11. take():事件流只發射前N個數據項
  12. takeLast():事件流只發射後N個數據項

3.4 Combining Observables (組合操作符)

1、merge/concat

merge操作符可以合併兩個事件流,如果在merge操作符上增加延時傳送的操作,那麼就會導致其發射的資料項是無序的,會跟著發射的時間點進行合併。雖然是將兩個事件流合併成一個事件流進行發射,但在最終的一個事件流中,發射出來的卻是兩次資料流。由於concat操作符和merge操作符的效果是一樣的,這裡只舉一例

merge和concat的區別

  • merge():合併後發射的資料項是無序的
  • concat():合併後發射的資料項是有序的
public static void main(String[] args) {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");

    Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {
        @Override
        public void accept(Serializable serializable) throws Exception {
            System.out.println("onNext=" + serializable.toString());
        }
    });
}

輸出

onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

2、zip

zip操作符是將兩個資料流進行指定的函式規則合併

public static void main(String[] args) {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");

    Observable.zip(just1, just2, new BiFunction<String, String, String>() {
        @Override
        public String apply(String s, String s2) throws Exception {
            return s + s2;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}

輸出

onNext=A1
onNext=B2
onNext=C3
onNext=D4
onNext=E5

3、startWith

startWith操作符是將另一個數據流合併到原資料流的開頭

public static void main(String[] args) {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");

    just1.startWith(just2).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E

4、join

join操作符是有時間期限的合併操作符,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行

public void join() {
    Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
    Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);

    just1.join(just2, new Function<String, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(String s) throws Exception {
            return Observable.timer(3, TimeUnit.SECONDS);
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long l) throws Exception {
            return Observable.timer(8, TimeUnit.SECONDS);
        }
    }, new BiFunction<String, Long, String>() {
        @Override
        public String apply(String s, Long l) throws Exception {
            return s + l;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}

join操作符有三個函式需要設定

  • 第一個函式:規定just2的過期期限
  • 第二個函式:規定just1的過期期限
  • 第三個函式:規定just1和just2的合併規則

由於just2的期限只有3秒的時間,而just2延時1秒傳送一次,所以just2只發射了2次,其輸出的結果就只能和just2輸出的兩次進行合併,其輸出格式有點類似我們的排列組合

onNext=A0
onNext=B0
onNext=C0
onNext=D0
onNext=E0
onNext=A1
onNext=B1
onNext=C1
onNext=D1
onNext=E1

5、combineLatest

conbineLatest操作符會尋找其他事件流最近發射的資料流進行合併,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行

public static String[] str = {"A", "B", "C", "D", "E"};

public void combineLatest() {
    Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
        @Override
        public String apply(Long aLong) throws Exception {
            return str[(int) (aLong % 5)];
        }
    });
    Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);

    Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {
        @Override
        public String apply(String s, Long l) throws Exception {
            return s + l;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("onNext=" + s);
        }
    });
}

輸出

onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5

6、小結

  1. merge()/concat():無序/有序的合併兩個資料流
  2. zip():兩個資料流的資料項合併成一個數據流一同發出
  3. startWith():將待合併的資料流放在自身前面一同發出
  4. join():將資料流進行排列組合發出,不過資料流都是有時間期限的
  5. combineLatest():合併最近發射出的資料項成資料流一同發出

3.5 Error Handling Operators(錯誤處理操作符)

1、onErrorReturn

onErrorReturn操作符表示當錯誤發生時,它會忽略onError的回撥且會發射一個新的資料項並回調onCompleted()

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if(i == 4){
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    return -1;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

2、onErrorResumeNext

onErrorResumeNext操作符表示當錯誤發生時,它會忽略onError的回撥且會發射一個新的事件流並回調onCompleted()

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if(i == 4){
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                @Override
                public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                    return Observable.just(-1);
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

3、onExceptionResumeNext

onExceptionResumeNext操作符表示當錯誤發生時,如果onError收到的Throwable不是一個Exception,它會回撥onError方法,且不會回撥備用的事件流,如果onError收到的Throwable是一個Exception,它會回撥備用的事件流進行資料的發射

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if(i == 4){
                    e.onError(new Exception("onException crash"));
                    //e.onError(new Error("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .onExceptionResumeNext(new ObservableSource<Integer>() {
                @Override
                public void subscribe(Observer<? super Integer> observer) {
                    //備用事件流
                    observer.onNext(8);
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=8

4、retry

retry操作符表示當錯誤發生時,發射器會重新發射

public static void main(String[] args) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if (i == 4) {
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .retry(1)
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    return -1;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
  • retry():表示重試無限次
  • retry(long times):表示重試指定次數
  • retry(Func predicate):可以根據函式引數中的Throwable型別和重試次數決定本次需不需要重試

5、retryWhen

retryWhen操作符和retry操作符相似,區別在於retryWhen將錯誤Throwable傳遞給了函式進行處理併產生新的事件流進行處理,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行

private static int retryCount = 0;
private static int maxRetries = 2;

public void retryWhen(){
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            for (int i = 1; i < 5; i++) {
                if (i == 4) {
                    e.onError(new Exception("onError crash"));
                }
                e.onNext(i);
            }
        }
    })
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {
                            if (++retryCount <= maxRetries) {
                                // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
                                System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);
                                return Observable.timer(1, TimeUnit.SECONDS);
                            }
                            return Observable.error(throwable);
                        }
                    });
                }
            })
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    return -1;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            }, new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

6、小結

  • onErrorReturn():當錯誤發生時,它會忽略onError的回撥且會發射一個新的資料項並回調onCompleted()
  • onErrorResumeNext():當錯誤發生時,它會忽略onError的回撥且會發射一個新的事件流並回調onCompleted()
  • onExceptionResumeNext():當錯誤發生時,如果onError收到的Throwable不是一個Exception,它會回撥onError方法,且不會回撥備用的事件流,如果onError收到的Throwable是一個Exception,它會回撥備用的事件流進行資料的發射
  • retry():當錯誤發生時,發射器會重新發射
  • retryWhen():當錯誤發生時,根據Tharowble型別決定發射器是否重新發射

3.6 Observable Utility Operators(輔助性操作符)

1、delay

delay操作符可以延時某次事件傳送的資料流,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行

public void deley() {
    Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

delay和delaySubscription的效果是一樣的,只不過delay是對資料流的延時,而delaySubscription是對事件流的延時

2、do

do操作符可以監聽整個事件流的生命週期,do操作符分為多個型別,而且每個型別的作用都不同

  1. doOnNext():接收每次傳送的資料項
  2. doOnEach():接收每次傳送的資料項
  3. doOnSubscribe():當事件流被訂閱時被呼叫
  4. doOnDispose():當事件流被釋放時被呼叫
  5. doOnComplete():當事件流被正常終止時被呼叫
  6. doOnError():當事件流被異常終止時被呼叫
  7. doOnTerminate():當事件流被終止之前被呼叫,無論正常終止還是異常終止都會呼叫
  8. doFinally():當事件流被終止之後被呼叫,無論正常終止還是異常終止都會呼叫
public static void main(String[] args) {
    Observable.just(1, 2, 3)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("doOnNext");
                }
            })
            .doOnEach(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    System.out.println("doOnEach");
                }
            })
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("doOnSubscribe");
                }
            })
            .doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doOnDispose");
                }
            })
            .doOnTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doOnTerminate");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("doOnError");
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doOnComplete");
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("doFinally");
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

doOnSubscribe
doOnNext
doOnEach
onNext=1
doOnNext
doOnEach
onNext=2
doOnNext
doOnEach
onNext=3
doOnEach
doOnTerminate
doOnComplete
doFinally

3、materialize/dematerialize

materialize操作符將發射出的資料項轉換成為一個Notification物件,而dematerialize操作符則是跟materialize操作符相反,這兩個操作符有點類似我們Java物件的裝箱和拆箱功能

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5).materialize()
            .subscribe(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    System.out.println("onNext=" + integerNotification.getValue());
                }
            });
    
    Observable.just(1, 2, 3, 4, 5).materialize().dematerialize()
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object object) throws Exception {
                    System.out.println("onNext=" + object.toString());
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=null
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

輸出的時候,materialize會輸出多個null,是因為null的事件為onCompleted事件,而dematerialize把onCompleted事件給去掉了,這個原因也可以從圖片中看出來

4、serialize

serialize操作符可以將非同步執行的事件流進行同步操作,直到事件流結束

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5).serialize()
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("onNext=" + integer);
                }
            });
}

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

5、timeInterval

timeInterval操作符可以將發射的資料項轉換為帶有時間間隔的資料項,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行

public void timeInterval(){
    Observable.interval(2, TimeUnit.SECONDS).timeInterval(TimeUnit.SECONDS)
            .subscribe(new Consumer<Timed<Long>>() {
                @Override
                public void accept(Timed<Long> longTimed) throws Exception {
                    System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());
                }
            });
}

輸出

onNext=0 timeInterval=2
onNext=1 timeInterval=2
onNext=2 timeInterval=2
onNext=3 timeInterval=2
onNext=4 timeInterval=2

6、timeout

timeout操作符表示當發射的資料項超過了規定的限制時間,則發射onError事件,這裡直接讓程式超過規定的限制時間,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行

public void timeOut(){
    Observable.interval(2, TimeUnit.SECONDS).timeout(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("onNext=" + aLong);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("onError");
                }
            });
}

輸出

onError

7、timestamp

timestamp操作符會給每個發射的資料項帶上時間戳,由於這段程式碼的的延時操作都是非阻塞型的,所以在Java上執行會導致JVM的立馬停止,只能把這段程式碼放在Android來執行

public void timeStamp() {
    Observable.interval(2, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS)
            .subscribe(new Consumer<Timed<Long>>() {
                @Override
                public void accept(Timed<Long> longTimed) throws Exception {
                    System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());

                }
            });
}

輸出

onNext=0 timeInterval=1525755132132
onNext=1 timeInterval=1525755134168
onNext=2 timeInterval=1525755136132
onNext=3 timeInterval=1525755138132

8、using

using操作符可以讓你的事件流存在一次性的資料項,即用完就將資源釋放掉

using操作符接受三個引數:

  • 一個使用者建立一次性資源的工廠函式
  • 一個用於建立一次性事件的工廠函式
  • 一個用於釋放資源的函式
public static class UserBean {
    String name;
    int age;

    public UserBean(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

public static void main(String[] args) {
    Observable.using(new Callable<UserBean>() {
        @Override
        public UserBean call() throws Exception {
            //從網路中獲取某個物件
            return new UserBean("俊俊俊", 22);
        }
    }, new Function<UserBean, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(UserBean userBean) throws Exception {
            //拿出你想要的資源
            return Observable.just(userBean.name);
        }
    }, new Consumer<UserBean>() {
        @Override
        public void accept(UserBean userBean) throws Exception {
            //釋放物件
            userBean = null;
        }
    }).subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            System.out.println("onNext=" + o.toString());
        }
    });
}

輸出

onNext=俊俊俊

9、to

to操作符可以將資料流中的資料項進行集合的轉換,to操作符分為多個型別,而且每個型別的作用都不同

  1. toList():轉換成List型別的集合
  2. toMap():轉換成Map型別的集合
  3. toMultimap():轉換成一對多(即<A型別,List<B型別>>)的Map型別的集合
  4. toSortedList():轉換成具有排序的List型別的集合
public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5).toList()
            .subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Exception {
                    System.out.println("onNext=" + integers.toString());
                }
            });
}

輸出

onNext=[1, 2, 3, 4, 5]

10、小結

  1. delay():延遲事件發射的資料項
  2. do():監聽事件流的生命週期
  3. materialize()/dematerialize():對事件流進行裝箱/拆箱
  4. serialize():同步事件流的發射
  5. timeInterval():對事件流增加時間間隔
  6. timeout():對事件流增加限定時間
  7. timestamp():對事件流增加時間戳
  8. using():對事件流增加一次性的資源
  9. to():對資料流中的資料項進行集合的轉換

3.7 Conditional and Boolean Operators(條件和布林操作符)

1、all

all操作符表示對所有資料項進行校驗,如果所有都通過則返回true,否則返回false

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5)
            .all(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer > 0;
                }
            })
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    System.out.println("onNext=" + aBoolean);
                }
            });
}

輸出

onNext=true

2、contains

contains操作符表示事件流中發射的資料項當中是否包含有指定的資料項

public static void main(String[] args) {
    Observable.just(1, 2, 3, 4, 5)
            .contains(2)
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    System.out.println("onNext=" + aBoolean);
                }
            });
}

輸出

onNext=true

3、amb

amb操作符在多個事件流中只發射最