RxJava操作符(六)——功能操作符
阿新 • • 發佈:2019-01-22
一、功能操作符:輔助被觀察者(Observable) 在傳送事件時實現一些功能性需求
二、功能操作符按照使用功能,大致分類:
- 訂閱:subscribe()
- 執行緒排程:subscribeOn()、observeOn()
- 延遲:delay()
- do操作:do()
- 錯誤處理 :onErrorReturn() 、onErrorResumeNext() 、onExceptionResumeNext()、retry()、retryUntil() 、retryWhen()
- 重複操作符:repeat() 、repeatWhen()
三、使用詳解
- subscribe()
- 作用:訂閱觀察者和被觀察者
- 使用示例
//建立被觀察者物件 Observable observable = Observable.just(1, 2, 3); //建立觀察者物件 Consumer<Integer> integerConsumer = new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { } }; //訂閱 observable.subscribe(integerConsumer);
完成訂閱後,當被觀察者傳送事件時,觀察者會執行其實現的方法。
- subscribeOn()/observeOn()
- 作用:切換執行執行緒
- 介紹:
- 在 RxJava模型中,被觀察者 (Observable) / 觀察者(Observer)的工作執行緒 = 建立自身的執行緒
- 建立被觀察者 (Observable) / 觀察者(Observer)的執行緒 = 主執行緒,所以整個過程都會執行在主執行緒
- 所以生產事件 / 接收& 響應事件都發生在主執行緒,而我們有些耗時操作或網路請求是不需要阻塞執行緒的,所以需要在其他執行緒執行操作後,此時就用到subscribeOn()
- 當我們在其他執行緒操作資料後需要更新UI時,此時需要切換到主執行緒使用 observeOn()
3、執行緒型別:在 RxJava
中,內建了多種用於排程的執行緒型別
型別 | 含義 | 應用場景 |
---|---|---|
Schedulers.immediate() | 當前執行緒 = 不指定執行緒 | 預設 |
AndroidSchedulers.mainThread() | Android主執行緒 | 操作UI |
Schedulers.newThread() | 常規新執行緒 | 耗時等操作 |
Schedulers.io() | io操作執行緒 | 網路請求、讀寫檔案等io密集型操作 |
Schedulers.computation() | CPU計算操作執行緒 | 大量計算操作 |
- 注:
RxJava
內部使用 執行緒池 來維護這些執行緒,所以執行緒的排程效率非常高。
4、使用示例:
5、使用注意:Observable.just(1, 2, 3) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe();
- subscribeOn()針對的是目標過程是呼叫此方法之前的過程,observeOn()操作目標為觀察者物件或其後的執行過程
- 對於subscribeOn()多次呼叫只有第一次起作用,observeOn()每次呼叫都會起作用
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.e("Thread===",Thread.currentThread().getName());
e.onNext(1);
}
}).subscribeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("thread===",Thread.currentThread().getName());
}
});
輸出結果:
05-06 06:04:00.454 4814-4814/com.example.administrator.googleplay E/Thread===: main
05-06 06:04:00.547 4814-4814/com.example.administrator.googleplay E/thread===: main
上述過程,制定了兩次訂閱的執行緒分別為主執行緒和建立執行緒,從輸出結果上看被觀察者是在main執行緒中傳送的事件,後面的newThread並未起作用,下面看看observeOn(): Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.doOnNext(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("doOnNext Thread===",Thread.currentThread().getName());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("subscribe Thread===",Thread.currentThread().getName());
}
});
上面使用了兩次observeOn切換執行緒,如果兩者都起作用,那第一個應該建立新執行緒,第二列印的應該是主執行緒,看看輸出結果:05-06 06:10:56.608 5303-5330/com.example.administrator.googleplay E/doOnNext Thread===: RxNewThreadScheduler-1
05-06 06:10:56.624 5303-5303/com.example.administrator.googleplay E/subscribe Thread===: main
輸出結果與分析的一致,所以驗證上面的結論:對於subscribeOn()多次呼叫只有第一次起作用,observeOn()每次呼叫都會起作用
- delay()
- 作用:延遲操作符可以延遲一段時間傳送事件
- delay()的過載
// 1. 指定延遲時間
// 引數1 = 時間;引數2 = 時間單位
delay(long delay,TimeUnit unit)
// 2. 指定延遲時間 & 排程器
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延遲時間 & 錯誤延遲
// 錯誤延遲,即:若存在Error事件,則如常執行,執行後再丟擲錯誤異常
// 引數1 = 時間;引數2 = 時間單位;引數3 = 錯誤延遲引數
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延遲時間 & 排程器 & 錯誤延遲
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器;引數4 = 錯誤延遲引數
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時間並新增排程
3、使用示例
Observable.just(1,2,3)
.delay(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
- do()操作
- 作用 :在某個事件的生命週期中呼叫
- 型別 :do()操作符有很多個,具體如下
3、使用示例
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("發生錯誤了"));
}
})
// 1. 當Observable每傳送1次資料事件就會呼叫1次
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG, "doOnEach: " + integerNotification.getValue());
}
})
// 2. 執行Next事件前呼叫
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doOnNext: " + integer);
}
})
// 3. 執行Next事件後呼叫
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doAfterNext: " + integer);
}
})
// 4. Observable正常傳送事件完畢後呼叫
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete: ");
}
})
// 5. Observable傳送錯誤事件時呼叫
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "doOnError: " + throwable.getMessage());
}
})
// 6. 觀察者訂閱時呼叫
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe: ");
}
})
// 7. Observable傳送事件完畢後呼叫,無論正常傳送完畢 / 異常終止
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doAfterTerminate: ");
}
})
// 8. 最後執行
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doFinally: ");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
輸出結果:
錯誤處理符
- onErrorReturn()
- 作用: 當發生一個錯誤時會返回一個數據,並且立即停止操作
- 使用示例:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("傳送異常!"));
}
}).onErrorReturn(new Function<Throwable,Integer>() {
@Override
public Integer apply(Throwable o) throws Exception {
Log.e("=====",o.getMessage());
return 110;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("=====",o + "");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("=====",throwable.getMessage());
}
});
輸出結果:
05-06 06:43:17.498 6446-6470/com.example.administrator.googleplay E/=====: 傳送異常!
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 1
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 2
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 3
05-06 06:43:17.513 6446-6446/com.example.administrator.googleplay E/=====: 110
在傳送異常事件後首先執行了onErrorReturn輸出異常資訊,然後修改並返回一個數據,此資料作為一個新的事件傳送,從而回掉onNext()方法。
- onErrorResumeNext()
- 作用:當發生一個錯誤時會返回一個新的被觀察著物件
- 使用示例:使用方法與onErrorReturn() 相同,只是將傳送的資料換成了一個Onservable物件
.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
@Override
public ObservableSource apply(Throwable throwable) throws Exception {
return Observable.just("A","B");
}
}
- onExceptionResumeNext()
- 作用:當發生一個異常時返回一個新的被觀察者
- 使用示例同上
注意:
- onExceptionResumeNext()攔截的錯誤 = Exception;若需攔截Throwable請用onErrorResumeNext()
- 若onExceptionResumeNext()攔截的錯誤 = Throwable,則會將錯誤傳遞給觀察者的onError方法
- retry()
- 作用:當發生異常時被觀察者會重新發送事件
- 型別:
<-- 1. retry() -->
// 作用:出現錯誤時,讓被觀察者重新發送資料
// 注:若一直錯誤,則一直重新發送
<-- 2. retry(long time) -->
// 作用:出現錯誤時,讓被觀察者重新發送資料(具備重試次數限制
// 引數 = 重試次數
<-- 3. retry(Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送& 持續遇到錯誤,則持續重試)
// 引數 = 判斷邏輯 //返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束 //返回true = 重新發送請求(最多重新發送3次)
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送 & 持續遇到錯誤,則持續重試// 引數 = 判斷邏輯(傳入當前重試次數 & 異常錯誤資訊)//返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束 //返回true = 重新發送請求(最多重新發送3次)<-- 5. retry(long time,Predicate predicate) -->// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制// 引數 = 設定重試次數 & 判斷邏輯 //返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束 //返回true = 重新發送請求(最多重新發送3次)使用示例:
// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制
// 引數 = 設定重試次數 & 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發生錯誤了"));
e.onNext(3);
}
})
// 攔截錯誤後,判斷是否需要重新發送請求
.retry(3, new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕獲異常
Log.e(TAG, "retry錯誤: "+throwable.toString());
//返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束
//返回true = 重新發送請求(最多重新發送3次)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- retryUntil()
- 作用:當出現錯誤時判斷是否需要重新發送資料,功能與上述類似
- 使用示例:
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return true;
}
})
注意:當返回true時表示不重複傳送,其實從名字中可以看出來retryUntil ,有個Until是當達到某種條件時即可停止
- retryWhen
- 作用:遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable),並決定是否需要重新訂閱原始被觀察者Observable 傳送事件
- 使用示例:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("傳送異常!"));
e.onNext(3);
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
return Observable.just(4);
}
});
}
})
輸出結果:
/com.example.administrator.googleplay E/=====: 1
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 2
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 1
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 2
05-06 07:15:02.627 7459-7459/com.example.administrator.googleplay E/=====: 1
從上面可以看出當傳送異常時會重新發送,傳送資料為1,2 迴圈,
注:1. 若 新的被觀察者 Observable傳送的事件 = Error事件,那麼 原始Observable則不重新發送事件:
2. 若 新的被觀察者 Observable傳送的事件 = Next事件 ,那麼原始的Observable則重新發送事件:
現在修改上面的Observable.just(4)為 Observable.error(new Throwable("retryWhen終止啦"));,
Observable.error(new Throwable("retryWhen終止啦"));
再次執行程式,回掉onError()方法,輸出結果:
05-06 07:19:35.237 8015-8015/com.example.administrator.googleplay E/=====: retryWhen終止啦
重複傳送
- repeat()
- 作用:無條件不停止迴圈
- 使用型別
Observable.just(1, 2, 3)
.repeat()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e("=====", o + "");
}
});
輸出結果:
05-06 07:24:51.407 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:24:51.408 8210-8210/com.example.administrator.googleplay E/=====: 3
過載:repeat(3)引數:迴圈的次數,先修改傳入引數2,執行程式
.repeat(2)
輸出結果:
05-06 07:28:17.482 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 3
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 1
05-06 07:28:17.490 8210-8210/com.example.administrator.googleplay E/=====: 2
05-06 07:28:17.509 8210-8210/com.example.administrator.googleplay E/=====: 3
- repeatUntil () / repeat When()
- 作用:按照條件決定是否重複傳送事件
- 使用示例:二者的使用方法與上述的retryUntil()/retryWhen()相同
以上就是RxJava的功能操作符的介紹,功能強大之處在開發中會深有體會。