RxJava 2.x 之過濾操作符
最近幾天想把rxjava2的操作符都整理一下,看到網上的很多文章都總結的很好,但是時間久了依然會忘記。
過濾操作符
- filter操作符
- take操作符
- takeLast操作符
- firstElement/lastElement操作符
- first/last操作符
- firstOrError/lastOrError操作符
- elementAt/elementAtOrError操作符
- ofType操作符
- skip/skipLast操作符
- ignoreElements操作符
- distinct/distinctUntilChanged操作符
- timeout操作符
- throttleFirst操作符
- throttleLast/sample操作符
- throttleWithTimeout/debounce操作符
filter
filter操作符,可以自己設定任意的規則來過濾資料
Observable.just(1, 2, 3, 4, 5, 6, 7)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer value) throws Exception {
return value % 2 == 0;
}
}).subscribeWith(new CommonObserver<>());
輸出:
onNext 2
onNext 4
onNext 6
onComplete
Process finished with exit code 0
take
take操作符主要用來限制發射的數量
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
. take(5)
.subscribeWith(new CommonObserver<>());
輸出:
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onComplete
Process finished with exit code 0
takeLast
takeLast操作符主要用來篩選最後的幾個元素
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.takeLast(5)
.subscribeWith(new CommonObserver<>());
輸出:
onNext 5
onNext 6
onNext 7
onNext 8
onNext 9
onComplete
Process finished with exit code 0
firstElement/lastElement
firstElement和lastElement操作符分別用來過濾第一個和最後一個元素
Observable.just(1, 2, 3, 4, 5, 6, 7)
.firstElement().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer value) throws Exception {
System.out.println("accept: " + value);
}
});
輸出:
accept: 1
Process finished with exit code 0
同理lastElement操作符
會輸出最後一個元素7
first/last
first和last操作符分別用來過濾第一個和最後一個元素,但它可以指定一個預設元素的引數
Observable.just(1, 2, 3, 4, 5, 6, 7)
.first(5).subscribe(new CommonComsumer<>());
輸出:
accept 1
Process finished with exit code 0
同理last操作符
會輸出最後一個元素7
,這裡要注意的是如果前面的使用的empty操作符
,這裡輸出的就是預設的5
這個元素
firstOrError/lastOrError
firstOrError/lastOrError操作符主要用於取第一個元素和最後一個元素如果為空,則會丟擲異常
Observable.empty()
.firstOrError().subscribe(new CommonComsumer<>());
輸出:
Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:47)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:117)
at io.reactivex.internal.disposables.EmptyDisposable.complete(EmptyDisposable.java:53)
at io.reactivex.internal.operators.observable.ObservableEmpty.subscribeActual(ObservableEmpty.java:28)
at io.reactivex.Observable.subscribe(Observable.java:12030)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
at io.reactivex.Single.subscribe(Single.java:3394)
at io.reactivex.Single.subscribe(Single.java:3380)
at io.reactivex.Single.subscribe(Single.java:3351)
at com.onzhou.rxjava2.filter.SampleFirstOrError.invoke(SampleFirstOrError.java:18)
at com.onzhou.rxjava2.Main.main(Main.java:38)
Caused by: java.util.NoSuchElementException
... 10 more
Process finished with exit code 0
elementAt/elementAtOrError
elementAt操作符主要用來指定發射第幾個元素支援越界,而elementAtOrError也是指定發射第幾個元素,但是越界會丟擲異常
elementAt操作符:
Observable.just(1, 2, 3, 4, 5, 6, 7)
.elementAt(3).subscribe(new CommonComsumer<>());
輸出:
accept 4
Process finished with exit code 0
elementAtOrError操作符:
Observable.just(1, 2, 3, 4, 5, 6, 7)
.elementAtOrError(16).subscribe(new CommonComsumer<>());
輸出:
Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:47)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:117)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:110)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:36)
at io.reactivex.Observable.subscribe(Observable.java:12030)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
at io.reactivex.Single.subscribe(Single.java:3394)
at io.reactivex.Single.subscribe(Single.java:3380)
at io.reactivex.Single.subscribe(Single.java:3351)
at com.onzhou.rxjava2.filter.SampleElementAtOrError.invoke(SampleElementAtOrError.java:18)
at com.onzhou.rxjava2.Main.main(Main.java:40)
Caused by: java.util.NoSuchElementException
... 10 more
Process finished with exit code 0
ofType
ofType操作符主要用來篩選特定型別的資料
Observable.just(1, "name", true)
.ofType(Integer.class).subscribeWith(new CommonObserver<>());
輸出:
onNext 1
onComplete
Process finished with exit code 0
skip/skipLast
skip或者skipLast操作符,可以跳過若干個元素,或者跳過一段時間
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.skip(5).subscribeWith(new CommonObserver<>());
輸出:
onNext 6
onNext 7
onNext 8
onNext 9
onComplete
Process finished with exit code 0
skipLast操作符
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.skipLast(5).subscribeWith(new CommonObserver<>());
輸出:
onNext 1
onNext 2
onNext 3
onNext 4
onComplete
Process finished with exit code 0
ignoreElements
ignoreElements操作符會忽略所有的元素,回撥onError或者onComplete
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.ignoreElements().subscribe(new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
輸出:
onComplete
Process finished with exit code 0
distinct/distinctUntilChanged
distinct操作符主要用來過濾重複的元素
Observable.just(1, 2, 3, 1, 3, 1, 2)
.distinct().subscribe(new CommonComsumer<>());
輸出:
accept 1
accept 2
accept 3
Process finished with exit code 0
distinctUntilChanged操作符主要用來過濾連續重複的元素
Observable.just(1, 2, 2, 1, 1, 6, 7)
.distinctUntilChanged().subscribe(new CommonComsumer<>());
輸出:
accept 1
accept 2
accept 1
accept 6
accept 7
Process finished with exit code 0
timeout
timeout操作符主要用來進行超時過濾操作
Observable.intervalRange(0, 10, 0, 2, TimeUnit.SECONDS)
.timeout(1, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());
執行1秒後超時丟擲異常
輸出:
onNext 0
onError java.util.concurrent.TimeoutException
Process finished with exit code 0
throttleFirst
throttleFirst操作符主要用來連續時間段內響應一次操作
Observable.intervalRange(0, 6, 0, 1, TimeUnit.SECONDS)
//每秒中只處理第一個資料
.throttleFirst(1, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());
輸出:
onNext 0
onNext 2
onNext 4
onComplete
Process finished with exit code 0
throttleLast/sample
throttleLast和sample操作符類似都是隔一段時間採集資料
Observable.intervalRange(0, 8, 0, 1, TimeUnit.SECONDS)
//每秒中只處理第一個資料
//.sample(2, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());
.throttleLast(2, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());
輸出:
onNext 1
onNext 3
onNext 5
onComplete
Process finished with exit code 0
throttleWithTimeout/debounce
throttleWithTimeout/debounce操作符主要用來處理一段時間內不響應才輸出結果
Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
//2秒內有新資料則拋棄舊資料
//.debounce(2, TimeUnit.SECONDS);
.throttleWithTimeout(2, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());
輸出:
onNext 9
onComplete
Process finished with exit code 0
複習文件
https://github.com/byhook/rxjava2-study
參考:
https://maxwell-nc.github.io/android/rxjava2-3.html