RxJava過濾操作符
概述
過濾操作符用於過濾和選擇Observable發射的資料序列,讓Observable只返回滿足我們條件的資料。
Debounce
Debounce會過濾掉髮射速率過快的資料項,相當於限流,但是需要注意的是debounce過濾掉的資料會被丟棄掉。
如果在一個指定的時間間隔過去了仍舊沒有發射一個,那麼它將發射最後的那個。
RxJava將這個操作符實現為throttleWithTimeout和debounce.
簡單粗暴的說法:當N個結點發生的時間太靠近(即發生的時間差小於設定的值T),debounce就會自動過濾掉前N-1個結點。
場景:比如EidtText輸入聯想,可以使用debounce減少頻繁的網路請求。避免每輸入(刪除)一個字就做一次聯想。
和switchMap結合使用效果更佳,一個用於取消上次請求,一個用於節流。
throttleWithTimeOut
通過時間來限流,源Observable每次發射出來一個數據後就會進行計時,
如果在設定好的時間結束前源Observable有新的資料發射出來,這個資料就會被丟棄,同時重新開始計時。
如果每次都是在計時結束前發射資料,那麼這個限流就會走向極端:只會發射最後一個數據。
預設在computation排程器上執行
例項:
public void throttleWithTimeout() {
Subscription subscribe = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
}
int sleep = 100;
if (i % 3 == 0) {
sleep = 300;
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation())
.throttleWithTimeout(200, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(i -> logger("throttleWithTimeout:" + i));
addSubscription(subscribe);
列印結果:
throttleWithTimeout:0
throttleWithTimeout:3
throttleWithTimeout:6
throttleWithTimeout:9
結果分析:每隔100毫秒發射一個數據,當要發射的資料是3的倍數的時候,下一個資料就延遲到300毫秒再發射
即:0 -300ms-> 1 -100ms-> 2 -100ms-> 3 ..
設定過濾時間為200ms,則1,2都被過濾丟棄。
deounce
不僅可以使用時間來進行過濾,還可以根據一個函式來進行限流。
這個函式的返回值是一個臨時Observable,
如果源Observable在發射一個新的資料的時候,
上一個資料根據函式所生成的臨時Observable還沒有結束,那麼上一個資料就會被過濾掉。
值得注意的是,如果源Observable產生的最後一個結果後在規定的時間間隔內呼叫了onCompleted,
那麼通過debounce操作符也會把這個結果提交給訂閱者。
public void debounce() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).debounce(integer -> {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
//如果%2==0,則發射資料並呼叫了onCompleted結束,則不會被丟棄
if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {
subscriber.onNext(integer);
subscriber.onCompleted();
}
}
});
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
logger("debounce:" + integer);
}
});
}
列印結果:
debounce:2
debounce:4
debounce:6
debounce:8
debounce:9
由結果可知,9的列印證明預設呼叫了onCompleted
Distinct
Distinct操作符的用處就是用來去重,只允許還沒有發射過的資料項通過
distinctUntilChanged和這個函式功能類似,是去掉連續重複的資料
例項:
public void distinct(){
Observable.just(1, 2, 1, 1, 2, 3)
.distinct()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
logger("Next: " + item);
}
@Override
public void onError(Throwable error) {
logger("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
logger("Sequence complete.");
}
});
}
列印結果:
Next: 1
Next: 2
Next: 3
Sequence complete.
public void distinctUntilChangedObserver(){
Observable.just(1, 2, 3, 3, 3, 1, 2, 3, 3)
.distinctUntilChanged()
.subscribe(integer -> logger("UntilChanged:"+integer));
}
列印結果:
UntilChanged: 1
UntilChanged: 2
UntilChanged: 3
UntilChanged: 1
UntilChanged: 2
UntilChanged: 3
ElementAt
從字面意思來看,ElementAt只會返回指定位置的資料。其相關方法有elementAtOrDefault(int,T),可以允許預設值
例項:
public void elementAt(){
Observable.just(0, 1, 2, 3, 4, 5).elementAt(2)
.subscribe(i -> logger("elementAt:" + i));
}
列印結果:
elementAt:2
Filter
允許傳入一個Func,通過的資料才會被髮射。
特殊形式ofType(Class):Observable只返回指定型別的資料。
例項:
public void filter() {
Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return (item < 4);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
logger("Next: " + item);
}
@Override
public void onError(Throwable error) {
logger("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
logger("Sequence complete.");
}
});
}
列印結果:
Next: 1
Next: 2
Next: 3
Sequence complete.
First、Last
First返回滿足條件的第一條資料.被實現為first,firstOrDefault和takeFirst。
Last操作符只返回最後一條滿足條件的資料,被實現為last和lastOrDefault。
如果獲取不到資料,First和Last會丟擲NoSuchElementException異常
takeFist會返回一個空的Observable(不呼叫onNext()但是會呼叫onCompleted)。
First和Last 都沒有實現為一個返回Observable的過濾操作符,
而是一個在當時就發射原始Observable指定資料項的阻塞函式。如果需要的是過濾操作符,
可以使用Take(1)、ElementAt(0)或者TakeLast(1),TakeLast(Func)
如果不想立即返回Observable,而是需要阻塞並返回值,可以使用BlockingObservable,
通過Observable.toBlocking或者BlockingObservable.from方法來轉化。
例項:
public void first() {
BlockingObservable<Integer> integerBlockingObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!subscriber.isUnsubscribed()) {
logger("onNext:" + i);
subscriber.onNext(i);
}
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
}).toBlocking();
Integer first = integerBlockingObservable.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 3;
}
});
logger(first);
}
2s後列印了:<– 阻塞了,知道大於3的資料發射出來
onNext:0
onNext:1
onNext:2
onNext:3
onNext:4
4
takeLast例項:
public void takeLast() {
Observable.just(1, 2, 3, 4, 5, 6, 7).takeLast(2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
logger("Next: " + item);
}
@Override
public void onError(Throwable error) {
logger("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
logger("Sequence complete.");
}
});
}
列印結果:
Next: 6
Next: 7
Sequence complete.
Skip、Take
Skip操作符將源Observable發射的資料過濾掉前n項,而Take操作符則只取前n項
相關操作符:TakeLast:發射Observable發射的最後N項資料,
takeLastBuffer:最後N項資料收集到list再發射
SkipLast:忽略Observable’發射的後N項資料,只保留前面的資料。
skipLast操作符提交滿足條件的結果給訂閱者存在延遲效果
例項:
public void skip(){
Observable.just(0, 1, 2, 3, 4, 5).skip(2).subscribe(i -> logger("Skip:" + i));
}
public void take(){
Observable.just(0, 1, 2, 3, 4, 5).take(2).subscribe(i -> logger("Take:" + i));
}
列印結果:
Skip:2
Skip:3
Skip:4
Skip:5
Take:0
Take:1
Sample、ThrottleFirst
Sample操作符會定時地發射源Observable最近發射的資料,其他的都會被過濾掉。
RxJava將這個操作符實現為sample和throttleLast。
而ThrottleFirst操作符則會定期發射這個時間段裡源Observable發射的第一個資料
這兩個操作符都在computation排程器上執行。
例項:
private Observable<Integer> createObserver() {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
}
public void sample() {
createObserver().sample(1000, TimeUnit.MILLISECONDS)
.subscribe(i -> logger("sample:" + i));
}
public void throttleFirst() {
createObserver().throttleFirst(1000, TimeUnit.MILLISECONDS)
.subscribe(i -> logger("throttleFirst:" + i));
}
列印結果:
sample:3
sample:8
sample:13
sample:18
throttleFirst:0
throttleFirst:5
throttleFirst:10
throttleFirst:15
其中sample操作符會每隔5個數字發射出一個數據來,
而throttleFirst則會每隔5個數據發射第一個資料。
ThrottleFirst 與RxBinding結合
@OnClick(R.id.btn_click)
public void btnClick(){
RxView.clicks(btnClick)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
Toast.makeText(RxBindingButtonClick.this,"Click",Toast.LENGTH_SHORT).show();
}
});
}
在1s內只響應一次點選,避免了重複點選的問題
ignoreElements
ignoreElements操作符忽略所有源Observable產生的結果,只把Observable的onCompleted和onError事件通知給訂閱者。
ignoreElements操作符適用於不太關心Observable產生的結果,只是在Observable結束時(onCompleted)或者出現錯誤時能夠收到通知。
例項:
public void ignoreElements(){
Observable.just(1,2,3,4,5,6,7,8).ignoreElements()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
logger("Next: " + item);
}
@Override
public void onError(Throwable error) {
logger("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
logger("Sequence complete.");
}
});
}
列印結果:
Sequence complete.