RxJava 錯誤處理操作符(Error Handling Operators)
RxJava系列教程:
一般來說,Observable不會拋異常。它會呼叫 onError 終止Observable序列,以此通知所有的觀察者發生了一個不可恢復的錯誤。 但是,也存在一些異常。例如,如果 onError 呼叫失敗了,Observable不會嘗試再次呼叫 onError 去通知觀察者,它會丟擲 RuntimeException,OnErrorFailedException 或者 OnErrorNotImplementedException。
有時我們希望觀察者或者操作符應該對異常發生時的 onError 通知做出合適的響應,而不是捕獲(catch)異常。很多操作符可用於對Observable發射的onError通知做出響應或者從錯誤中恢復,例如,你可以:
- 吞掉這個錯誤,切換到一個備用的Observable繼續發射資料
- 吞掉這個錯誤然後發射預設值
- 吞掉這個錯誤並立即嘗試重啟這個Observable
- 吞掉這個錯誤,在一些回退間隔後重啟這個Observable
我們可以使用Error handling相關的操作符來集中統一地處理錯誤。RxJava中錯誤處理的操作符為 Catch和 Retry。
Catch
Catch操作符能夠攔截原始Observable的onError通知,不讓Observable因為產生錯誤而終止。相當於Java中try/catch操作,不能因為拋異常而導致程式崩潰。
RxJava將Catch實現為三個不同的操作符:
onErrorReturn:讓Observable遇到錯誤時發射一個特殊的項並且正常終止。
onErrorResumeNext:讓Observable在遇到錯誤時開始發射第二個Observable的資料序列。
onExceptionResumeNext:讓Observable在遇到錯誤時繼續發射後面的資料項。
onErrorReturn
onErrorReturn方法 返回一個映象原有Observable行為的新Observable
會忽略前者的onError呼叫,不會將錯誤傳遞給觀察者,而是發射一個特殊的項並呼叫觀察者的onCompleted方法。
API
Javadoc: onErrorReturn(Func1))
示例程式碼
/*
* onErrorReturn:
* 返回一個原有Observable行為的新Observable映象,
* 後者會忽略前者的onError呼叫,不會將錯誤傳遞給觀察者,
* 作為替代,它會發發射一個特殊的項並呼叫觀察者的onCompleted方法
*/
createObserver()
//作為替代,它會發發射一個特殊的項並呼叫觀察者的onCompleted方法。
.onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
return "do something";
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
這裡建立Observable的方法(2個)如下:
private static Observable<String> createObserver() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 1; i <= 6; i++) {
if (i < 3) {
subscriber.onNext(i+"");
} else {
//會忽略onError呼叫,不會將錯誤傳遞給觀察者
subscriber.onError(new Throwable("Throw error"));
}
}
}
});
}
private static Observable<String> createObserver2() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 1; i <= 6; i++) {
if (i < 3) {
subscriber.onNext("onNext:" + i);
} else {
subscriber.onError(new Exception("the nubmer is greater than 3"));
//下面寫法也是可以的
/*try {
throw new Exception("the nubmer is greater than 3");
} catch (Exception e) {
subscriber.onError(e);
}*/
}
}
}
});
}
輸出結果如下:
onSuccess value = 1
onSuccess value = 2
onSuccess value = do something
onCompleted
在手動建立Observale時,當Observable傳送了第二個資料後,Observable傳送了onError通知,然後又傳送了1個數據。而在onErrorReturn方法處理中,其引數函式中,建立並返回了一個特殊項( do something).
從Log列印可以看出,觀察者並沒有執行onError方法,意味著Observale並沒有接收到onError通知,而是接收到了一個特殊項後,呼叫了onCompleted方法,結束了此次訂閱。而這個特殊項,正是在onErrorReturn中引數函式中,建立的特殊項。
onErrorResumeNext
onErrorResumeNext方法與onErrorReturn()方法類似,都是攔截原Observable的onError通知,不同的是攔截後的處理方式,onErrorReturn建立並返回一個特殊項,而onErrorResumeNext建立並返回一個新的Observabl,觀察者會訂閱它,並接收其發射的資料。
API
Javadoc: onErrorResumeNext(Func1))
Javadoc: onErrorResumeNext(Observable))
示例程式碼
createObserver()
.onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
@Override
public Observable<String> call(Throwable t) {
return Observable.just("a","b","c");
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
輸出結果如下:
onSuccess value = 1
onSuccess value = 2
onSuccess value = a
onSuccess value = b
onSuccess value = c
onCompleted
在手動建立Observale時,當Observable傳送了第二個資料後,Observable傳送了onError通知,然後又傳送了3個數據。在onErrorResumeNext方法中的引數函式中,建立了一個新的Observable。
從Log列印可以看出,觀察者並沒有執行onError方法,意味著Observale並沒有接收到onError通知,而是接收到了新建的建立了一個新的Observable發射的出具。在新Observable發射完資料後,呼叫了onCompleted方法,結束了此次訂閱。
onExceptionResumeNext
onExceptionResumeNext方法與onErrorResumeNext方法類似建立並返回一個擁有類似原Observable的新Observable,,也使用這個備用的Observable。不同的是,如果onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable。
這裡要普及一個概念,Java的異常分為錯誤(error)和異常(exception)兩種,它們都是繼承於Throwable類。
錯誤(error)一般是比較嚴重的系統問題,比如我們經常遇到的OutOfMemoryError、StackOverflowError等都是錯誤。錯誤一般繼承於Error類,而Error類又繼承於Throwable類,如果需要捕獲錯誤,需要使用try..catch(Error e)或者try..catch(Throwable e)句式。使用try..catch(Exception e)句式無法捕獲錯誤
異常(Exception)也是繼承於Throwable類,一般是根據實際處理業務丟擲的異常,分為執行時異常(RuntimeException)和普通異常。普通異常直接繼承於Exception類,如果方法內部沒有通過try..catch句式進行處理,必須通過throws關鍵字把異常丟擲外部進行處理(即checked異常);而執行時異常繼承於RuntimeException類,如果方法內部沒有通過try..catch句式進行處理,不需要顯式通過throws關鍵字丟擲外部,如IndexOutOfBoundsException、NullPointerException、ClassCastException等都是執行時異常,當然RuntimeException也是繼承於Exception類,因此是可以通過try..catch(Exception e)句式進行捕獲處理的。
API
Javadoc: onExceptionResumeNext(Observable))
示例程式碼
/*
* onExceptionResumeNext:
* 和onErrorResumeNext類似,可以說是onErrorResumeNext的特例,
* 區別是如果onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable。
*/
createObserver()
.onExceptionResumeNext(Observable.just("www.stay4it.com"))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
輸出結果如下:
onSuccess value = 1
onSuccess value = 2
onError error = java.lang.Throwable: Throw error
從Log列印可以看出,沒有使用備用的Observable,這是因為onError收到的Throwable不是一個Exception,所以將錯誤傳遞給觀察者的onError方法。那麼怎麼才能使用備用的Observable呢?程式碼如下:
createObserver2()// 注意這裡
.onExceptionResumeNext(Observable.just("www.stay4it.com"))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
輸出結果如下:
onSuccess value = onNext:1
onSuccess value = onNext:2
onSuccess value = www.stay4it.com
onCompleted
Retry
顧名思義,retry的意思就是試著重來,當原始Observable發射onError通知時,retry操作符不會讓onError通知傳遞給觀察者,它會重新訂閱這個Observable一次或者多次(意味著重新從頭髮射資料),所以可能造成資料項重複傳送的情況。
如果重新訂閱了指定的次數還是發射了onError通知,將不再嘗試重新訂閱,它會把最新的一個onError通知傳遞給觀察者。
RxJava中將Retry操作符的實現為retry和retryWhen兩種。
retry操作符預設在trampoline排程器上執行。
Javadoc: retry():無論收到多少次onError通知,都會繼續訂閱並重發原始Observable,直到onCompleted。
Javadoc: retry(long):接受count引數的retry會最多重新訂閱count次,如果次數超過了就不會嘗試再次訂閱,它會把最新的一個onError通知傳遞給他的觀察者。
Javadoc: retry(Func2): 這個版本的retry接受一個謂詞函式作為引數,這個函式的兩個引數是:重試次數和導致發射onError通知的Throwable。這個函式返回一個布林值,如果返回true,retry應該再次訂閱和映象原始的Observable,如果返回false,retry會將最新的一個onError通知傳遞給它的觀察者。
API
Javadoc: retry()
Javadoc: retry(long)
Javadoc: retry(Func2)
示例程式碼
/**
* retry()
* 無限次嘗試重新訂閱
*/
createObserver()
.retry()
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
/**
* retry(count)
* 最多2次嘗試重新訂閱
*/
createObserver()
.retry(2)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
/**
* retry(Func2)
*/
createObserver()
.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer t1, Throwable throwable) {
System.out.println("發生錯誤了:"+throwable.getMessage()+",第"+t1+"次重新訂閱");
if(t1>2){
return false;//不再重新訂閱
}
//此處也可以通過判斷throwable來控制不同的錯誤不同處理
return true;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onNext(String value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
輸出結果如下:
onSuccess value = 1
onSuccess value = 2
onSuccess value = 1
onSuccess value = 2
…無限次
onSuccess value = 1
onSuccess value = 2
onSuccess value = 1
onSuccess value = 2
onSuccess value = 1
onSuccess value = 2
onError error = java.lang.Throwable: Throw error
onSuccess value = 1
onSuccess value = 2
發生錯誤了:Throw error,第1次重新訂閱
onSuccess value = 1
onSuccess value = 2
發生錯誤了:Throw error,第2次重新訂閱
onSuccess value = 1
onSuccess value = 2
發生錯誤了:Throw error,第3次重新訂閱
onError error = java.lang.Throwable: Throw error
retryWhen
retryWhen和retry類似,區別是,retryWhen將onError中的Throwable傳遞給一個函式,這個函式產生另一個Observable,retryWhen觀察它的結果再決定是不是要重新訂閱原始的Observable。如果這個Observable發射了一項資料,它就重新訂閱,如果這個Observable發射的是onError通知,它就將這個通知傳遞給觀察者然後終止。
retryWhen()預設在trampoline排程器上執行,可以通過引數指定其它的排程器。
API
Javadoc: retryWhen(Func1)
Javadoc: retryWhen(Func1,Scheduler)
示例程式碼
int retryCount = 0;
final int maxRetries = 3;
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onError(new RuntimeException("always fails"));
}
})
.subscribeOn(Schedulers.immediate())
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (++retryCount <= maxRetries) {
// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
System.out.println("get error, it will try after " + 1000 + " millisecond, retry count " + retryCount);
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onNext(Integer value) {
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error = " + error);
}
});
輸出結果如下:
get error, it will try after 1000 millisecond, retry count 1
get error, it will try after 1000 millisecond, retry count 2
get error, it will try after 1000 millisecond, retry count 3
onError error = java.lang.RuntimeException: always fails