RxJava2.x 萌新之路 操作符篇
操作符總覽
Rxjava為函數語言程式設計提供了眾多的操作符,操作符的運用可以使得程式邏輯更為簡潔。
網上已有眾多操作符說明教學,但不親身總結和嘗試一遍,是難以體會到其中奧妙與融會貫通的,簡單記錄總結以備大家使用參考。
建立操作符
just
自動依次傳送事件序列。
例項:
Observable .just("1", "2", "3", "4", "5", "6", "7", "8", "9", "10")
依次傳送呼叫onNext(),最後預設呼叫complete()
create
手動建立事件序列,返回一個可自由操作的emitter,優點是自由控制事件流程。
emitter.onNext();
emitter.onError();
emitter.onComplete();
fromIterable
傳入陣列並按角標依次傳送事件。
Observable.fromIterable(list),每次接收單個元素。
fromArray
傳入陣列一次性發送,一次接收所有元素。
timer
延時傳送事件 Observable .timer(2, TimeUnit.SECONDS)
interval
可取代CountDownTimer、Handler,5秒傳送一次事件:
Observable .interval(5, TimeUnit.SECONDS)
例項:取代handler進行定時計劃
private Disposable mDisposable; @Override protected void doSomething() { mDisposable = Flowable.interval(20, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { doTask(); } }); } /** * 銷燬時停止計劃 */ @Override protected void onDestroy() { super.onDestroy(); if (mDisposable != null){ mDisposable.dispose(); } }
intervalRange
給事件更多的時間控制:
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
引數1:起始傳送值
引數2:傳送數量
引數3:首次傳送延遲事件
引數4:每次傳送事件間隔
引數5:時間單位
Range
依次傳送範圍內的事件
Observable.range(2, 6),接收型別Integer
轉換操作符
map
實現單個數據的轉換
例項:把網路中ResponseBody用Gson轉換為相對應的資料實體再下發給子類。
.map(new Function<Response, Number>() { @Override public MobileAddress apply(@NonNull Response response) throws Exception { if (response.isSuccessful()) { ResponseBody body = response.body(); if (body != null) { Log.e(TAG, "map:轉換前:" + response.body()); return new Gson().fromJson(body.string(), MobileAddress.class); } } return null; } }).observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<MobileAddress>() { @Override public void accept(@NonNull MobileAddress s) throws Exception { Log.e(TAG, "doOnNext: Number:" + s.getNumbser() + "\n"); } })
flatMap和concatMap
兩者都可以實現資料集合中一對多事件的轉換,後者會按傳送的順序獲取接收結果,前者可能是亂序接收(不確定哪個事件先完成)。
一對多事件轉換:在flatMap集合中例如可以操作一個公司實體,並轉換為單個部門實體,返回後在後續的accept中,又可以使用單個部門實體對每個成員進行邏輯處理。
例項:
Observable.fromArray(1,2,3,4,5) .flatMap(new Function<Integer, ObservableSource<Integer>>() { @Override public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception { int delay = 0; if(integer == 3){ delay = 500;//延遲500ms } return Observable.just(integer *10).delay(delay, TimeUnit.MILLISECONDS); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e("tag","accept:"+integer); } });
使用flatMap結果:10,20,40,30,50
使用contactMap結果:10,20,30,40,50
buffer
分批發送事件
例項:
Observable .just(1, 2, 3, 4, 5, 6) .buffer(2)
傳送1,2;傳送3,4;在傳送5,6
合併操作符
merge和contat
兩者都可以合併多個Observable事件,前者傳送順序不確定(並行無序),後者按順序傳送(序列有序)。
mergeArray和concatArray效果相同,適用於大於4個事件的情況。
例項:
定義cache和network兩個事件,先檢視快取是否有資料,有即onNext去重新整理頁面,沒有則onComplete讀取網路資料。
Observable.concat(cache,network)
concatDelayError和 mergeDelayError
兩者都可以在merge和contat操作中出現錯誤時停止傳送當前事件集合,但不影響合併中的另一個事件集合傳送
zip
zip
操作符可以將多個 Observable
的資料結合為一個數據源再發射出去
例項:分別請求生日、地址、性別等資訊後,將多個請求結果合成一個,再進行UI更新。
....分別請求生日、地址... Observable.zip(observable1, observable2, new BiFunction<Birth, Address, String>() { @Override public String apply(@NonNull Birth birth, @NonNull Address address) throws Exception { return "合併後的資料為 Birth:"+birth.getResult()+" Address:"+address.getResult(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { Log.e(TAG, "accept: 成功:" + s+"\n"); } });
過濾操作符
操作符 | 說明 |
---|---|
filter | 自定義篩選條件,返回boolean |
distinct | 去重 |
distinctUntilChanged | 過濾連續相同事件 |
skip,skipLast | 跳過前n個事件或最後n個 |
take和takeLast | 只接收前n個事件或最後n個 |
elementAt和elementAtOrError | 前者只發送第n個,可設定預設值,不拋異常;後者越界拋異常。 |
ignoreElements | 只接收完成和報錯資訊 |
distinct | 去重 |
ofType | 指定接收資料型別 |
throttleFirst/throttleLast | 只接收指定時間內第一個或最後一個事件 |
其他操作符
do
doOnEach() :當Observable每傳送一次事件就會呼叫一次(包含onNext(),onError(),onComplete())
doOnNext(): 執行 onNext()前呼叫
doAfterNext(): 執行onNext()後呼叫
doOnComplete():執行onComplete()前呼叫
doOnError():執行 onError()前呼叫
doOnTerminate(): 執行終止(無論正常傳送完畢/異常終止)
doFinally(): 最後執行
doOnSubscribe() :觀察者訂閱是呼叫
doOnUnScbscribe(): 觀察者取消訂閱時呼叫
onErrorReturn
捕獲錯誤並返回,不傳送後續事件。
onExceptionResumeNext/onErrorResumeNext
捕獲錯誤跳過當前事件同時不中斷髮送後續事件。
retry
retry()
: 出現錯誤時,讓被觀察者重新發送資料。若錯誤一直髮生,則一直重新發送 retry(long time)
:與retry不同的書,若錯誤一直髮生,被觀察者則一直重新發送資料,但這持續重新發送有次數限制 retry(Predicate predicate)
: 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料 retry(new BiPredicate<Integer, Throwable>)
:出現錯誤時,根據指定邏輯(可以捕獲重發的次數和發生的錯誤)決定是否讓被觀察者重新發送資料 retry(long time,Predicate predicate)
: 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料。並且有持續重發的次數限制
retryUntil
遇到錯誤時根據制定規則選擇是否重發
retryWhen
遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable),並決定是否需要重新訂閱原始被觀察者(Observable)
repeat和repeatWhen
repeat
重複發射 observable的資料序列,可以使無限次也可以是指定次數.不傳時為重複無限次。 repeatWhen
遇到錯誤選擇返回object給新觀察者或中止事件
返回引數選擇:
Observable.empty();
傳送Complete事件,但不會回撥觀察者的Complete()
onComplete()
直接完成。
Observable.error(new Throwable("不再重新訂閱事件"));
Observable.just(1);
繼續傳送事件。
debounce
一定的時間內沒有操作就會發送事件(只會傳送最後一次操作的事件)
例項:
Observable.intervalRange(1, 2, 3, 4, TimeUnit.SECONDS)
.debounce(2, TimeUnit.SECONDS)
只有最後一個4的事件會被髮送(2秒後)
條件操作符
操作符 | 說明 |
---|---|
all | 判斷被觀察者所有事件是否滿足某個事件,如果全部滿足則返回true,都在返回false |
takeUntil | 當事件滿足設定的條件時,該事件的下一個事件不會被髮送了。包含超過臨界條件的第一個事件 |
takeWhile | 當事件滿足設定的條件時,傳送事件 |
skipUntil | 直到設定的條件事件發出之後,開始傳送原始事件。 |
skipWhile | 跳過while範圍內事件 |
amb | 多個Observable序列中,只發送第一個 |
contains | 是否存在特定元素 |
exists | 是否滿足特定條件 |
DefaultIfEmpty | 如果沒有正常結束事件(onComlete執行),返回預設值 |
SequenceEqual | 判斷兩個事件序列是否是相同的資料,相同的順序,相同的終止狀態 |
相似操作符對比
timer()
:用於建立Observable
,延遲傳送一次。 interval()
:用於建立Observable
,跟TimerTask
類似,用於週期性傳送。 delay()
:用於事件流中,可以延遲傳送事件流中的某一次傳送。