RX操作之輔助操作符二(doonunsubscribe、doOnCompleted、doOnError、doOnTerminate、finallyDo、delay、delaySubscription)
阿新 • • 發佈:2019-01-09
一、doOnUnSubscribe
取消訂閱時的監聽
Observable<Integer> observable = Observable.just(1,2,3,4,5,6); Subscriber<Integer> subscriber = new Subscriber<Integer>() { @Override public void onNext(Integer v) { Log.e(TAG,"onNext................."+v); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; observable .doOnUnsubscribe(new Action0() { @Override public void call() { Log.e(TAG, "觀察者取消訂閱了它生成的Observable....................."); } }) .subscribe(subscriber);
執行結果:
二、doOnCompleted
Observable正常終止時的監聽
Observable<Integer> observable = Observable.just(1,2,3,4,5,6); Subscriber<Integer> subscriber = new Subscriber<Integer>() { @Override public void onNext(Integer v) { Log.e(TAG,"onNext................."+v); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; observable .doOnCompleted(new Action0() { @Override public void call() { Log.e(TAG, "Observable正常終止了....................."); } }) .subscribe(subscriber);
執行結果:
三、doOnError
出錯時的監聽
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { if(i == 3){ subscriber.onError(new Throwable("EROOR")); }else { subscriber.onNext(i); } try { Thread.sleep(1000); } catch (Exception e) { } } subscriber.onCompleted(); } }); Subscriber<Integer> subscriber = new Subscriber<Integer>() { @Override public void onNext(Integer v) { Log.e(TAG,"onNext................."+v); } @Override public void onCompleted() { Log.e(TAG, "onCompleted................."); } @Override public void onError(Throwable e) { Log.e(TAG, "onError....................."); } }; observable .doOnError(new Action1<Throwable>() { @Override public void call(Throwable throwable) { Log.e(TAG, "出錯了....................."+throwable.toString()); } }) .subscribe(subscriber);
執行結果:
四、doOnTerminate
訂閱即將被終止時的監聽,無論是正常終止還是異常終止
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
if(i == 3){
subscriber.onError(new Throwable("EROOR"));
}else {
subscriber.onNext(i);
}
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
subscriber.onCompleted();
}
});
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable
.doOnTerminate(new Action0() {
@Override
public void call() {
Log.e(TAG, "訂閱即將被終止.....................");
}
})
.subscribe(subscriber);
執行結果:
五、finallyDo
訂閱完成之後的監聽,無論是正常終止還是異常終止
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
if(i == 3){
subscriber.onError(new Throwable("EROOR"));
}else {
subscriber.onNext(i);
}
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
subscriber.onCompleted();
}
});
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable
.finallyDo(new Action0() {
@Override
public void call() {
Log.e(TAG, "訂閱已經終止.....................");
}
})
.subscribe(subscriber);
執行結果:
六、delay
延遲一段指定的時間再發射來自Observable的發射物。Delay
操作符讓原始Observable在發射每項資料之前都暫停一段指定的時間段。效果是Observable發射的資料項在時間上向前整體平移了一個增量。
5s之後資料才打印出來
Integer[]items = {1,2,3,4,5};
Observable<Integer>observable = Observable.from(items).delay(5,TimeUnit.SECONDS).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread());
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable.subscribe(subscriber);
執行結果:
七、delaySubscription
延遲訂閱源Observable
Integer[]items = {1,2,3,4,5};
Observable<Integer>observable = Observable.from(items);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer v) {
Log.e(TAG,"onNext................."+v);
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable
.delaySubscription(2,TimeUnit.SECONDS)
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "觀察者訂閱了它生成的Observable.....................");
}
}).subscribe(subscriber);
訂閱後,延遲了2s後onNext函式才被呼叫
執行結果:
八、timeInterval
返回聯絡發射的observable的時間間隔
Observable<TimeInterval<Long>>observable = Observable.interval(1,TimeUnit.SECONDS).take(5).timeInterval();
Subscriber<TimeInterval<Long>> subscriber = new Subscriber<TimeInterval<Long>>() {
@Override
public void onNext(TimeInterval<Long> v) {
Log.e(TAG,"onNext................."+v.getValue()+"....................停了"+v.getIntervalInMilliseconds()+"毫s發射了一條資料");
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted.................");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError.....................");
}
};
observable.subscribe(subscriber);
執行結果: