1. 程式人生 > >Rxjava2功能性操作符

Rxjava2功能性操作符

delay():在被觀察者傳送事件前進行一些延遲的操作

// 1. 指定延遲時間
// 引數1 = 時間;引數2 = 時間單位
delay(long delay,TimeUnit unit)

// 2. 指定延遲時間 & 排程器
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延遲時間  & 錯誤延遲
// 錯誤延遲,即:若存在Error事件,則如常執行,執行後再丟擲錯誤異常
// 引數1 = 時間;引數2 = 時間單位;引數3 = 錯誤延遲引數
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延遲時間 & 排程器 & 錯誤延遲
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器;引數4 = 錯誤延遲引數
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時間並新增排程器,錯誤通知可以設定是否延遲

do():在事件傳送 & 接收的整個生命週期過程中進行操作 如傳送事件前的初始化、傳送事件後的回撥請求等 如圖總結

錯誤處理:傳送事件過程中,遇到錯誤時的處理機制

onErrorReturn() 可捕獲在它之前發生的異常

遇到錯誤時,傳送1個新的Observable:

onErrorResumeNext()   

  1. onErrorResumeNext()攔截的錯誤 = Throwable;若需攔截Exception請用onExceptionResumeNext()
  2. onErrorResumeNext()攔截的錯誤 = Exception,則會將錯誤傳遞給觀察者的onError方法

onExceptionResumeNext()

  1. onExceptionResumeNext()攔截的錯誤 = Exception;若需攔截Throwable
    請用onErrorResumeNext()
  2. 如果onExceptionResumeNext()攔截的錯誤 = Throwable,則會將錯誤傳遞給觀察者的onError方法

retry()重試,即當出現錯誤時,讓被觀察者(Observable)重新發射資料

<-- 1. retry() -->
// 作用:出現錯誤時,讓被觀察者重新發送資料
// 注:若一直錯誤,則一直重新發送

<-- 2. retry(long time) -->
// 作用:出現錯誤時,讓被觀察者重新發送資料(具備重試次數限制
// 引數 = 重試次數
 
<-- 3. retry(Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送& 持續遇到錯誤,則持續重試)
// 引數 = 判斷邏輯

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送 & 持續遇到錯誤,則持續重試
// 引數 =  判斷邏輯(傳入當前重試次數 & 異常錯誤資訊)

<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制
// 引數 = 設定重試次數 & 判斷邏輯

retryUntil()出現錯誤後,判斷是否需要重新發送資料

具體使用類似於retry(Predicate predicate),唯一區別:返回 true 則不重新發送數

retryWhen()遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable),並決定是否需要重新訂閱原始被觀察者(Observable)& 傳送事件

repeat()無條件地、重複傳送 被觀察者事件

repeat();不傳入引數 = 重複傳送次數 = 無限次
 repeatWhen(Integer int )傳入引數 = 重複傳送次數有限

repeatWhen()有條件地、重複傳送 被觀察者事件

將原始 Observable 停止傳送事件的標識(Complete() / Error())轉換成1個 Object 型別資料傳遞給1個新被觀察者(Observable),以此決定是否重新訂閱 & 傳送原來的 Observable

  1. 若新被觀察者(Observable)返回1個Complete / Error事件,則不重新訂閱 & 傳送原來的 Observable
  2. 若新被觀察者(Observable)返回其餘事件時,則重新訂閱 & 傳送原來的 Observable

// Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            // 在Function函式中,必須對輸入的 Observable<Object>進行處理,這裡我們使用的是flatMap操作符接收上游的資料
            public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                // 將原始 Observable 停止傳送事件的標識(Complete() /  Error())轉換成1個 Object 型別資料傳遞給1個新被觀察者(Observable)
                // 以此決定是否重新訂閱 & 傳送原來的 Observable
                // 此處有2種情況:
                // 1. 若新被觀察者(Observable)返回1個Complete() /  Error()事件,則不重新訂閱 & 傳送原來的 Observable
                // 2. 若新被觀察者(Observable)返回其餘事件,則重新訂閱 & 傳送原來的 Observable
                return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {

                        // 情況1:若新被觀察者(Observable)返回1個Complete() /  Error()事件,則不重新訂閱 & 傳送原來的 Observable
                        return Observable.empty();
                        // Observable.empty() = 傳送Complete事件,但不會回撥觀察者的onComplete()

                        // return Observable.error(new Throwable("不再重新訂閱事件"));
                        // 返回Error事件 = 回撥onError()事件,並接收傳過去的錯誤資訊。

                        // 情況2:若新被觀察者(Observable)返回其餘事件,則重新訂閱 & 傳送原來的 Observable
                        // return Observable.just(1);
                       // 僅僅是作為1個觸發重新訂閱被觀察者的通知,傳送的是什麼資料並不重要,只要不是Complete() /  Error()事件
                    }
                });

            }
        })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應:" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }

                });