1. 程式人生 > >RxJava操作符(04-過濾操作)

RxJava操作符(04-過濾操作)

目錄:

“過濾操作”,顧名思義,就是過濾掉Observable發射的一些資料,不讓他發射出去,也就是忽略丟棄掉,至於需要過濾那些資料,就需要按照不同的規則,所以RxJava有一些列關於過濾的操作符,接下來看看RxJava中的過濾操作符:

1. Debounce

  Debounce操作符會過濾掉髮射速率過快的資料項, 僅在過了一段指定的時間還沒發射資料時才發射一個數據。RxJava將這個操作符實現為throttleWithTimeout和debounce

注意:這個操作符會接著最後一項資料發射原始Observable的onCompleted通知,即使這個通知發生在你指定的時間視窗內(從最後一項資料的發射算起)。也就是說,onCompleted通知不會觸發限流。

  • throttleWithTimeout
    根據你指定的時間間隔進行限流,時間單位通過TimeUnit引數指定。這種操作符預設在computation排程器上執行,但是你可以通過第三個引數指定。

  • debounce
    debounce操作符的一個變體通過對原始Observable的每一項應用一個函式進行限流,這個函式返回一個Observable。如果原始Observable在這個新生成的Observable終止之前發射了另一個數據,debounce會抑制(suppress)這個資料項。
    debounce的這個變體預設不在任何特定的排程器上執行。

    這裡寫圖片描述

示例程式碼:

Observable.create(new
Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 10; i++) { int sleep = 100; if (i % 3 == 0) { sleep = 300; } try { Thread.sleep(sleep); } catch
(InterruptedException e) { e.printStackTrace(); } subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribeOn(Schedulers.computation()) .throttleWithTimeout(200, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted:"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError:"); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext:"+integer); } });

輸出:

onNext:2
onNext:5
onNext:8
onNext:9
onCompleted:

2.Distinct

  抑制(過濾掉)重複的資料項,distinct的過濾規則是:只允許還沒有發射過的資料項通過。
Distinct操作符有兩種,但一共有四種形式:

  • distinct():過濾掉所有資料中的重複資料

    這裡寫圖片描述

  • distinct(Func1): 這個操作符有一個變體接受一個函式函式。這個函式根據原始Observable發射的資料項產生一個Key,然後,比較這些Key而不是資料本身,來判定兩個資料是否是不同的。這跟上一篇的GroupBy有些類似,首先將原始資料根據不同的key分類,然後過濾掉所有key相同的資料

    這裡寫圖片描述

  • distinctUntilChanged: 它只判定一個數據和它的直接前驅是否是不同的,也就是說它只會過濾連續的重複資料

    這裡寫圖片描述

  • distinctUntilChanged(Func1): 和distinct(Func1)一樣,根據一個函式產生的Key判定兩個相鄰的資料項是不是不同的

    這裡寫圖片描述

示例程式碼:

//過濾所有的重複資料(比較原始資料)
Observable.just(1, 2, 1, 1, 2, 3)
        .distinct()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "distinct:"+integer);
            }
        });
//過濾所有的重複資料(比較key)
Observable.just(1, 2, 1, 1, 2, 3)
        .distinct(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer%2==0?"偶":"奇";
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "distinct(Func1):"+integer);
            }
        });
//過濾連續的重複資料(比較原始資料)
Observable.just(1, 2, 1, 1, 2, 3)
        .distinctUntilChanged()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "distinctUntilChanged:"+integer);
            }
        });
//過濾連續的重複資料(比較key)
Observable.just(1, 2, 1, 1, 2, 3)
        .distinctUntilChanged(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer%2==0?"偶":"奇";
            }
        }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d(TAG, "distinctUntilChanged(Func1):"+integer);
    }
});

輸出:

distinct:1
distinct:2
distinct:3

distinct(Func1):1
distinct(Func1):2

distinctUntilChanged:1
distinctUntilChanged:2
distinctUntilChanged:1
distinctUntilChanged:2
distinctUntilChanged:3

distinctUntilChanged(Func1):1
distinctUntilChanged(Func1):2
distinctUntilChanged(Func1):1
distinctUntilChanged(Func1):2
distinctUntilChanged(Func1):3

3. ElementAt

  只發射第N項資料。elementAt和elementAtOrDefault預設不在任何特定的排程器上執行

  • elementAt(int):給它傳遞一個基於0的索引值,它會發射原始Observable資料序列對應索引位置的值,比如你傳遞給elementAt的值為5,那麼它會發射第六項的資料。如果你傳遞的是一個負數,或者原始Observable的資料項數小於index+1,將會丟擲一個IndexOutOfBoundsException異常。

    這裡寫圖片描述

  • elementAtOrDefault(int,T): 與elementAt的區別是,如果索引值大於資料項數,它會發射一個預設值(通過額外的引數指定),而不是丟擲異常。但是如果你傳遞一個負數索引值,它仍然會丟擲一個IndexOutOfBoundsException異常

    這裡寫圖片描述

