RxJava的操作符3
本篇文章繼續介紹以下型別的操作符
- Combining Observables(Observable的組合操作符)
- Error Handling Operators(Observable的錯誤處理操作符)
Combining Observables(Observable的組合操作符)
combineLatest操作符
combineLatest操作符把兩個Observable產生的結果進行合併,合併的結果組成一個新的Observable。這兩個Observable中任意一個Observable產生的結果,都和另一個Observable最後產生的結果,按照一定的規則進行合併。流程圖如下:
呼叫例子如下:
//產生0,5,10,15,20數列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5 );
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5 );
Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
return aLong+aLong2;
}
}).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
執行結果如下:
Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.
join操作符
join操作符把類似於combineLatest操作符,也是兩個Observable產生的結果進行合併,合併的結果組成一個新的Observable,但是join操作符可以控制每個Observable產生結果的生命週期,在每個結果的生命週期內,可以與另一個Observable產生的結果按照一定的規則進行合併,流程圖如下:
join方法的用法如下:
observableA.join(observableB,
observableA產生結果生命週期控制函式,
observableB產生結果生命週期控制函式,
observableA產生的結果與observableB產生的結果的合併規則)
呼叫例子如下:
//產生0,5,10,15,20數列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5);
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
observable1.join(observable2, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//使Observable延遲600毫秒執行
return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//使Observable延遲600毫秒執行
return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
return aLong + aLong2;
}
}).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
執行結果如下:
Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.
groupJoin操作符
groupJoin操作符非常類似於join操作符,區別在於join操作符中第四個引數的傳入函式不一致,其流程圖如下:
呼叫例子如下:
//產生0,5,10,15,20數列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5);
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
observable1.groupJoin(observable2, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(1600, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Observable<Long>, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong, Observable<Long> observable) {
return observable.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong2) {
return aLong + aLong2;
}
});
}
}).subscribe(new Subscriber<Observable<Long>>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Observable<Long> observable) {
observable.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
執行結果如下:
Next: 0
Next: 5
Next: 10
Next: 15
Next: 20
Next: 25
Next: 30
Next: 35
Next: 40
Next: 45
Next: 50
Next: 60
Next: 55
Sequence complete.
merge操作符
merge操作符是按照兩個Observable提交結果的時間順序,對Observable進行合併,如ObservableA每隔500毫秒產生資料為0,5,10,15,20;而ObservableB每隔500毫秒產生資料0,10,20,30,40,其中第一個資料延遲500毫秒產生,最後合併結果為:0,0,5,10,10,20,15,30,20,40;其流程圖如下:
呼叫例子如下:
//產生0,5,10,15,20數列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5);
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
Observable.merge(observable1, observable2)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next:" + aLong);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
執行結果如下:
Next:0
Next:0
Next:5
Next:10
Next:10
Next:20
Next:15
Next:30
Next:20
Next:40
Sequence complete.
mergeDelayError操作符
從merge操作符的流程圖可以看出,一旦合併的某一個Observable中出現錯誤,就會馬上停止合併,並對訂閱者回調執行onError方法,而mergeDelayError操作符會把錯誤放到所有結果都合併完成之後才執行,其流程圖如下:
呼叫例子如下:
//產生0,5,10數列,最後會產生一個錯誤
Observable<Long> errorObservable = Observable.error(new Exception("this is end!"));
Observable < Long > observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(3).mergeWith(errorObservable.delay(3500, TimeUnit.MILLISECONDS));
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
Observable.mergeDelayError(observable1, observable2)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next:" + aLong);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
執行結果如下:
Next:0
Next:0
Next:5
Next:10
Next:10
Next:20
Next:30
Next:40
Error: this is end!
startWith操作符
startWith操作符是在源Observable提交結果之前,插入指定的某些資料,其流程圖如下:
呼叫例子如下:
Observable.just(10,20,30).startWith(2, 3, 4).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
執行結果如下:
Next:2
Next:3
Next:4
Next:10
Next:20
Next:30
Sequence complete.
switchOnNext操作符
switchOnNext操作符是把一組Observable轉換成一個Observable,轉換規則為:對於這組Observable中的每一個Observable所產生的結果,如果在同一個時間記憶體在兩個或多個Observable提交的結果,只取最後一個Observable提交的結果給訂閱者,其流程圖如下:
呼叫例子如下:
//每隔500毫秒產生一個observable
Observable<Observable<Long>> observable = Observable.timer(0, 500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//每隔200毫秒產生一組資料(0,10,20,30,40)
return Observable.timer(0, 200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
}
}).take(2);
Observable.switchOnNext(observable).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next:" + aLong);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
執行結果如下:
Next:0
Next:10
Next:20
Next:0
Next:10
Next:20
Next:30
Next:40
Sequence complete.
zip操作符
zip操作符是把兩個observable提交的結果,嚴格按照順序進行合併,其流程圖如下:
呼叫例子如下:
Observable<Integer> observable1 = Observable.just(10,20,30);
Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
執行結果如下:
Next:14
Next:28
Next:42
Sequence complete.
Error Handling Operators(Observable的錯誤處理操作符)
onErrorReturn操作符
onErrorReturn操作符是在Observable發生錯誤或異常的時候(即將回調oError方法時),攔截錯誤並執行指定的邏輯,返回一個跟源Observable相同型別的結果,最後回撥訂閱者的onComplete方法,其流程圖如下:
呼叫例子如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//迴圈輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
observable.onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 1004;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
執行結果如下:
Next:0
Next:1
Next:2
Next:3
Next:1004
Sequence complete.
onErrorResumeNext操作符
onErrorResumeNext操作符跟onErrorReturn類似,只不過onErrorReturn只能在錯誤或異常發生時只返回一個和源Observable相同型別的結果,而onErrorResumeNext操作符是在錯誤或異常發生時返回一個Observable,也就是說可以返回多個和源Observable相同型別的結果,其流程圖如下:
呼叫例子如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//迴圈輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
observable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> call(Throwable throwable) {
return Observable.just(100,101, 102);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
執行結果如下:
Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.
onExceptionResumeNext操作符
onExceptionResumeNext操作符和onErrorResumeNext操作符類似,不同的地方在於onErrorResumeNext操作符是當Observable發生錯誤或異常時觸發,而onExceptionResumeNext是當Observable發生異常時才觸發。
這裡要普及一個概念就是,Java的異常分為錯誤(error)和異常(exception)兩種,它們都是繼承於Throwable類。
錯誤(error)一般是比較嚴重的系統問題,比如我們經常遇到的OutOfMemoryError、StackOverflowError等都是錯誤。錯誤一般繼承於Error類,而Error類又繼承於Throwable類,如果需要捕獲錯誤,需要使用try..catch(Error e)或者try..catch(Throwable e)句式。使用try..catch(Exception e)句式無法捕獲錯誤
異常(Exception)也是繼承於Throwable類,一般是根據實際處理業務丟擲的異常,分為執行時異常(RuntimeException)和普通異常。普通異常直接繼承於Exception類,如果方法內部沒有通過try..catch句式進行處理,必須通過throws關鍵字把異常丟擲外部進行處理(即checked異常);而執行時異常繼承於RuntimeException類,如果方法內部沒有通過try..catch句式進行處理,不需要顯式通過throws關鍵字丟擲外部,如IndexOutOfBoundsException、NullPointerException、ClassCastException等都是執行時異常,當然RuntimeException也是繼承於Exception類,因此是可以通過try..catch(Exception
e)句式進行捕獲處理的。
onExceptionResumeNext流程圖如下:
呼叫例子如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//迴圈輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Throwable e) {
subscriber.onError(e);
}
}
});
observable.onExceptionResumeNext(Observable.just(100, 101, 102)).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
執行結果如下:
Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.
retry操作符
retry操作符是當Observable發生錯誤或者異常時,重新嘗試執行Observable的邏輯,如果經過n次重新嘗試執行後仍然出現錯誤或者異常,則最後回撥執行onError方法;當然如果源Observable沒有錯誤或者異常出現,則按照正常流程執行。其流程圖如下:
呼叫例子如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//迴圈輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Throwable e) {
subscriber.onError(e);
}
}
});
observable.retry(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
執行結果如下:
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Error: this is number 4 error!
retryWhen操作符
retryWhen操作符類似於retry操作符,都是在源observable出現錯誤或者異常時,重新嘗試執行源observable的邏輯,不同在於retryWhen操作符是在源Observable出現錯誤或者異常時,通過回撥第二個Observable來判斷是否重新嘗試執行源Observable的邏輯,如果第二個Observable沒有錯誤或者異常出現,則就會重新嘗試執行源Observable的邏輯,否則就會直接回調執行訂閱者的onError方法。其流程圖如下:
呼叫例子如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("subscribing");
subscriber.onError(new RuntimeException("always fails"));
}
});
observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer integer) {
return integer;
}
}).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
System.out.println("delay retry by " + integer + " second(s)");
//每一秒中執行一次
return Observable.timer(integer, TimeUnit.SECONDS);
}
});
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
執行結果如下:
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
Sequence complete.
好了,先介紹這麼多,下回繼續介紹其他的操作符,敬請期待!