RxJava----操作符:輔助操作符
Observable Utility Operators(輔助操作符)
delay
顧名思義,Delay操作符就是讓發射資料的時機延後一段時間,這樣所有的資料都會依次延後一段時間發射。
log("start subscrib:" + System.currentTimeMillis()/1000);
Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> subscriber) {
for (int i = 1; i <= 2; i++) {
Long currentTime=System.currentTimeMillis()/1000;
log("subscrib:" + currentTime);
subscriber.onNext(currentTime);
try {
Thread.sleep(1000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).subscribeOn(Schedulers.newThread());
observable.delay(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("delay:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong));
}
});
結果:
start subscrib:1462519228
subscrib:1462519228
subscrib:1462519229
delay:1462519230---2
delay:1462519231---2
delaySubscription
不同之處在於Delay是延時資料的發射,而DelaySubscription是延時註冊Subscriber。
dealy是延遲發射,delaySubscription則是延遲收到。
log("start subscrib:" + System.currentTimeMillis()/1000);
Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> subscriber) {
for (int i = 1; i <= 2; i++) {
Long currentTime=System.currentTimeMillis()/1000;
log("subscrib:" + currentTime);
subscriber.onNext(currentTime);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).subscribeOn(Schedulers.newThread());
observable.delaySubscription(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("delaySubscription:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong));
}
});
結果:
start subscrib:1462519279
subscrib:1462519281
delaySubscription:1462519281---0
subscrib:1462519282
delaySubscription:1462519282---0
do
do操作符就是給Observable的生命週期的各個階段加上一系列的回撥監聽,當Observable執行到這個階段的時候,這些回撥就會被觸發。在Rxjava實現了很多的doxxx操作符。
doOnEach
doOnEach可以給Observable加上這樣的樣一個回撥:Observable每發射一個數據的時候就會觸發這個回撥,不僅包括onNext還包括onError和onCompleted。
Observable observable=Observable.just(1,2,3);
observable.doOnEach(new Action1<Notification>() {
@Override
public void call(Notification notification) {
log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());
}
}).subscribe(new Action1() {
@Override
public void call(Object o) {
log(o.toString());
}
});
Subject<Integer, Integer> values = ReplaySubject.create();
values.doOnEach(new Action1<Notification<? super Integer>>() {
@Override
public void call(Notification<? super Integer> notification) {
log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());
}
}).subscribe(new Action1() {
@Override
public void call(Object o) {
log(o.toString());
}
});
values.onNext(4);
values.onNext(5);
values.onNext(6);
values.onError(new Exception("Oops"));
結果:
doOnEach send 1 type:OnNext
1
doOnEach send 2 type:OnNext
2
doOnEach send 3 type:OnNext
3
doOnEach send null type:OnCompleted
doOnEach send 4 type:OnNext
4
doOnEach send 5 type:OnNext
5
doOnEach send 6 type:OnNext
6
doOnEach send null type:OnError
doOnNext
doOnNext則只有onNext的時候才會被觸發。
Subject<Integer, Integer> values = ReplaySubject.create();
values.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("doOnNext send :"+integer.toString());
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer.toString());
}
});
values.onNext(4);
values.onError(new Exception("Oops"));
結果:
doOnNext send :4
4
doOnSubscribe
doOnSubscribe會在Subscriber進行訂閱的時候觸發回撥。
Observable observable=Observable.just(1,2);
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
log("first:"+o.toString());
}
});
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
log("second:"+o.toString());
}
});
結果:
I'm be subscribed!
first:1
first:2
I'm be subscribed!
second:1
second:2
doOnUnSubscribe
doOnUnSubscribe則會在Subscriber進行反訂閱的時候觸發回撥。
當一個Observable通過OnError或者OnCompleted結束的時候,會反訂閱所有的Subscriber。
Observable observable = Observable.just(1, 2).doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("I'm be unSubscribed!");
}
});
Subscription subscribe1 = observable.subscribe();
Subscription subscribe2 = observable.subscribe();
subscribe1.unsubscribe();
subscribe2.unsubscribe();
結果:
I'm be unSubscribed!
I'm be unSubscribed!
doOnError
doOnError會在OnError發生的時候觸發回撥,並將Throwable物件作為引數傳進回撥函式裡;
try {
Observable observable = Observable.error(new Throwable("呵呵噠")).doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
log(throwable.getMessage().toString());
}
});
observable.subscribe();
}catch (Exception e){
log("catch the exception");
}
結果:
呵呵噠
catch the exception
doOnComplete
doOnComplete會在OnCompleted發生的時候觸發回撥。
Observable observable = Observable.empty().doOnCompleted(new Action0() {
@Override
public void call() {
log("Complete!");
}
});
observable.subscribe();
結果:
Complete!
doOnTerminate
DoOnTerminate會在Observable結束前觸發回撥,無論是正常還是異常終止;
Subject<Integer, Integer> values = ReplaySubject.create();
values.doOnTerminate(new Action0() {
@Override
public void call() {
log("order to terminate");
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer.toString());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
log(throwable.getMessage().toString());
}
});
values.onNext(4);
values.onError(new Exception("Oops"));
結果:
4
order to terminate
Oops
finallyDo
finallyDo會在Observable結束後觸發回撥,無論是正常還是異常終止。
Observable observable = Observable.empty().finallyDo(new Action0() {
@Override
public void call() {
log("already terminate");
}
});
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
}, new Action0() {
@Override
public void call() {
log("Complete!");
}
});
結果:
Complete!
already terminate
materialize
materialize操作符將OnNext/OnError/OnComplete都轉化為一個Notification物件並按照原來的順序發射出來。
public final Observable<Notification<T>> materialize()
元資料中包含了源 Observable 所發射的動作,是呼叫 onNext 還是 onComplete。注意上圖中,源 Observable 結束的時候, materialize 還會發射一個 onComplete 資料,然後才發射一個結束事件。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.materialize()
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
結果:
meterialize:0--type:OnNext
meterialize:1--type:OnNext
meterialize:2--type:OnNext
meterialize:null--type:OnCompleted
Notification 類包含了一些判斷每個資料發射型別的方法,如果出錯了還可以獲取錯誤資訊 Throwable 物件。
dematerialize
deMeterialize則是與materialize 執行相反的過程。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.materialize()
.dematerialize()
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
結果:
0
1
2
注意:在呼叫dematerialize()
之前必須先呼叫materialize()
,否則會報錯。
serialize
強制Observable按次序發射資料並且功能是有效的
如果你無法確保自定義的操作符符合 Rx 的約定,例如從多個源非同步獲取資料,則可以使用 serialize 操作函式。 serialize 可以把一個不符合約定的 Observable 轉換為一個符合約定的 Observable。
下面建立一個不符合約定的 Observable,並且訂閱到該 Observable上:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
subscriber.onNext(3);
subscriber.onCompleted();
}
});
observable.doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("Unsubscribed");
}
}) .subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
}, new Action0() {
@Override
public void call() {
log("Complete!");
}
});
結果:
1
2
Complete!
Unsubscribed
先不管上面的 Observable 發射的資料,訂閱結束的情況看起來符合 Rx 約定。 這是由於 subscribe 認為當前資料流結束的時候會主動結束這個 Subscription。但實際使用中我們可能並不想直接結束這個Subscription。還有一個函式為 unsafeSubscribe ,該函式不會自動取消訂閱。
observable.doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("Unsubscribed");
}
})
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
結果:
1
2
Complete!
3
Complete!
上面的示例最後就沒有列印 Unsubscribed 字串。
unsafeSubscribe 也不能很好的處理錯誤情況。所以該函式幾乎沒用。在文件中說:該函式應該僅僅在自定義操作函式中處理巢狀訂閱的情況。 為了避免這種操作函式接受到不合法的資料流,我們可以在其上應用 serialize 操作函式:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
subscriber.onNext(3);
subscriber.onCompleted();
}
})
.cast(Integer.class)
.serialize();
observable.doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("Unsubscribed");
}
})
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
結果:
1
2
Complete!
儘管上面的程式碼中沒有呼叫unsubscribe, 但是資料流事件依然符合約定。最後也收到了完成事件。
timeout
新增超時機制,如果過了指定的一段時間沒有發射資料,就發射一個錯誤通知
- 我們可以認為timeout()為一個Observable的限時的副本。
- 如果在指定的時間間隔內Observable不發射值的話,它監聽的原始的Observable時就會觸發onError()函式。
Observable<Long> values = Observable.interval(200, TimeUnit.MILLISECONDS);
Subscription subscription = values
.timeout(300,TimeUnit.MILLISECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log(aLong+"");
}
});
結果:
0
1
2
...
Rxjava將Timeout實現為很多不同功能的操作符,比如說超時後用一個備用的Observable繼續發射資料等。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i <= 3; i++) {
try {
Thread.sleep(i * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(5, 6)).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer.toString());
}
});
結果:
0
1
2
5
6
timestamp
給Observable發射的每個資料項新增一個時間戳
timestamp 把資料轉換為 Timestamped 型別,裡面包含了原始的資料和一個原始資料是何時發射的時間戳。
public final Observable<Timestamped<T>> timestamp()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.timestamp()
.subscribe(new Action1<Timestamped>() {
@Override
public void call(Timestamped mTimestamped) {
log(mTimestamped.toString());
}
});
結果:
Timestamped(timestampMillis = 1461758360570, value = 0)
Timestamped(timestampMillis = 1461758360670, value = 1)
Timestamped(timestampMillis = 1461758360771, value = 2)
從結果可以看到,上面的資料大概每隔100毫秒發射一個。
timeInterval
將一個Observable轉換為發射兩個資料之間所耗費時間的Observable
如果你想知道前一個數據和當前資料發射直接的時間間隔,則可以使用 timeInterval 函式。
public final Observable<TimeInterval<T>> timeInterval()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.timeInterval()
.subscribe(new Action1<TimeInterval>() {
@Override
public void call(TimeInterval mTimeInterval) {
log(mTimeInterval.toString());
}
});
結果:
TimeInterval [intervalInMilliseconds=101, value=0]
TimeInterval [intervalInMilliseconds=99, value=1]
TimeInterval [intervalInMilliseconds=100, value=2]
using
建立一個只在Observable的生命週期記憶體在的一次性資源
Using操作符建立一個在Observable生命週期記憶體活的資源,也可以這樣理解:我們建立一個資源並使用它,用一個Observable來限制這個資源的使用時間,當這個Observable終止的時候,這個資源就會被銷燬。
public static final <T,Resource> Observable<T> using(
Func0<Resource> resourceFactory,
Func1<? super Resource,? extends Observable<? extends T>> observableFactory,
Action1<? super Resource> disposeAction)
using 有三個引數,分別是:
- 1.建立這個一次性資源的函式
- 2.建立Observable的函式
- 3.釋放資源的函式
當 Observable 被訂閱的時候,resourceFactory 用來獲取到需要的資源;observableFactory 用這個資源來發射資料;當 Observable 完成的時候,disposeAction 來釋放資源。
Observable observable = Observable.using(new Func0<Animal>() {
@Override
public Animal call() {
return new Animal();
}
}, new Func1<Animal, Observable<?>>() {
@Override
public Observable<?> call(Animal animal) {
return Observable.timer(3, TimeUnit.SECONDS);//三秒後發射一次就completed
// return Observable.timer(4, 2, TimeUnit.SECONDS);//沒有completed,不停的發射資料
// return Observable.range(1,3);//一次發射三個資料,馬上結束
// return Observable.just(1,2,3);//一次發射三個資料,馬上結束
}
}, new Action1<Animal>() {
@Override
public void call(Animal animal) {
animal.relase();
}
});
Subscriber subscriber = new Subscriber() {
@Override
public void onCompleted() {
log("subscriber---onCompleted");
}
@Override
public void onError(Throwable e) {
log("subscriber---onError");
}
@Override
public void onNext(Object o) {
log("subscriber---onNext"+o.toString());//o是發射的次數統計,可以用timer(4, 2, TimeUnit.SECONDS)測試
}
};
observable.count().subscribe(subscriber);
結果:
create animal
animal eat
animal eat
animal eat
subscriber---onNext1
subscriber---onCompleted
animal released