RxJava 2.x Filtering Operators

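Over the last few days I have wanted to organize all of the RxJava 2 operators. Many articles online already summarize them well, but after a while I still forget them.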

Filtering operators

filter

The filter operator lets you define an arbitrary rule: only the items for which the predicate returns true are passed along.

Observable.just(1, 2, 3, 4, 5, 6, 7)
          .filter(new Predicate<Integer>() {
              @Override
              public boolean test(Integer value) throws Exception {
                  return value % 2 == 0;
              }
          })
          .subscribeWith(new CommonObserver<>());

Output:

onNext 2
onNext 4
onNext 6
onComplete

Process finished with exit code 0

take

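The take operator limits the number of items emitted: only the first n items are passed along, and then the stream completes.

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
          .take(5)
          .subscribeWith(new CommonObserver<>());

Output: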

onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onComplete

Process finished with exit code 0

takeLast

The takeLast operator emits only the last n items of the stream.

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
          .takeLast(5)
          .subscribeWith(new CommonObserver<>());

Output:

onNext 5
onNext 6
onNext 7
onNext 8
onNext 9
onComplete

Process finished with exit code 0
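
One way to picture why takeLast cannot emit anything until the source completes is a sliding window over the stream. The sketch below is a plain-Java model of that idea on a finite list, not RxJava's implementation; the class TakeLastSketch and its takeLast helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class TakeLastSketch {
    // Plain-Java model of takeLast(n): keep a sliding window of the newest n items.
    static <T> List<T> takeLast(List<T> source, int n) {
        Deque<T> window = new ArrayDeque<>();
        for (T item : source) {
            window.addLast(item);
            if (window.size() > n) {
                window.removeFirst(); // the oldest item can no longer be among the last n
            }
        }
        // Only at "completion" is the window known to hold the last n items.
        return new ArrayList<>(window);
    }

    public static void main(String[] args) {
        System.out.println(takeLast(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 5)); // [5, 6, 7, 8, 9]
    }
}
```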

firstElement/lastElement

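firstElement and lastElement emit the first and the last item of the stream, respectively. Both return a Maybe, so an empty source simply completes without a value.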

Observable.just(1, 2, 3, 4, 5, 6, 7)
          .firstElement().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                System.out.println("accept: " + value);
            }
        });

Output:

accept: 1

Process finished with exit code 0

Likewise, lastElement would emit the last item, 7.

first/last

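first and last also emit the first and the last item, respectively, but they take a default item as an argument, which is emitted when the source is empty.

Observable.just(1, 2, 3, 4, 5, 6, 7)
          .first(5).subscribe(new CommonComsumer<>());

Output: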

accept 1

Process finished with exit code 0

Likewise, last would emit the last item, 7. Note that if the source had been created with the empty operator instead, the output would be the default item, 5.
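
The default-item behaviour can be modelled on a finite sequence in a few lines. This is a plain-Java sketch of the semantics described above, not RxJava code; FirstWithDefaultSketch and its first and last helpers are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class FirstWithDefaultSketch {
    // Model of first(defaultItem): the first item, or the default if there is none.
    static <T> T first(Iterable<T> source, T defaultItem) {
        Iterator<T> it = source.iterator();
        return it.hasNext() ? it.next() : defaultItem;
    }

    // Model of last(defaultItem): the last item, or the default if there is none.
    static <T> T last(Iterable<T> source, T defaultItem) {
        T result = defaultItem;
        for (T item : source) {
            result = item; // keep overwriting until the sequence ends
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> empty = Collections.emptyList();
        System.out.println(first(items, 5)); // 1
        System.out.println(last(items, 5));  // 7
        System.out.println(first(empty, 5)); // 5, the default item
    }
}
```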

firstOrError/lastOrError

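firstOrError and lastOrError emit the first and the last item, respectively; if the source is empty, they signal an error (a NoSuchElementException) instead.

Observable.empty()
          .firstOrError().subscribe(new CommonComsumer<>());

Output: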

Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:47)
	at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:117)
	at io.reactivex.internal.disposables.EmptyDisposable.complete(EmptyDisposable.java:53)
	at io.reactivex.internal.operators.observable.ObservableEmpty.subscribeActual(ObservableEmpty.java:28)
	at io.reactivex.Observable.subscribe(Observable.java:12030)
	at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
	at io.reactivex.Single.subscribe(Single.java:3394)
	at io.reactivex.Single.subscribe(Single.java:3380)
	at io.reactivex.Single.subscribe(Single.java:3351)
	at com.onzhou.rxjava2.filter.SampleFirstOrError.invoke(SampleFirstOrError.java:18)
	at com.onzhou.rxjava2.Main.main(Main.java:38)
Caused by: java.util.NoSuchElementException
	... 10 more

Process finished with exit code 0

elementAt/elementAtOrError

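elementAt emits the item at the given zero-based index and tolerates an out-of-range index (it simply completes without emitting anything). elementAtOrError also emits the item at the given index, but signals an error when the index is out of range.
The elementAt operator:

Observable.just(1, 2, 3, 4, 5, 6, 7)
          .elementAt(3).subscribe(new CommonComsumer<>());

Output: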

accept 4

Process finished with exit code 0

The elementAtOrError operator:

Observable.just(1, 2, 3, 4, 5, 6, 7)
          .elementAtOrError(16).subscribe(new CommonComsumer<>());

Output:

Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:47)
	at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:117)
	at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:110)
	at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:36)
	at io.reactivex.Observable.subscribe(Observable.java:12030)
	at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
	at io.reactivex.Single.subscribe(Single.java:3394)
	at io.reactivex.Single.subscribe(Single.java:3380)
	at io.reactivex.Single.subscribe(Single.java:3351)
	at com.onzhou.rxjava2.filter.SampleElementAtOrError.invoke(SampleElementAtOrError.java:18)
	at com.onzhou.rxjava2.Main.main(Main.java:40)
Caused by: java.util.NoSuchElementException
	... 10 more

Process finished with exit code 0
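
The difference between the two operators comes down to what happens when the index is past the end. The following plain-Java sketch models that on a list, using an empty Optional as a stand-in for a Maybe that completes without a value; ElementAtSketch and its helpers are hypothetical names, and this is not how RxJava implements the operators.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

class ElementAtSketch {
    // Model of elementAt(index): zero-based, empty result when the index is out of range.
    static <T> Optional<T> elementAt(List<T> source, int index) {
        if (index < 0) {
            throw new IndexOutOfBoundsException("index must not be negative: " + index);
        }
        return index < source.size() ? Optional.of(source.get(index)) : Optional.empty();
    }

    // Model of elementAtOrError(index): same lookup, but a missing item becomes an error.
    static <T> T elementAtOrError(List<T> source, int index) {
        return elementAt(source, index).orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(elementAt(items, 3));  // Optional[4]
        System.out.println(elementAt(items, 16)); // Optional.empty
        try {
            elementAtOrError(items, 16);
        } catch (NoSuchElementException e) {
            System.out.println("error: " + e);
        }
    }
}
```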

ofType

The ofType operator passes along only the items of the given type.

Observable.just(1, "name", true)
          .ofType(Integer.class).subscribeWith(new CommonObserver<>());

Output:

onNext 1
onComplete

Process finished with exit code 0

skip/skipLast

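skip and skipLast skip a number of items from the start or the end of the stream, respectively; both also have overloads that skip a span of time instead.

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
          .skip(5).subscribeWith(new CommonObserver<>());

Output: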

onNext 6
onNext 7
onNext 8
onNext 9
onComplete

Process finished with exit code 0

The skipLast operator:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
          .skipLast(5).subscribeWith(new CommonObserver<>());

Output:

onNext 1
onNext 2
onNext 3
onNext 4
onComplete

Process finished with exit code 0
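
Unlike takeLast, skipLast does not have to wait for completion: an item can be released as soon as n newer items have arrived behind it. The sketch below models this on a finite list in plain Java; SkipLastSketch and its skipLast helper are hypothetical names, and the code illustrates the semantics rather than RxJava's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class SkipLastSketch {
    // Model of skipLast(n): each item is held back until n newer items have arrived.
    static <T> List<T> skipLast(List<T> source, int n) {
        Deque<T> held = new ArrayDeque<>();
        List<T> emitted = new ArrayList<>();
        for (T item : source) {
            held.addLast(item);
            if (held.size() > n) {
                // n newer items now follow the oldest held item, so it is safe to emit it.
                emitted.add(held.removeFirst());
            }
        }
        // Whatever is still held at completion is the skipped tail.
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(skipLast(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 5)); // [1, 2, 3, 4]
    }
}
```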

ignoreElements

ignoreElements discards every item and relays only the terminal event, onError or onComplete.

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
          .ignoreElements().subscribe(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onComplete");
            }
        });

Output:

onComplete

Process finished with exit code 0

distinct/distinctUntilChanged

The distinct operator drops any item that has already been emitted before.

Observable.just(1, 2, 3, 1, 3, 1, 2)
          .distinct().subscribe(new CommonComsumer<>());

Output:

accept 1
accept 2
accept 3

Process finished with exit code 0

The distinctUntilChanged operator drops only consecutive duplicates.

Observable.just(1, 2, 2, 1, 1, 6, 7)
          .distinctUntilChanged().subscribe(new CommonComsumer<>());

Output:

accept 1
accept 2
accept 1
accept 6
accept 7

Process finished with exit code 0
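
The two operators differ in how much they have to remember: distinct needs the set of everything seen so far, while distinctUntilChanged only needs the previous item. The following plain-Java sketch models both on a list; DistinctSketch and its helpers are hypothetical names introduced for illustration, not RxJava code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

class DistinctSketch {
    // Model of distinct(): remember every item seen so far.
    static <T> List<T> distinct(List<T> source) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (seen.add(item)) { // add() returns false for an item seen before
                out.add(item);
            }
        }
        return out;
    }

    // Model of distinctUntilChanged(): compare only with the previous item.
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        T previous = null;
        boolean hasPrevious = false;
        for (T item : source) {
            if (!hasPrevious || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            hasPrevious = true;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList(1, 2, 3, 1, 3, 1, 2)));             // [1, 2, 3]
        System.out.println(distinctUntilChanged(Arrays.asList(1, 2, 2, 1, 1, 6, 7))); // [1, 2, 1, 6, 7]
    }
}
```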

timeout

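The timeout operator signals an error when the next item does not arrive within the given time window.

Observable.intervalRange(0, 10, 0, 2, TimeUnit.SECONDS)
          .timeout(1, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());

The first item arrives immediately, but the next one is not due for 2 seconds, so the 1-second timeout elapses first and an exception is signaled.
Output: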

onNext 0
onError java.util.concurrent.TimeoutException

Process finished with exit code 0
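
The timing in the example above can be checked with a small model that walks through the arrival times and looks for the first gap longer than the timeout. This is a plain-Java sketch under simplifying assumptions (arrival times are given in milliseconds since subscription, and the timing of completion is ignored); TimeoutSketch and firstLateItem are hypothetical names, not part of RxJava.

```java
class TimeoutSketch {
    // Returns the index of the first item that arrives too late, or -1 if none does.
    static int firstLateItem(long[] arrivalMillis, long timeoutMillis) {
        long previous = 0; // the timer starts at subscription time
        for (int i = 0; i < arrivalMillis.length; i++) {
            if (arrivalMillis[i] - previous > timeoutMillis) {
                return i; // the timeout would fire while waiting for this item
            }
            previous = arrivalMillis[i]; // each delivered item restarts the timer
        }
        return -1;
    }

    public static void main(String[] args) {
        // Arrival times modelled on intervalRange(0, 10, 0, 2, SECONDS): one item every 2 seconds.
        long[] arrivals = new long[10];
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = i * 2000L;
        }
        System.out.println(firstLateItem(arrivals, 1000)); // 1
    }
}
```

A result of 1 means item 0 is delivered and the timeout fires while waiting for item 1, which agrees with the output shown above.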

throttleFirst

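The throttleFirst operator emits the first item in each time window and ignores everything else until the window has passed.

Observable.intervalRange(0, 6, 0, 1, TimeUnit.SECONDS)
          // Process only the first item in each one-second window
          .throttleFirst(1, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());

Output: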

onNext 0
onNext 2
onNext 4
onComplete

Process finished with exit code 0
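
To see why only 0, 2 and 4 get through, it helps to model the operator as a gate that closes for one window after each emitted item. The sketch below does this in plain Java over a list of arrival times in milliseconds. It is an illustration, not RxJava's implementation: ThrottleFirstSketch is a hypothetical name, and the sketch assumes that an item landing exactly on the window boundary is still dropped. In a real run that case is a timing race, although the assumption matches the output above.

```java
import java.util.ArrayList;
import java.util.List;

class ThrottleFirstSketch {
    // Returns the indices of the items that pass through the gate.
    static List<Integer> throttleFirst(long[] arrivalMillis, long windowMillis) {
        List<Integer> passed = new ArrayList<>();
        long lastEmitted = 0;
        boolean emittedAny = false;
        for (int i = 0; i < arrivalMillis.length; i++) {
            long now = arrivalMillis[i];
            // Assumption: an item arriving exactly when the window ends is still inside it.
            if (!emittedAny || now - lastEmitted > windowMillis) {
                passed.add(i);
                lastEmitted = now; // emitting an item starts a new window
                emittedAny = true;
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        // Arrival times modelled on intervalRange(0, 6, 0, 1, SECONDS).
        long[] arrivals = {0, 1000, 2000, 3000, 4000, 5000};
        System.out.println(throttleFirst(arrivals, 1000)); // [0, 2, 4]
    }
}
```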

throttleLast/sample

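throttleLast and sample behave alike (in RxJava 2.x, throttleLast simply delegates to sample): both sample the stream at a fixed interval and emit the most recent item from each interval.

Observable.intervalRange(0, 8, 0, 1, TimeUnit.SECONDS)
          // Every 2 seconds, emit only the most recent item
          //.sample(2, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());
          .throttleLast(2, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());

Output: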

onNext 1
onNext 3
onNext 5
onComplete

Process finished with exit code 0

throttleWithTimeout/debounce

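throttleWithTimeout and debounce emit an item only after a given period has passed without a newer item arriving.

Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
          // If a newer item arrives within 2 seconds, the older item is dropped
          //.debounce(2, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());
          .throttleWithTimeout(2, TimeUnit.SECONDS).subscribeWith(new CommonObserver<>());

Output: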

onNext 9
onComplete

Process finished with exit code 0
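
Only 9 appears in the output because every earlier item is replaced by a newer one within the 2-second quiet period, and the last pending item is flushed when the source completes. The following plain-Java sketch models that rule over arrival times in milliseconds; DebounceSketch is a hypothetical name, the source is assumed to complete right after its last item, and the code is an illustration rather than RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class DebounceSketch {
    // Returns the indices of the items that survive debouncing.
    static List<Integer> debounce(long[] arrivalMillis, long quietMillis) {
        List<Integer> out = new ArrayList<>();
        int n = arrivalMillis.length;
        for (int i = 0; i < n; i++) {
            boolean isLast = (i == n - 1);
            // The last item is flushed on completion; an earlier item survives only
            // if the gap before the next item is longer than the quiet period.
            if (isLast || arrivalMillis[i + 1] - arrivalMillis[i] > quietMillis) {
                out.add(i);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Arrival times modelled on intervalRange(0, 10, 0, 1, SECONDS).
        long[] steady = new long[10];
        for (int i = 0; i < steady.length; i++) {
            steady[i] = i * 1000L;
        }
        System.out.println(debounce(steady, 2000)); // [9]

        // A bursty stream: items 1 and 3 are each followed by a long enough silence.
        System.out.println(debounce(new long[]{0, 500, 3000, 3200}, 1000)); // [1, 3]
    }
}
```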

Review material:
https://github.com/byhook/rxjava2-study
Reference:
https://maxwell-nc.github.io/android/rxjava2-3.html