Android RxJava操作符的學習---功能性操作符
3.4 功能性操作符
-
作用
輔助被觀察者(Observable
) 在傳送事件時實現一些功能性需求 -
實際應用場景
- 連線(訂閱) 觀察者 & 被觀察者
- 執行緒排程(切換)
- 錯誤處理
- 事件生命週期操作
- 延時操作
- 重複傳送操作
-
型別
根據上述應用場景,常見的功能性操作符 主要有:
3.4.3. 應用場景 & 對應操作符詳解
注:在使用RxJava 2
操作符前,記得在專案的Gradle
中新增依賴:
dependencies { compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxjava:2.0.7' // 注:RxJava2 與 RxJava1 不能共存,即依賴不能同時存在 }
3.4.3.1 連線被觀察者 & 觀察者
-
需求場景
即使得被觀察者 & 觀察者 形成訂閱關係 -
對應操作符
subscribe()
-
作用
訂閱,即連線觀察者 & 被觀察者 -
具體使用
observable.subscribe(observer); // 前者 = 被觀察者(observable);後者 = 觀察者(observer 或 subscriber) <-- 1. 分步驟的完整呼叫 --> // 步驟1: 建立被觀察者 Observable 物件 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }); // 步驟2:建立觀察者 Observer 並 定義響應事件行為 Observer<Integer> observer = new Observer<Integer>() { // 通過複寫對應方法來 響應 被觀察者 @Override public void onSubscribe(Disposable d) { Log.d(TAG, "開始採用subscribe連線"); } // 預設最先呼叫複寫的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "對Next事件"+ value +"作出響應" ); } @Override public void onError(Throwable e) { Log.d(TAG, "對Error事件作出響應"); } @Override public void onComplete() { Log.d(TAG, "對Complete事件作出響應"); } }; // 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者 observable.subscribe(observer); <-- 2. 基於事件流的鏈式呼叫 --> Observable.create(new ObservableOnSubscribe<Integer>() { // 1. 建立被觀察者 & 生產事件 @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).subscribe(new Observer<Integer>() { // 2. 通過通過訂閱(subscribe)連線觀察者和被觀察者 // 3. 建立觀察者 & 定義響應事件的行為 @Override public void onSubscribe(Disposable d) { Log.d(TAG, "開始採用subscribe連線"); } // 預設最先呼叫複寫的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "對Next事件"+ value +"作出響應" ); } @Override public void onError(Throwable e) { Log.d(TAG, "對Error事件作出響應"); } @Override public void onComplete() { Log.d(TAG, "對Complete事件作出響應"); } }); } }
- 測試結果
- 擴充套件說明
<-- Observable.subscribe(Subscriber) 的內部實現 --> public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); // 在觀察者 subscriber抽象類複寫的方法 onSubscribe.call(subscriber),用於初始化工作 // 通過該呼叫,從而回調觀察者中的對應方法從而響應被觀察者生產的事件 // 從而實現被觀察者呼叫了觀察者的回撥方法 & 由被觀察者向觀察者的事件傳遞,即觀察者模式 // 同時也看出:Observable只是生產事件,真正的傳送事件是在它被訂閱的時候,即當 subscribe() 方法執行時 }
3.4.3.2 執行緒排程
- 需求場景
快速、方便指定 & 控制被觀察者 & 觀察者 的工作執行緒
1). RxJava執行緒控制(排程 / 切換)的作用是什麼?
指定 被觀察者 (Observable)
/ 觀察者(Observer)
的工作執行緒型別。
2). 為什麼要進行RxJava執行緒控制(排程 / 切換)?
2).1 背景
- 在
RxJava
模型中,被觀察者(Observable)
/ 觀察者(Observer)
的工作執行緒 = 建立自身的執行緒
即,若被觀察者
(Observable)
/ 觀察者(Observer)
在主執行緒被建立,那麼他們的工作(生產事件 / 接收& 響應事件)就會發生在主執行緒
- 因為建立被觀察者
(Observable)
/ 觀察者(Observer)
的執行緒 = 主執行緒 - 所以生產事件 / 接收& 響應事件都發生在主執行緒
下面請看1個RxJava的基礎使用
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 步驟1:建立被觀察者 Observable & 傳送事件
// 在主執行緒建立被觀察者 Observable 物件
// 所以生產事件的執行緒是:主執行緒
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, " 被觀察者 Observable的工作執行緒是: " + Thread.currentThread().getName());
// 列印驗證
emitter.onNext(1);
emitter.onComplete();
}
});
// 步驟2:建立觀察者 Observer 並 定義響應事件行為
// 在主執行緒建立觀察者 Observer 物件
// 所以接收 & 響應事件的執行緒是:主執行緒
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
Log.d(TAG, " 觀察者 Observer的工作執行緒是: " + Thread.currentThread().getName());
// 列印驗證
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
};
// 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者
observable.subscribe(observer);
}
}
- 測試結果
2).2 衝突
- 對於一般的需求場景,需要在子執行緒中實現耗時的操作;然後回到主執行緒實現
UI
操作 - 應用到
RxJava
模型中,可理解為:- 被觀察者
(Observable)
在 子執行緒 中生產事件(如實現耗時操作等等) - 觀察者
(Observer)
在 主執行緒 接收 & 響應事件(即實現UI操作)
- 被觀察者
2).3 解決方案
所以,為了解決上述衝突,即實現 真正的非同步操作,我們需要對RxJava
進行 執行緒控制(也稱為排程 / 切換)
3). 實現方式
採用 RxJava
內建的執行緒排程器( Scheduler
),即通過 功能性操作符subscribeOn()
& observeOn()
實現
3).1 功能性操作符subscribeOn() & observeOn()簡介
- 作用
執行緒控制,即指定 被觀察者(Observable)
/ 觀察者(Observer)
的工作執行緒型別 - 執行緒型別
在RxJava
中,內建了多種用於排程的執行緒型別
型別 | 含義 | 應用場景 |
---|---|---|
Schedulers.immediate() | 當前執行緒 = 不指定執行緒 | 預設 |
AndroidSchedulers.mainThread() | Android主執行緒 | 操作UI |
Schedulers.newThread() | 常規新執行緒 | 耗時等操作 |
Schedulers.io() | io操作執行緒 | 網路請求、讀寫檔案等io密集型操作 |
Schedulers.computation() | CPU計算操作執行緒 | 大量計算操作 |
- 注:
RxJava
內部使用 執行緒池 來維護這些執行緒,所以執行緒的排程效率非常高。
3).2 具體使用
- 具體是在 (上述步驟3)通過訂閱(subscribe)連線觀察者和被觀察者中實現
<-- 使用說明 -->
// Observable.subscribeOn(Schedulers.Thread):指定被觀察者 傳送事件的執行緒(傳入RxJava內建的執行緒型別)
// Observable.observeOn(Schedulers.Thread):指定觀察者 接收 & 響應事件的執行緒(傳入RxJava內建的執行緒型別)
<-- 例項使用 -->
// 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者
observable.subscribeOn(Schedulers.newThread()) // 1. 指定被觀察者 生產事件的執行緒
.observeOn(AndroidSchedulers.mainThread()) // 2. 指定觀察者 接收 & 響應事件的執行緒
.subscribe(observer); // 3. 最後再通過訂閱(subscribe)連線觀察者和被觀察者
-
測試結果
-
特別注意
1. 若Observable.subscribeOn()
多次指定被觀察者 生產事件的執行緒,則只有第一次指定有效,其餘的指定執行緒無效
// 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者
observable.subscribeOn(Schedulers.newThread()) // 第一次指定被觀察者執行緒 = 新執行緒
.subscribeOn(AndroidSchedulers.mainThread()) // 第二次指定被觀察者執行緒 = 主執行緒
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
- 測試結果:被觀察者的執行緒 = 第一次指定的執行緒 = 新的工作執行緒,第二次指定的執行緒(主執行緒)無效
2. 若Observable.observeOn()多次指定觀察者 接收 & 響應事件的執行緒,則每次指定均有效,即每指定一次,就會進行一次執行緒的切換
// 步驟3:通過訂閱(subscribe)連線觀察者和被觀察者
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()) // 第一次指定觀察者執行緒 = 主執行緒
.doOnNext(new Consumer<Integer>() { // 生產事件
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "第一次觀察者Observer的工作執行緒是: " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.newThread()) // 第二次指定觀察者執行緒 = 新的工作執行緒
.doOnNext(new Consumer<Integer>() { // 生產事件
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "第二次觀察者Observer的工作執行緒是: " + Thread.currentThread().getName());
}
})
.subscribe(observer); // 生產事件
// 注:
// 1. 整體方法呼叫順序:觀察者.onSubscribe()> 被觀察者.subscribe()> 觀察者.doOnNext()>觀察者.onNext()>觀察者.onComplete()
// 2. 觀察者.onSubscribe()固定在主執行緒進行
- 測試結果:每呼叫一次
observeOn()
,觀察者的執行緒就會切換一次
4). 具體例項
下面,我將採用最常見的 Retrofit + RxJava
實現 網路請求 的功能,從而說明 RxJava
的執行緒控制的具體應用
4).1 功能說明
- 實現功能:將中文翻譯成英文 - > 顯示到介面
- 實現方案:採用
Get
方法對 金山詞霸API 傳送網路請求
- 先切換到工作執行緒 傳送網路請求
- 再切換到主執行緒進行
UI
更新
4).2 步驟說明
- 新增依賴
- 建立 接收伺服器返回資料 的類
- 建立 用於描述網路請求 的介面(區別於傳統形式)
- 建立 Retrofit 例項
- 建立 網路請求介面例項 並 配置網路請求引數(區別於傳統形式)
- 傳送網路請求(區別於傳統形式)
- 傳送網路請求
- 對返回的資料進行處理
4).3 步驟實現
步驟1: 新增依賴
a. 在 Gradle
加入Retrofit
庫的依賴
build.gradle
dependencies {
// Android 支援 Rxjava
// 此處一定要注意使用RxJava2的版本
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Android 支援 Retrofit
compile 'com.squareup.retrofit2:retrofit:2.1.0'
// 銜接 Retrofit & RxJava
// 此處一定要注意使用RxJava2的版本
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
// 支援Gson解析
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
}
b. 新增 網路許可權
AndroidManifest.xml
<uses-permission android:name="android.permission.INTERNET"/>
步驟2:建立 接收伺服器返回資料 的類
- 金山詞霸
API
的資料格式說明如下:
// URL模板
http://fy.iciba.com/ajax.php
// URL例項
http://fy.iciba.com/ajax.php?a=fy&f=auto&t=auto&w=hello%20world
// 引數說明:
// a:固定值 fy
// f:原文內容型別,日語取 ja,中文取 zh,英語取 en,韓語取 ko,德語取 de,西班牙語取 es,法語取 fr,自動則取 auto
// t:譯文內容型別,日語取 ja,中文取 zh,英語取 en,韓語取 ko,德語取 de,西班牙語取 es,法語取 fr,自動則取 auto
// w:查詢內容
- 根據 金山詞霸API 的資料格式,建立 接收伺服器返回資料 的類:
Translation.java
public class Translation {
private int status;
private content content;
private static class content {
private String from;
private String to;
private String vendor;
private String out;
private int errNo;
}
//定義 輸出返回資料 的方法
public void show() {
System.out.println( "Rxjava翻譯結果:" + status);
System.out.println("Rxjava翻譯結果:" + content.from);
System.out.println("Rxjava翻譯結果:" + content.to);
System.out.println("Rxjava翻譯結果:" + content.vendor);
System.out.println("Rxjava翻譯結果:" + content.out);
System.out.println("Rxjava翻譯結果:" + content.errNo);
}
}
步驟3:建立 用於描述網路請求 的介面
採用 註解 + Observable<...>
介面描述 網路請求引數
GetRequestthread_Interface.java
public interface GetRequest_Interface {
@GET("ajax.php?a=fy&f=auto&t=auto&w=hi%20world")
Observable<Translation> getCall();
// 註解裡傳入 網路請求 的部分URL地址
// Retrofit把網路請求的URL分成了兩部分:一部分放在Retrofit物件裡,另一部分放在網路請求接口裡
// 如果接口裡的url是一個完整的網址,那麼放在Retrofit物件裡的URL可以忽略
// 採用Observable<...>介面
// getCall()是接受網路請求資料的方法
}
接下來的步驟均在MainActivity.java內實現(請看註釋)
MainActivity.java
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//步驟4:建立Retrofit物件
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/") // 設定 網路請求 Url
.addConverterFactory(GsonConverterFactory.create()) //設定使用Gson解析(記得加入依賴)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支援RxJava
.build();
// 步驟5:建立 網路請求介面 的例項
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
// 步驟6:採用Observable<...>形式 對 網路請求 進行封裝
Observable<Translation> observable = request.getCall();
// 步驟7:傳送網路請求
observable.subscribeOn(Schedulers.io()) // 在IO執行緒進行網路請求
.observeOn(AndroidSchedulers.mainThread()) // 回到主執行緒 處理請求結果
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
@Override
public void onNext(Translation result) {
// 步驟8:對返回的資料進行處理
result.show() ;
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "請求失敗");
}
@Override
public void onComplete() {
Log.d(TAG, "請求成功");
}
});
}
}
4).4 測試結果
5). 特別注意
5).1 依賴包問題
- 問題說明
示意圖
- 解決方法
通過在Gradle
使用packageOptions
解決
build.gradle
android {
...
packagingOptions {
exclude 'META-INF/rxjava.properties'
}
}
5).2 應用程式崩潰問題
- 背景:在傳送網路請求時 退出當前
Activity
- 衝突:此時如果回到主執行緒更新
UI
,App
會崩潰 - 解決方案:當
Activity
退出時,呼叫Disposable.dispose()
切斷觀察者和被觀察者的連線,使得觀察者無法收到事件 & 響應事件
當出現多個
Disposable
時,可採用RxJava
內建容器CompositeDisposable
進行統一管理
// 新增Disposable到CompositeDisposable容器
CompositeDisposable.add()
// 清空CompositeDisposable容器
CompositeDisposable.clear()
3.4.3.3 延遲操作
-
需求場景
即在被觀察者傳送事件前進行一些延遲的操作 -
對應操作符使用
delay()
-
作用
使得被觀察者延遲一段時間再發送事件 -
方法介紹
delay()
具備多個過載方法,具體如下:
// 1. 指定延遲時間
// 引數1 = 時間;引數2 = 時間單位
delay(long delay,TimeUnit unit)
// 2. 指定延遲時間 & 排程器
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延遲時間 & 錯誤延遲
// 錯誤延遲,即:若存在Error事件,則如常執行,執行後再丟擲錯誤異常
// 引數1 = 時間;引數2 = 時間單位;引數3 = 錯誤延遲引數
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延遲時間 & 排程器 & 錯誤延遲
// 引數1 = 時間;引數2 = 時間單位;引數3 = 執行緒排程器;引數4 = 錯誤延遲引數
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時間並新增排程器,錯誤通知可以設定是否延遲
- 具體使用
Observable.just(1, 2, 3)
.delay(3, TimeUnit.SECONDS) // 延遲3s再發送,由於使用類似,所以此處不作全部展示
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
3.4.3.4 在事件的生命週期中操作
- 需求場景
在事件傳送 & 接收的整個生命週期過程中進行操作
如傳送事件前的初始化、傳送事件後的回撥請求等
- 對應操作符使用
do()
- 作用
在某個事件的生命週期中呼叫 - 型別
do()
操作符有很多個,具體如下:
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("發生錯誤了"));
}
})
// 1. 當Observable每傳送1次資料事件就會呼叫1次
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG, "doOnEach: " + integerNotification.getValue());
}
})
// 2. 執行Next事件前呼叫
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doOnNext: " + integer);
}
})
// 3. 執行Next事件後呼叫
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doAfterNext: " + integer);
}
})
// 4. Observable正常傳送事件完畢後呼叫
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete: ");
}
})
// 5. Observable傳送錯誤事件時呼叫
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "doOnError: " + throwable.getMessage());
}
})
// 6. 觀察者訂閱時呼叫
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe: ");
}
})
// 7. Observable傳送事件完畢後呼叫,無論正常傳送完畢 / 異常終止
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doAfterTerminate: ");
}
})
// 8. 最後執行
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doFinally: ");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
3.4.3.5 錯誤處理
-
需求場景
傳送事件過程中,遇到錯誤時的處理機制 -
對應操作符型別
- 對應操作符使用
onErrorReturn()
- 作用
遇到錯誤時,傳送1個特殊事件 & 正常終止
可捕獲在它之前發生的異常
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("發生錯誤了"));
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(@NonNull Throwable throwable) throws Exception {
// 捕捉錯誤異常
Log.e(TAG, "在onErrorReturn處理了錯誤: "+throwable.toString() );
return 666;
// 發生錯誤事件後,傳送一個"666"事件,最終正常結束
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
onErrorResumeNext()
- 作用
遇到錯誤時,傳送1個新的Observable
注:
onErrorResumeNext()
攔截的錯誤 =Throwable
;若需攔截Exception
請用onExceptionResumeNext()
- 若
onErrorResumeNext()
攔截的錯誤 =Exception
,則會將錯誤傳遞給觀察者的onError
方法
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("發生錯誤了"));
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {
// 1. 捕捉錯誤異常
Log.e(TAG, "在onErrorReturn處理了錯誤: "+throwable.toString() );
// 2. 發生錯誤事件後,傳送一個新的被觀察者 & 傳送事件序列
return Observable.just(11,22);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
onExceptionResumeNext()
- 作用
遇到錯誤時,傳送1個新的Observable
注:
onExceptionResumeNext()
攔截的錯誤 =Exception
;若需攔截Throwable
請用onErrorResumeNext()
- 若
onExceptionResumeNext()
攔截的錯誤 =Throwable
,則會將錯誤傳遞給觀察者的onError
方法
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發生錯誤了"));
}
})
.onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(11);
observer.onNext(22);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
retry()
- 作用
重試,即當出現錯誤時,讓被觀察者(Observable
)重新發射資料
- 接收到 onError()時,重新訂閱 & 傳送事件
Throwable
和Exception
都可攔截
- 型別
共有5種過載方法
<-- 1. retry() -->
// 作用:出現錯誤時,讓被觀察者重新發送資料
// 注:若一直錯誤,則一直重新發送
<-- 2. retry(long time) -->
// 作用:出現錯誤時,讓被觀察者重新發送資料(具備重試次數限制
// 引數 = 重試次數
<-- 3. retry(Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送& 持續遇到錯誤,則持續重試)
// 引數 = 判斷邏輯
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送 & 持續遇到錯誤,則持續重試
// 引數 = 判斷邏輯(傳入當前重試次數 & 異常錯誤資訊)
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制
// 引數 = 設定重試次數 & 判斷邏輯
- 具體使用
<-- 1. retry() -->
// 作用:出現錯誤時,讓被觀察者重新發送資料
// 注:若一直錯誤,則一直重新發送
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發生錯誤了"));
e.onNext(3);
}
})
.retry() // 遇到錯誤時,讓被觀察者重新發射資料(若一直錯誤,則一直重新發送
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
<-- 2. retry(long time) -->
// 作用:出現錯誤時,讓被觀察者重新發送資料(具備重試次數限制
// 引數 = 重試次數
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發生錯誤了"));
e.onNext(3);
}
})
.retry(3) // 設定重試次數 = 3次
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
<-- 3. retry(Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送& 持續遇到錯誤,則持續重試)
// 引數 = 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發生錯誤了"));
e.onNext(3);
}
})
// 攔截錯誤後,判斷是否需要重新發送請求
.retry(new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕獲異常
Log.e(TAG, "retry錯誤: "+throwable.toString());
//返回false = 不重新重新發送資料 & 呼叫觀察者的onError結束
//返回true = 重新發送請求(若持續遇到錯誤,就持續重新發送)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(若需要重新發送 & 持續遇到錯誤,則持續重試
// 引數 = 判斷邏輯(傳入當前重試次數 & 異常錯誤資訊)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發生錯誤了"));
e.onNext(3);
}
})
// 攔截錯誤後,判斷是否需要重新發送請求
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
// 捕獲異常
Log.e(TAG, "異常錯誤 = "+throwable.toString());
// 獲取當前重試次數
Log.e(TAG, "當前重試次數 = "+integer);
//返回false = 不重新重新發送資料 & 呼叫觀察者的onError結束
//返回true = 重新發送請求(若持續遇到錯誤,就持續重新發送)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現錯誤後,判斷是否需要重新發送資料(具備重試次數限制
// 引數 = 設定重試次數 & 判斷邏輯
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發生錯誤了"));
e.onNext(3);
}
})
// 攔截錯誤後,判斷是否需要重新發送請求
.retry(3, new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕獲異常
Log.e(TAG, "retry錯誤: "+throwable.toString());
//返回false = 不重新重新發送資料 & 呼叫觀察者的onError()結束
//返回true = 重新發送請求(最多重新發送3次)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
retryUntil()
- 作用
出現錯誤後,判斷是否需要重新發送資料
- 若需要重新發送 & 持續遇到錯誤,則持續重試
- 作用類似於
retry(Predicate predicate)
- 具體使用
具體使用類似於retry(Predicate predicate)
,唯一區別:返回true
則不重新發送資料事件。此處不作過多描述
retryWhen()
- 作用
遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable
),並決定是否需要重新訂閱原始被觀察者(Observable
)& 傳送事件
- 具體使用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("發生錯誤了"));
e.onNext(3);
}
})
// 遇到error事件才會回撥
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
// 引數Observable<Throwable>中的泛型 = 上游操作符丟擲的異常,可通過該條件來判斷異常的型別
// 返回Observable<?> = 新的被觀察者 Observable(任意型別)
// 此處有兩種情況:
// 1. 若 新的被觀察者 Observable傳送的事件 = Error事件,那麼 原始Observable則不重新發送事件:
// 2. 若 新的被觀察者 Observable傳送的事件 = Next事件 ,那麼原始的Observable則重新發送事件:
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
// 1. 若返回的Observable傳送的事件 = Error事件,則原始的Observable不重新發送事件
// 該異常錯誤資訊可在觀察者中的onError()中獲得
return Observable.error(new Throwable("retryWhen終止啦"));
// 2. 若返回的Observable傳送的事件 = Next事件,則原始的Observable重新發送事件(若持續遇到錯誤,則持續重試)
// return Observable.just(1);
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應" + e.toString());
// 獲取異常錯誤資訊
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
3.4.3.6 重複傳送
-
需求場景
重複不斷地傳送被觀察者事件 -
對應操作符型別
repeat()
&repeatWhen()
repeat()
- 作用
無條件地、重複傳送 被觀察者事件
具備過載方法,可設定重複建立次數
- 具體使用
// 不傳入引數 = 重複傳送次數 = 無限次
repeat();
// 傳入引數 = 重複傳送次數有限
repeatWhen(Integer int );
// 注:
// 1. 接收到.onCompleted()事件後,觸發重新訂閱 & 傳送
// 2. 預設執行在一個新的執行緒上
// 具體使用
Observable.just(1, 2, 3, 4)
.repeat(3) // 重複建立次數 =- 3次
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
repeatWhen()
-
作用
有條件地、重複傳送 被觀察者事件 -
原理
將原始Observable
停止傳送事件的標識(Complete()
/Error()
)轉換成1個Object
型別資料傳遞給1個新被觀察者(Observable
),以此決定是否重新訂閱 & 傳送原來的Observable
- 若新被觀察者(
Observable
)返回1個Complete
/Error
事件,則不重新訂閱 & 傳送原來的Observable
- 若新被觀察者(
Observable
)返回其餘事件時,則重新訂閱 & 傳送原來的Observable
- 具體使用
Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
// 在Function函式中,必須對輸入的 Observable<Object>進行處理,這裡我們使用的是flatMap操作符接收上游的資料
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
// 將原始 Observable 停止傳送事件的標識(Complete() / Error())轉換成1個 Object 型別資料傳遞給1個新被觀察者(Observable)
// 以此決定是否重新訂閱 & 傳送原來的 Observable
// 此處有2種情況:
// 1. 若新被觀察者(Observable)返回1個Complete() / Error()事件,則不重新訂閱 & 傳送原來的 Observable
// 2. 若新被觀察者(Observable)返回其餘事件,則重新訂閱 & 傳送原來的 Observable
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {
// 情況1:若新被觀察者(Observable)返回1個Complete() / Error()事件,則不重新訂閱 & 傳送原來的 Observable
return Observable.empty();
// Observable.empty() = 傳送Complete事件,但不會回撥觀察者的onComplete()
// return Observable.error(new Throwable("不再重新訂閱事件"));
// 返回Error事件 = 回撥onError()事件,並接收傳過去的錯誤資訊。
// 情況2:若新被觀察者(Observable)返回其餘事件,則重新訂閱 & 傳送原來的 Observable
// return Observable.just(1);
// 僅僅是作為1個觸發重新訂閱被觀察者的通知,傳送的是什麼資料並不重要,只要不是Complete() / Error()事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應:" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果