Rxjava2功能性操作符
delay():在被觀察者傳送事件前進行一些延遲的操作
// 1. 指定延遲時間
// 引數1 = 時間;引數2 = 時間單位
delay(long delay,TimeUnit unit)
// 2. 指定延遲時間 & 排程器
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延遲時間 & 錯誤延遲
// 錯誤延遲,即:若存在Error事件,則如常執行,執行後再丟擲錯誤異常
// 引數1 = 時間;引數2 = 時間單位;引數3 = 錯誤延遲引數
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延遲時間 & 排程器 & 錯誤延遲
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器;引數4 = 錯誤延遲引數
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時間並新增排程器,錯誤通知可以設定是否延遲
do():在事件傳送 & 接收的整個生命週期過程中進行操作 如傳送事件前的初始化、傳送事件後的回撥請求等 如圖總結
錯誤處理:傳送事件過程中,遇到錯誤時的處理機制
onErrorReturn() 可捕獲在它之前發生的異常
遇到錯誤時,傳送1個新的Observable:
onErrorResumeNext()
onErrorResumeNext()
攔截的錯誤 =Throwable
;若需攔截Exception
請用onExceptionResumeNext()
- 若
onErrorResumeNext()
攔截的錯誤 =Exception
,則會將錯誤傳遞給觀察者的onError
方法
onExceptionResumeNext()
onExceptionResumeNext()
攔截的錯誤 =Exception
;若需攔截Throwable
onErrorResumeNext()
如果onExceptionResumeNext()
攔截的錯誤 =Throwable
,則會將錯誤傳遞給觀察者的onError
方法
retry()重試,即當出現錯誤時,讓被觀察者(Observable
)重新發射資料
<-- 1. retry() -->
// 作用:出現錯誤時,讓被觀察者重新發送資料
// 注:若一直錯誤,則一直重新發送
<-- 2. retry(long time) -->
// 作用:出現錯誤時,讓被觀察者重新發送資料(具備重試次數限制
// 引數 = 重試次數
<-- 3. retry(Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送& 持續遇到錯誤,則持續重試)
// 引數 = 判斷邏輯
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送 & 持續遇到錯誤,則持續重試
// 引數 = 判斷邏輯(傳入當前重試次數 & 異常錯誤資訊)
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制
// 引數 = 設定重試次數 & 判斷邏輯
retryUntil()出現錯誤後,判斷是否需要重新發送資料
具體使用類似於retry(Predicate predicate)
,唯一區別:返回 true
則不重新發送數
retryWhen()遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable
),並決定是否需要重新訂閱原始被觀察者(Observable
)& 傳送事件
repeat()無條件地、重複傳送 被觀察者事件
repeat();不傳入引數 = 重複傳送次數 = 無限次
repeatWhen(Integer int )傳入引數 = 重複傳送次數有限
repeatWhen()有條件地、重複傳送 被觀察者事件
將原始 Observable
停止傳送事件的標識(Complete()
/ Error()
)轉換成1個 Object
型別資料傳遞給1個新被觀察者(Observable
),以此決定是否重新訂閱 & 傳送原來的 Observable
- 若新被觀察者(
Observable
)返回1個Complete
/Error
事件,則不重新訂閱 & 傳送原來的Observable
- 若新被觀察者(
Observable
)返回其餘事件時,則重新訂閱 & 傳送原來的Observable
// Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
// 在Function函式中,必須對輸入的 Observable<Object>進行處理,這裡我們使用的是flatMap操作符接收上游的資料
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
// 將原始 Observable 停止傳送事件的標識(Complete() / Error())轉換成1個 Object 型別資料傳遞給1個新被觀察者(Observable)
// 以此決定是否重新訂閱 & 傳送原來的 Observable
// 此處有2種情況:
// 1. 若新被觀察者(Observable)返回1個Complete() / Error()事件,則不重新訂閱 & 傳送原來的 Observable
// 2. 若新被觀察者(Observable)返回其餘事件,則重新訂閱 & 傳送原來的 Observable
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {
// 情況1:若新被觀察者(Observable)返回1個Complete() / Error()事件,則不重新訂閱 & 傳送原來的 Observable
return Observable.empty();
// Observable.empty() = 傳送Complete事件,但不會回撥觀察者的onComplete()
// return Observable.error(new Throwable("不再重新訂閱事件"));
// 返回Error事件 = 回撥onError()事件,並接收傳過去的錯誤資訊。
// 情況2:若新被觀察者(Observable)返回其餘事件,則重新訂閱 & 傳送原來的 Observable
// return Observable.just(1);
// 僅僅是作為1個觸發重新訂閱被觀察者的通知,傳送的是什麼資料並不重要,只要不是Complete() / Error()事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應:" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});