示例程式碼:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
       .elementAt(5)     //只發射索引值為5(0開始)的資料
       .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "elementAt:"+integer);
        }
});

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
        //只發射索引值為20(0開始)的資料,角標越界會發射預設值100
        .elementAtOrDefault(20, 100)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "elementAtOrDefault:"+integer);
        }
});

輸出:

elementAt:6
elementAtOrDefault:100

4. Filter

  Filter操作符使用你指定的一個謂詞函式測試資料項,只有通過測試的資料才會被髮射。也就是說原始資料必須滿足我們給的限制條件,才能被髮射。 filter預設不在任何特定的排程器上執行。

  • filter(Func1):根據給定的Func1中的條件發射滿足條件的資料
        這裡寫圖片描述

  • ofType(Class): filter操作符的一個特殊形式。它過濾一個Observable只返回指定型別的資料。
        這裡寫圖片描述

示例程式碼:

Observable.just(1, 2, 3, 4, 5)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer item) {
            //只發射小於4的整數
            return( item < 4 );
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.d(TAG, "Next: " + integer);
        }
    });


Observable.just(new Person(), new Dog(), new Person(), new Dog(), new Dog())
        //只發射屬於人類的資料
        .ofType(Person.class)
        .subscribe(new Action1<Person>() {
            @Override
            public void call(Person person) {
                Log.d(TAG, "Next: " + person.getClass().getSimpleName());
            }
        });

輸出:

Next: 1
Next: 2
Next: 3
Next: Person
Next: Person

5. First

  如果你只對Observable發射的第一項資料,或者滿足某個條件的第一項資料感興趣,你可以使用First操作符。 first系列的這幾個操作符預設不在任何特定的排程器上執行。

  • first():只發射第一個資料

    這裡寫圖片描述

  • first(Func1): 傳遞一個謂詞函式給first,然後發射這個函式判定為true的第一項資料(發射第一個滿足條件的資料)

    這裡寫圖片描述

  • firstOrDefault(T): firstOrDefault與first類似,但是在Observagle沒有發射任何資料時發射一個你在引數中指定的預設值

  • firstOrDefault(T, Func1): 傳遞一個謂詞函式給firstOrDefault,然後發射這個函式判定為true的第一項資料,如果沒有資料通過條件測試就發射一個預設值。

  • takeFirst(Func1): takeFirst與first類似,除了這一點:如果原始Observable沒有發射任何滿足條件的資料,first會丟擲一個NoSuchElementException,takeFist會返回一個空的Observable(不呼叫onNext()但是會呼叫onCompleted)

  • single(): single操作符也與first類似,但是如果原始Observable在完成之前不是正好發射一次資料,它會丟擲一個NoSuchElementException

  • single(Func1):single的變體接受一個謂詞函式,發射滿足條件的單個值,如果不是正好只有一個數據項滿足條件,會以錯誤通知終止

  • singleOrDefault(T): 和firstOrDefault類似,但是如果原始Observable發射超過一個的資料,會以錯誤通知終止

  • singleOrDefault(Func1,T): 和firstOrDefault(T, Func1)類似,如果沒有資料滿足條件,返回預設值;如果有多個數據滿足條件,以錯誤通知終止。

示例程式碼:

//只發射第一個資料1
Observable.just(1, 2, 3)
    .first()
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.d(TAG, "Next: " + integer);
        }
    });

Observable.just(1, 2, 3)
        .first(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                //只發射第一個大於2的資料
                return integer>2;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "Next: " + integer);
            }
        });

Observable.just(1, 2, 3)
        .firstOrDefault(10, new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                //只發射第一個大於9的資料,如果沒有傳送預設值10
                return integer>9;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "Next: " + integer);
            }
        });

輸出:

Next: 1
Next: 3
Next: 10

6. Last

  只發射最後一項(或者滿足某個條件的最後一項)資料

  • last():只發射最後一項 資料

  • last(Func1): 接受一個謂詞函式,返回一個發射原始Observable中滿足條件的最後一項資料的Observable(發射滿足條件的最後一個數據)

  • lastOrDefault(T):lastOrDefault與last類似,不同的是,如果原始Observable沒有發射任何值,它發射你指定的預設值。

  • lastOrDefault(T, Func1): 接受一個謂詞函式,如果有資料滿足條件,返回的Observable就發射原始Observable滿足條件的最後一項資料,否則發射預設值

示例程式碼:

//只發射最後一個數據
Observable.just(1, 2, 3)
        .last()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "Next: " + integer);
            }
        });

Observable.just(1, 2, 3)
        .last(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                //只發射大於等於2的最後一個數據
                return integer>=2;
            }
        }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d(TAG, "Next: " + integer);
    }
});

