RxJava2.x 學習教程(三)常用操作符
錯誤處理類
Retry
當原始Observable在遇到錯誤時進行重試,目的是希望本次訂閱不以失敗事件結束!
Observable.just(1, "2") .cast(Integer.class)//將被觀察者傳送的事件資料型別強轉為Integer .retry(3)//如果出錯 重試三次, 如果不寫引數,表示一直重複嘗試 .subscribe(integer -> Log.e("TAG", "" + integer), throwable -> Log.e("TAG", throwable.getMessage())); //執行結果:1 1 1 1 java.lang.String cannot be cast to java.lang.Integer
Catch類
捕獲錯誤,並進行處理!區別於retry的重試!
//onErrorReturn 當原先的被觀察者遇到錯誤時,再去發射一個事件 Observable.just(1,"2") .cast(Integer.class) .onErrorReturn(throwable -> { Log.e("TAG", throwable.getMessage()); return 0; }).subscribe(integer -> Log.e("TAG", "" + integer)); //結果:1 // java.lang.String cannot be cast to java.lang.Integer // 0
//onErrorReturnItem底層呼叫就是onErrorReturn
Observable.just(1,"2")
.cast(Integer.class)
.onErrorReturnItem(0)
.subscribe(integer -> Log.e("TAG", "" + integer));
//結果:1 0
//onErrorResumeNext: 當原始Observable在遇到錯誤時,使用其他Observable的資料序列 Observable.just(1, "2") .cast(Integer.class) .onErrorResumeNext(Observable.just(3,4)) .subscribe(integer -> Log.e("TAG", "" + integer)); //結果:1 3 4
實用工具類( 輔助類)
Delay
延遲發射
Observable.fromArray(1,2,3)
.delay(3, TimeUnit.SECONDS)//3s以後再去發射事件 整體延遲 區別於interval對每個事件的延遲效果
.subscribeOn(Schedulers.io())//io執行緒發射事件
.observeOn(AndroidSchedulers.mainThread())//主執行緒獲取通知
.subscribe(integer -> Log.e("TAG", "" + integer));
// 1 2 3
Do系列
主要是通過註冊回撥的方式,監聽被觀察者的生命週期(鏈式操作的各個過程)。
doAfterTerminate doOnComplete doOnDispose doOnEach doOnError doOnLifecycle
doOnNext doOnSubscribe doOnTerminate onTerminateDetach
doOnNext 監聽emitter.onNext()事件, 可以做一些想做的事情,比如資料儲存,不會原來改變發射的資料,發生在emitter.onNext()之前
Observable.just(1,2,3)
.doOnNext(integer -> Log.e("TAG",++integer + "監聽emitter.onNext() 不會原來改變發射的資料"))
.subscribe(integer -> Log.e("TAG", "" + integer));
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 2監聽emitter.onNext() 不會原來改變發射的資料
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 1
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 3監聽emitter.onNext() 不會原來改變發射的資料
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 2
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 4監聽emitter.onNext() 不會原來改變發射的資料
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 3
doAfterNext 發生在onNext()之後,因為它在訂閱過程中是共享的,所以應當注意執行緒安全的問題!
Observable.just(1,2,3)
.doAfterNext(integer -> Log.e("TAG", "監聽: " + ++integer))
.subscribe(integer -> Log.e("TAG", "" + integer));
doOnComplete:監聽OnComplete
Observable.just(1,2,3)
.doOnComplete(() -> Log.e("TAG", "onComplete over"))
.subscribe(integer -> Log.e("TAG", "" + integer));
//1 2 3 onComplete over
還有一些,用法類似
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnSubscribe:
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnLifecycle: false
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnNext: 1
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnEach: onNext
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: 收到訊息: 1
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doAfterNext: 1
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnNext: 2
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnEach: onNext
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: 收到訊息: 2
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doAfterNext: 2
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnComplete:
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnEach: onComplete
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doFinally:
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doAfterTerminate:
materialize/dematerialize
materialize將Observable轉換成一個通知列表,dematerialize作用正好相反
Observable.just(1,2)
.materialize()
.subscribe(notification -> Log.e("TAG", (notification.isOnNext()?"onNext":notification.isOnComplete()?"onComplete":"onError") + " " + notification.getValue()));
//onNext 1
//onNext 2
//onComplete null
subscribeOn/observeOn(執行緒排程)
subscribeOn,指定被觀察者(上游)執行任務的排程器,observeOn指定觀察者(下游)的排程器。
- 簡單地說,subscribeOn() 指定的就是發射事件的執行緒,observerOn 指定的就是訂閱者接收事件的執行緒
- 多次指定發射事件的執行緒只有第一次指定的有效,也就是說多次呼叫 subscribeOn() 只有第一次的有效,其餘的會被忽略
- 但多次指定訂閱者接收執行緒是可以的,也就是說每呼叫一次 observerOn(),下游的執行緒就會切換一
RxJava 中,已經內建了很多執行緒選項供我們選擇,如:
- Schedulers.io() 代表io操作的執行緒, 通常用於網路,讀寫檔案等io密集型的操作;
- Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作;
- Schedulers.newThread() 代表一個常規的新執行緒;
- AndroidSchedulers.mainThread() 代表Android的主執行緒
條件和布林操作符
All
判定被觀察者發射的資料是否都滿足某個條
Amb
給定多個Observable,只讓第一個發射資料的Observable發射全部資料
List list = new ArrayList<ObservableSource>();
Observable<Integer> observable1 = Observable.just(1,3).delay(1, TimeUnit.SECONDS);//延遲1s發射
Observable<Integer> observable2 = Observable.just(2,4);
list.add(observable1);
list.add(observable2);
Observable.amb(list)
.subscribe(integer -> Log.e("TAG", "" +integer));
//2 4
contains
判定一個Observable是否發射一個特定的值
Observable.just(1,22,3)
.contains(2)//發射的資料是否有2
.subscribe(aBoolean -> Log.e("TAG", "" + aBoolean));
//false
SequenceEqual
判定兩個Observables是否發射相同的資料序列(相同的資料,相同的順序,相同的終止狀態),和是否延遲無關!
Observable<Integer> observable1 = Observable.just(1,3).delay(1, TimeUnit.SECONDS);//延遲1s發射 1 3
Observable<Integer> observable2 = Observable.just(3).startWith(1);//發射3之前 先發射1
Observable.sequenceEqual(observable1, observable2)
.subscribe(aBoolean -> Log.e("TAG", "" + aBoolean));
//true
SkipUntil:丟棄原被觀察者的發射物,直到第二個被觀察者發射了一項資料那一刻!
SkipWhile:當你設定的條件為false的時候, 才發射原來的發射物
TakeUntil:第二個被觀察者發射了一項資料那一刻,停止原始被觀察者的發射物,和SkipUntil相反
TakeWhile:一直髮射原始發射物,直到你設定的條件為fasle,和SkipWhile相反
數學和聚合類
concat
合併多個被觀察者,前面的被觀察者的資料發射完畢,才會傳送後面的,區別於merge
Observable<Integer> observable1 = Observable.just(1,3).delay(1, TimeUnit.SECONDS);
Observable<Integer> observable2 = Observable.just(2,4);
Observable.concat(observable1, observable2)
.subscribe(integer -> Log.e("TAG", "" + integer));
//1 3 2 4
count
攔截原始發射事件並返回其數量,將其發射到下游!
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}).count()
.subscribe(integer -> Log.e("TAG", "" + integer));
//2
Reduce
Reduce操作符對原始Observable發射資料的第一項應用一個函式,然後再將這個函式的返回值與第二項資料一起傳遞給函式,以此類推,持續這個過程知道原始Observable發射它的最後一項資料並終止,此時Reduce返回的Observable發射這個函式返回的最終值。
Observable.just(1,2,5)
.reduce((integer, integer2) -> {
Log.e("TAG", integer + " --- " + integer2);
return integer+integer2;
}).subscribe(integer -> Log.e("TAG", "" + integer));
05-23 20:09:58.894 32133-32133/com.example.rxjava.rxjavademo E/TAG: 1 --- 2
05-23 20:09:58.894 32133-32133/com.example.rxjava.rxjavademo E/TAG: 5 --- 3
05-23 20:09:58.894 32133-32133/com.example.rxjava.rxjavademo E/TAG: 結果: 8
連結操作符
可連線的被觀察者在被訂閱時並不開始發射資料,只有在它的connect()被呼叫時才開始!
Publish
Publish可將普通的被觀察者轉為可連線的被觀察者。如果一個連線的被觀察者已經開始發射資料,再對其進行訂閱只能接受之後發射的資料,訂閱之前已經發射過的資料就丟失了。
ConnectableObservable<Long> publish = Observable.interval(1000, TimeUnit.MILLISECONDS).take(4).publish();
publish.subscribe(aLong -> Log.e("TAG", "beforeConnect: " + aLong));
publish.connect();//如果不connect 都收不到資料
publish.delaySubscription(2, TimeUnit.SECONDS)//延遲2兩秒訂閱 前兩秒的資料丟失
.subscribe(aLong -> Log.e("TAG", "**afterConnect: " + aLong));
05-24 23:36:05.294 17077-17123/com.example.rxjava.rxjavademo E/TAG: beforeConnect: 0
05-24 23:36:06.294 17077-17123/com.example.rxjava.rxjavademo E/TAG: beforeConnect: 1
05-24 23:36:07.304 17077-17123/com.example.rxjava.rxjavademo E/TAG: beforeConnect: 2
05-24 23:36:07.304 17077-17123/com.example.rxjava.rxjavademo E/TAG: **afterConnect: 2
05-24 23:36:08.304 17077-17123/com.example.rxjava.rxjavademo E/TAG: beforeConnect: 3
05-24 23:36:08.304 17077-17123/com.example.rxjava.rxjavademo E/TAG: **afterConnect: 3
Replay
通過上面的介紹我們瞭解到,ConnectableObservable和普通的被觀察者最大的區別就是,呼叫Connect操作符開始發射資料,後面的訂閱者會丟失之前發射過的資料。
Replay返回的ConnectableObservable 會快取之前已經發射的資料,這樣即使有訂閱者在其發射資料開始之後進行訂閱也能收到之前發射過的資料。Replay操作符能指定快取的大小或者時間,這樣能避免耗費太多記憶體。
//快取所有資料
ConnectableObservable<Long> replay1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(4).replay();
replay1.connect();
replay1.delaySubscription(2, TimeUnit.SECONDS)
.subscribe(aLong -> Log.e("TAG", "" + aLong));
//0 1 2 3 沒有資料丟失
//快取一個數據
ConnectableObservable<Long> replay2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(4).replay(1);
replay2.connect();
replay2.delaySubscription(2, TimeUnit.SECONDS)
.subscribe(aLong -> Log.e("TAG", "" + aLong));
//1 2 3 快取了訂閱前的一個數據
//快取一個數據
ConnectableObservable<Long> replay3 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(4).replay(1, TimeUnit.SECONDS);
replay3.connect();
replay3.delaySubscription(2, TimeUnit.SECONDS)
.subscribe(aLong -> Log.e("TAG", "" + aLong));
//1 2 3 快取了訂閱前1s的資料
轉換類To系列
toList
收集原始被觀察者發射的所有資料到一個列表,然後返回這個列表.!
Observable.just(1, 4, 3)
.toList().subscribe(list -> {
Log.e("TAG", list.toString());
});//[1, 4, 3]
toSortedList
收集原始被觀察者發射的所有資料到一個有序列表,然後返回這個列表!
Observable.just(1,4,3)
.toSortedList()//預設升序
.subscribe(integers -> Log.e("TAG", integers.toString()));
// [1, 3, 4]
toMap
將序列資料轉換為一個Map。我們可以根據資料項生成key和生成value!
Observable.just("b","a","c")
//一個引數的用於生成key,value預設是事件資料
//第一個泛型是事件資料型別,第二個泛型是key的型別
.toMap(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s + "-key: ";
}
})//這部分用箭頭函式改寫:.toMap(s -> s + "-key: ")
.subscribe(new Consumer<Map<String, String>>() {
@Override
public void accept(Map<String, String> stringStringMap) throws Exception {
Log.e("TAG", stringStringMap.toString());
}
});
//{c-key: =c, a-key: =a, b-key: =b}
Observable.just(1,2,3)
//兩個引數的 第一個生成key 第二個生成vlaue
.toMap(i -> i + 10, i -> i + "*")
.subscribe(integerIntegerMap -> Log.e("TAG", integerIntegerMap.toString()));
//{12=2*, 11=1*, 13=3*}
還有一些比如blockingIterable(),將原始被觀察者發射的資料裝進一個迭代器Iterable,可以使用forEach等操作!