1. 程式人生 > >RxJava 過濾操作符(Filtering Observables Operators)

RxJava 過濾操作符(Filtering Observables Operators)

RxJava系列教程:

“過濾操作”,顧名思義,就是過濾掉Observable發射的一些資料,不讓他發射出去,也就是忽略丟棄掉,至於需要過濾那些資料,就需要按照不同的規則,所以RxJava有一些列關於過濾的操作符,接下來看看RxJava中的過濾操作符。

Debounce

Debounce操作符會過濾掉髮射速率過快的資料項, 僅在過了一段指定的時間還沒發射資料時才發射一個數據。RxJava將這個操作符實現為throttleWithTimeout和debounce。

注意:這個操作符會接著最後一項資料發射原始Observable的onCompleted通知,即使這個通知發生在你指定的時間視窗內(從最後一項資料的發射算起)。也就是說,onCompleted通知不會觸發限流。

  • throttleWithTimeout
    根據你指定的時間間隔進行限流,時間單位通過TimeUnit引數指定。這種操作符預設在computation排程器上執行,但是你可以通過第三個引數指定。

  • debounce
    debounce操作符的一個變體通過對原始Observable的每一項應用一個函式進行限流,這個函式返回一個Observable。如果原始Observable在這個新生成的Observable終止之前發射了另一個數據,debounce會抑制(suppress)這個資料項。
    debounce的這個變體預設不在任何特定的排程器上執行。

這裡寫圖片描述

示例程式碼:

Observable.create(new
Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 10; i++) { int sleep = 100; if (i % 3 == 0) { sleep = 300; } try { Thread.sleep(sleep); } catch
(InterruptedException e) { e.printStackTrace(); } subscriber.onNext(i); } subscriber.onCompleted(); } }).subscribeOn(Schedulers.computation()) .throttleWithTimeout(200, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError"); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext:"+integer); } });

輸出:

onNext:2 
onNext:5 
onNext:8 
onNext:9 
onCompleted

Distinct

過濾掉重複的資料項,distinct的過濾規則是:只允許還沒有發射過的資料項通過。
Distinct操作符有兩種,但一共有四種形式:

  • distinct():過濾掉所有資料中的重複資料
    這裡寫圖片描述

distinct(Func1): 這個操作符有一個變體接受一個函式函式。這個函式根據原始Observable發射的資料項產生一個Key,然後,比較這些Key而不是資料本身,來判定兩個資料是否是不同的。這跟上一篇的GroupBy有些類似,首先將原始資料根據不同的key分類,然後過濾掉所有key相同的資料

這裡寫圖片描述

distinctUntilChanged: 它只判定一個數據和它的直接前驅是否是不同的,也就是說它只會過濾連續的重複資料

這裡寫圖片描述

distinctUntilChanged(Func1): 和distinct(Func1)一樣,根據一個函式產生的Key判定兩個相鄰的資料項是不是不同的

這裡寫圖片描述

示例程式碼:

//過濾所有的重複資料(比較原始資料)
Observable.just(1, 2, 4, 1, 3, 5)
        .distinct()
        .subscribe(new Action1<Integer>() {

            @Override
            public void call(Integer value) {
                System.out.println("value = " + value);             
            }
        });

輸出:

value = 1
value = 2
value = 4
value = 3
value = 5

ElementAt

只發射第N項資料。elementAt和elementAtOrDefault預設不在任何特定的排程器上執行。

  • elementAt(int):給它傳遞一個基於0的索引值,它會發射原始Observable資料序列對應索引位置的值,比如你傳遞給elementAt的值為5,那麼它會發射第六項的資料。如果你傳遞的是一個負數,或者原始Observable的資料項數小於index+1,將會丟擲一個IndexOutOfBoundsException異常。
    elementAt

-elementAtOrDefault(int,T): 與elementAt的區別是,如果索引值大於資料項數,它會發射一個預設值(通過額外的引數指定),而不是丟擲異常。但是如果你傳遞一個負數索引值,它仍然會丟擲一個IndexOutOfBoundsException異常。 
這裡寫圖片描述
示例程式碼:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
       .elementAt(5)     //只發射索引值為5(0開始)的資料
       .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "elementAt:"+integer);
        }
});

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
        //只發射索引值為20(0開始)的資料,角標越界會發射預設值100
        .elementAtOrDefault(20, 100)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "elementAtOrDefault:"+integer);
        }
});

輸出:

elementAt:6 
elementAtOrDefault:100

Filter

Filter操作符使用你指定的一個謂詞函式測試資料項,只有通過測試的資料才會被髮射。也就是說原始資料必須滿足我們給的限制條件,才能被髮射。 filter預設不在任何特定的排程器上執行。

  • filter(Func1):根據給定的Func1中的條件發射滿足條件的資料
       這裡寫圖片描述

  • ofType(Class): filter操作符的一個特殊形式。它過濾一個Observable只返回指定型別的資料。
       這裡寫圖片描述

示例程式碼:

Observable.from(studentList)
        .filter(new Func1<Student, Boolean>() {

            @Override
            public Boolean call(Student t) {
                return t.age>25;
            }
        })
        .subscribe(new Action1<Student>() {

            @Override
            public void call(Student student) {
                System.out.println("student = " + student);         
            }
        });

Observable.just("a", 2, 3.0).ofType(String.class)
        .subscribe(new Action1<String>() { 
              @Override public void call(String value) { 
                  System.out.println("ofType value = " + value);    
              }
            });

static List<Student> studentList = new ArrayList<Student>(){
        {
            add(new Student("Stay", 28));
            add(new Student("谷歌小弟", 23));
            add(new Student("Star", 25));
        }
    };

執行結果如下:

student = Student [name=Stay, age=28]

ofType value = a

First

  如果你只對Observable發射的第一項資料,或者滿足某個條件的第一項資料感興趣,你可以使用First操作符。 first系列的這幾個操作符預設不在任何特定的排程器上執行。

  • first():只發射第一個資料
    first

  • first(Func1): 傳遞一個謂詞函式給first,然後發射這個函式判定為true的第一項資料(發射第一個滿足條件的資料)
    first

  • firstOrDefault(T): firstOrDefault與first類似,但是在Observagle沒有發射任何資料時發射一個你在引數中指定的預設值

firstOrDefaultN
- firstOrDefault(T, Func1): 傳遞一個謂詞函式給firstOrDefault,然後發射這個函式判定為true的第一項資料,如果沒有資料通過條件測試就發射一個預設值。

  • takeFirst(Func1): takeFirst與first類似,除了這一點:如果原始Observable沒有發射任何滿足條件的資料,first會丟擲一個NoSuchElementException,takeFist會返回一個空的Observable(不呼叫onNext()但是會呼叫onCompleted)

  • single(): single操作符也與first類似,但是如果原始Observable在完成之前不是正好發射一次資料,它會丟擲一個NoSuchElementException

single
- single(Func1):single的變體接受一個謂詞函式,發射滿足條件的單個值,如果不是正好只有一個數據項滿足條件,會以錯誤通知終止

single

  • singleOrDefault(T): 和firstOrDefault類似,但是如果原始Observable發射超過一個的資料,會以錯誤通知終止

single
- singleOrDefault(Func1,T): 和firstOrDefault(T, Func1)類似,如果沒有資料滿足條件,返回預設值;如果有多個數據滿足條件,以錯誤通知終止。

single
示例程式碼:

//只發射第一個資料1
Observable.just(1, 2, 3)
    .first()
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.d(TAG, "Next1: " + integer);
        }
    });

Observable.just(1, 2, 3)
        .first(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                //只發射第一個大於2的資料
                return integer>2;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "Next2: " + integer);
            }
        });

Observable.just(1, 2, 3)
        .firstOrDefault(10, new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                //只發射第一個大於9的資料,如果沒有傳送預設值10
                return integer>9;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "Next3: " + integer);
            }
        });

輸出:

Next1: 1 

Next2: 3 

Next3: 10

Last

只發射最後一項(或者滿足某個條件的最後一項)資料。

  • last():只發射最後一項 資料

  • last(Func1): 接受一個謂詞函式,返回一個發射原始Observable中滿足條件的最後一項資料的Observable(發射滿足條件的最後一個數據)

  • lastOrDefault(T):lastOrDefault與last類似,不同的是,如果原始Observable沒有發射任何值,它發射你指定的預設值。

  • lastOrDefault(T, Func1): 接受一個謂詞函式,如果有資料滿足條件,返回的Observable就發射原始Observable滿足條件的最後一項資料,否則發射預設值

示例程式碼:

//只發射最後一個數據
Observable.just(1, 2, 3)
        .last()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "Next1: " + integer);
            }
        });

Observable.just(1, 2, 3)
        .last(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                //只發射大於等於2的最後一個數據
                return integer>=2;
            }
        }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d(TAG, "Next2: " + integer);
    }
});

Observable.just(1, 2, 3)
        .lastOrDefault(10, new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                //只發射大於9的最後一個數據,如果沒有傳送預設值10
                return integer>9;
            }
        }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d(TAG, "Next3: " + integer);
    }
});

輸出:

Next1: 3 
Next2: 3 
Next3: 10

IgnoreElements

如果你不關心一個Observable發射的資料,但是希望在它完成時或遇到錯誤終止時收到通知,你可以對Observable使用ignoreElements操作符,它會確保永遠不會呼叫觀察者的onNext()方法。IgnoreElements操作符忽略原始Observable發射的所有資料,只允許它的終止通知(onError或onCompleted)通過。 ignoreElements預設不在任何特定的排程器上執行。
這裡寫圖片描述
示例程式碼:

//只會呼叫onCompleted或者onError
Observable.just(1, 2, 3)
        .ignoreElements()
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError:"+ e.getMessage());
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext:"+integer);
            }
        });

執行結果如下:

onCompleted

Sample/ThrottleFirst

Sample (別名throttleLast)操作符定時檢視一個Observable,然後發射自上次取樣以來它最近發射的資料。
ThrottleFirst操作符的功能類似,但不是發射取樣期間的最近的資料,而是發射在那段時間內的第一項資料。
    

注意:如果自上次取樣以來,原始Observable沒有發射任何資料,這個操作返回的Observable在那段時間內也不會發射任何資料。

這裡寫圖片描述

這裡寫圖片描述

示例程式碼:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i <= 10; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).sample(300, TimeUnit.MILLISECONDS)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "sample: " + integer);
            }
        });

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i <= 10; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).throttleFirst(300, TimeUnit.MILLISECONDS)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "throttleFirst: " + integer);
            }
        });

執行結果如下:

sample: 1 
sample: 4 
sample: 7 
sample: 10

throttleFirst: 0 
throttleFirst: 3 
throttleFirst: 6 
throttleFirst: 9

Skip/SkipLast

  • skip(int):使用Skip操作符,你可以忽略Observable發射的前N項資料,只保留之後的資料 。

這裡寫圖片描述

  • skipLast(int): 忽略原始Observable發射的後N項資料,只保留之前的資料。注意:這個機制是這樣實現的:延遲原始Observable發射的任何資料項,直到過了發射了N項資料的時間,再開始傳送資料,這樣後面N項資料就沒有時間發射了。

skipLast
示例程式碼:

Observable.just(0, 1, 2, 3, 4, 5)
        .skip(3)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "skip(int): " + integer);
            }
        });

//捨棄掉前1000ms內發射的資料,保留後面發射的資料
Observable.interval(100, TimeUnit.MILLISECONDS)
        .skip(1000, TimeUnit.MILLISECONDS)
        .take(5)   //發射5條資料
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                Log.d(TAG,"onCompleted" );
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG,"onError:" + e.getMessage());
            }
            @Override
            public void onNext(Long aLong) {
                Log.d(TAG,"skip(long, TimeUnit):" + aLong);
            }
        });

Observable.just(0, 1, 2, 3, 4, 5)
        .skipLast(4)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "skipLast(int): " + integer);
            }
        });

輸出:

skip(int): 3 
skip(int): 4 
skip(int): 5

skip(long, TimeUnit):9 
skip(long, TimeUnit):10 
skip(long, TimeUnit):11 
skip(long, TimeUnit):12 
skip(long, TimeUnit):13 
onCompleted

skipLast(int): 0 
skipLast(int): 1

Take/TakeLast

  • take(int):只發射前面的N項資料,然後發射完成通知,忽略剩餘的資料。
    注意: 如果你對一個Observable使用take(n)(或它的同義詞limit(n))操作符,而那個Observable發射的資料少於N項,那麼take操作生成的Observable不會拋異常或發射onError通知,在完成前它只會發射相同的少量資料。

這裡寫圖片描述
- takeLast(int): 只發射原始Observable發射的後N項資料,忽略之前的資料。
注意:這會延遲原始Observable發射的任何資料項,直到它全部完成。

這裡寫圖片描述

  • takeLastBuffer: 它和takeLast類似,唯一的不同是它把所有的資料項收集到一個List再發射,而不是依次發射一個。

這裡寫圖片描述

示例程式碼:

//只發射前面3個數據
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .take(3)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG,"take(int): " + integer);
            }
        });
//只發射後面3個數據
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .takeLast(3)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG,"takeLast(int): " + integer);
            }
        });
//只發射後面3個數據
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        .takeLastBuffer(3)
        .subscribe(new Action1<List<Integer>>() {
            @Override
            public void call(List<Integer> integers) {
                Log.d(TAG,"takeLastBuffer(int): " + integers);
            }
        });

輸出:

take(int): 1 
take(int): 2 
take(int): 3 
takeLast(int): 6 
takeLast(int): 7 
takeLast(int): 8 
takeLastBuffer(int): [6, 7, 8]