Observable.just(1, 2, 3)
        .lastOrDefault(10, new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                //只發射大於9的最後一個數據,如果沒有傳送預設值10
                return integer>9;
            }
        }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d(TAG, "Next: " + integer);
    }
});

輸出:

Next: 3
Next: 3
Next: 10

7. IgnoreElements

  如果你不關心一個Observable發射的資料,但是希望在它完成時或遇到錯誤終止時收到通知,你可以對Observable使用ignoreElements操作符,它會確保永遠不會呼叫觀察者的onNext()方法。IgnoreElements操作符忽略原始Observable發射的所有資料,只允許它的終止通知(onError或onCompleted)通過。 ignoreElements預設不在任何特定的排程器上執行。

示例程式碼:

//只會呼叫onCompleted或者onError
Observable.just(1, 2, 3)
        .ignoreElements()
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError:"+ e.getMessage());
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext:"+integer);
            }
        });

輸出:

onCompleted

8. Sample/ThrottleFirst

  Sample (別名throttleLast)操作符定時檢視一個Observable,然後發射自上次取樣以來它最近發射的資料。
  ThrottleFirst操作符的功能類似,但不是發射取樣期間的最近的資料,而是發射在那段時間內的第一項資料。
  注意:如果自上次取樣以來,原始Observable沒有發射任何資料,這個操作返回的Observable在那段時間內也不會發射任何資料。

    這裡寫圖片描述

示例程式碼:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i <= 10; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).sample(300, TimeUnit.MILLISECONDS)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "sample: " + integer);
            }
        });

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i <= 10; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).throttleFirst(300, TimeUnit.MILLISECONDS)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "throttleFirst: " + integer);
            }
        });

輸出:

sample: 1
sample: 4
sample: 7
sample: 10

throttleFirst: 0
throttleFirst: 3
throttleFirst: 6
throttleFirst: 9

9. Skip/SkipLast

  • skip(int):使用Skip操作符,你可以忽略Observable發射的前N項資料,只保留之後的資料 。

    這裡寫圖片描述

  • skipLast(int): 忽略原始Observable發射的後N項資料,只保留之前的資料。注意:這個機制是這樣實現的:延遲原始Observable發射的任何資料項,直到過了發射了N項資料的時間,再開始傳送資料,這樣後面N項資料就沒有時間發射了。

    這裡寫圖片描述

示例程式碼:

Observable.just(0, 1, 2, 3, 4, 5)
        .skip(3)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "skip(int): " + integer);
            }
        });

//捨棄掉前1000ms內發射的資料,保留後面發射的資料
Observable.interval(100, TimeUnit.MILLISECONDS)
        .skip(1000, TimeUnit.MILLISECONDS)
        .take(5)   //發射5條資料
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                Log.d(TAG,"onCompleted" );
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG,"onError:" + e.getMessage());
            }
            @Override
            public void onNext(Long aLong) {
                Log.d(TAG,"skip(long, TimeUnit):" + aLong);
            }
        });

Observable.just(0, 1, 2, 3, 4, 5)
        .skipLast(4)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "skipLast(int): " + integer);
            }
        });

輸出:

skip(int): 3
skip(int): 4
skip(int): 5

skip(long, TimeUnit):9
skip(long, TimeUnit):10
skip(long, TimeUnit):11
skip(long, TimeUnit):12
skip(long, TimeUnit):13
onCompleted

skipLast(int): 0
skipLast(int): 1

10. Take/TakeLast

  • take(int):只發射前面的N項資料,然後發射完成通知,忽略剩餘的資料。
    注意: 如果你對一個Observable使用take(n)(或它的同義詞limit(n))操作符,而那個Observable發射的資料少於N項,那麼take操作生成的Observable不會拋異常或發射onError通知,在完成前它只會發射相同的少量資料

    這裡寫圖片描述

  • takeLast(int): 只發射原始Observable發射的後N項資料,忽略之前的資料。
    注意:這會延遲原始Observable發射的任何資料項,直到它全部完成。

    這裡寫圖片描述

  • takeLastBuffer: 它和takeLast類似,唯一的不同是它把所有的資料項收集到一個List再發射,而不是依次發射一個

    這裡寫圖片描述

示例程式碼:

//只發射前面3個數據
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .take(3)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG,"take(int): " + integer);
            }
        });
//只發射後面3個數據
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .takeLast(3)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG,"takeLast(int): " + integer);
            }
        });
//只發射後面3個數據
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .takeLastBuffer(3)
        .subscribe(new Action1<List<Integer>>() {
            @Override
            public void call(List<Integer> integers) {
                Log.d(TAG,"takeLastBuffer(int): " + integers);
            }
        });

輸出:

take(int): 1
take(int): 2
take(int): 3
takeLast(int): 6
takeLast(int): 7
takeLast(int): 8
takeLastBuffer(int): [6, 7, 8]



有問題請留言,有幫助請點贊(^__^)

原始碼下載: