RxJava 過濾操作符(Filtering Observables Operators)
RxJava系列教程:
“過濾操作”,顧名思義,就是過濾掉Observable發射的一些資料,不讓他發射出去,也就是忽略丟棄掉,至於需要過濾那些資料,就需要按照不同的規則,所以RxJava有一些列關於過濾的操作符,接下來看看RxJava中的過濾操作符。
Debounce
Debounce操作符會過濾掉髮射速率過快的資料項, 僅在過了一段指定的時間還沒發射資料時才發射一個數據。RxJava將這個操作符實現為throttleWithTimeout和debounce。
注意:這個操作符會接著最後一項資料發射原始Observable的onCompleted通知,即使這個通知發生在你指定的時間視窗內(從最後一項資料的發射算起)。也就是說,onCompleted通知不會觸發限流。
throttleWithTimeout
根據你指定的時間間隔進行限流,時間單位通過TimeUnit引數指定。這種操作符預設在computation排程器上執行,但是你可以通過第三個引數指定。debounce
debounce操作符的一個變體通過對原始Observable的每一項應用一個函式進行限流,這個函式返回一個Observable。如果原始Observable在這個新生成的Observable終止之前發射了另一個數據,debounce會抑制(suppress)這個資料項。
debounce的這個變體預設不在任何特定的排程器上執行。
示例程式碼:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
int sleep = 100;
if (i % 3 == 0) {
sleep = 300;
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation())
.throttleWithTimeout(200, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext:"+integer);
}
});
輸出:
onNext:2
onNext:5
onNext:8
onNext:9
onCompleted
Distinct
過濾掉重複的資料項,distinct的過濾規則是:只允許還沒有發射過的資料項通過。
Distinct操作符有兩種,但一共有四種形式:
- distinct():過濾掉所有資料中的重複資料
distinct(Func1): 這個操作符有一個變體接受一個函式函式。這個函式根據原始Observable發射的資料項產生一個Key,然後,比較這些Key而不是資料本身,來判定兩個資料是否是不同的。這跟上一篇的GroupBy有些類似,首先將原始資料根據不同的key分類,然後過濾掉所有key相同的資料
distinctUntilChanged: 它只判定一個數據和它的直接前驅是否是不同的,也就是說它只會過濾連續的重複資料
distinctUntilChanged(Func1): 和distinct(Func1)一樣,根據一個函式產生的Key判定兩個相鄰的資料項是不是不同的
示例程式碼:
//過濾所有的重複資料(比較原始資料)
Observable.just(1, 2, 4, 1, 3, 5)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer value) {
System.out.println("value = " + value);
}
});
輸出:
value = 1
value = 2
value = 4
value = 3
value = 5
ElementAt
只發射第N項資料。elementAt和elementAtOrDefault預設不在任何特定的排程器上執行。
- elementAt(int):給它傳遞一個基於0的索引值,它會發射原始Observable資料序列對應索引位置的值,比如你傳遞給elementAt的值為5,那麼它會發射第六項的資料。如果你傳遞的是一個負數,或者原始Observable的資料項數小於index+1,將會丟擲一個IndexOutOfBoundsException異常。
-elementAtOrDefault(int,T): 與elementAt的區別是,如果索引值大於資料項數,它會發射一個預設值(通過額外的引數指定),而不是丟擲異常。但是如果你傳遞一個負數索引值,它仍然會丟擲一個IndexOutOfBoundsException異常。
示例程式碼:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.elementAt(5) //只發射索引值為5(0開始)的資料
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "elementAt:"+integer);
}
});
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
//只發射索引值為20(0開始)的資料,角標越界會發射預設值100
.elementAtOrDefault(20, 100)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "elementAtOrDefault:"+integer);
}
});
輸出:
elementAt:6
elementAtOrDefault:100
Filter
Filter操作符使用你指定的一個謂詞函式測試資料項,只有通過測試的資料才會被髮射。也就是說原始資料必須滿足我們給的限制條件,才能被髮射。 filter預設不在任何特定的排程器上執行。
filter(Func1):根據給定的Func1中的條件發射滿足條件的資料
ofType(Class): filter操作符的一個特殊形式。它過濾一個Observable只返回指定型別的資料。
示例程式碼:
Observable.from(studentList)
.filter(new Func1<Student, Boolean>() {
@Override
public Boolean call(Student t) {
return t.age>25;
}
})
.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
System.out.println("student = " + student);
}
});
Observable.just("a", 2, 3.0).ofType(String.class)
.subscribe(new Action1<String>() {
@Override public void call(String value) {
System.out.println("ofType value = " + value);
}
});
static List<Student> studentList = new ArrayList<Student>(){
{
add(new Student("Stay", 28));
add(new Student("谷歌小弟", 23));
add(new Student("Star", 25));
}
};
執行結果如下:
student = Student [name=Stay, age=28]
ofType value = a
First
如果你只對Observable發射的第一項資料,或者滿足某個條件的第一項資料感興趣,你可以使用First操作符。 first系列的這幾個操作符預設不在任何特定的排程器上執行。
first():只發射第一個資料
first(Func1): 傳遞一個謂詞函式給first,然後發射這個函式判定為true的第一項資料(發射第一個滿足條件的資料)
firstOrDefault(T): firstOrDefault與first類似,但是在Observagle沒有發射任何資料時發射一個你在引數中指定的預設值
- firstOrDefault(T, Func1): 傳遞一個謂詞函式給firstOrDefault,然後發射這個函式判定為true的第一項資料,如果沒有資料通過條件測試就發射一個預設值。
takeFirst(Func1): takeFirst與first類似,除了這一點:如果原始Observable沒有發射任何滿足條件的資料,first會丟擲一個NoSuchElementException,takeFist會返回一個空的Observable(不呼叫onNext()但是會呼叫onCompleted)
single(): single操作符也與first類似,但是如果原始Observable在完成之前不是正好發射一次資料,它會丟擲一個NoSuchElementException
- single(Func1):single的變體接受一個謂詞函式,發射滿足條件的單個值,如果不是正好只有一個數據項滿足條件,會以錯誤通知終止
- singleOrDefault(T): 和firstOrDefault類似,但是如果原始Observable發射超過一個的資料,會以錯誤通知終止
- singleOrDefault(Func1,T): 和firstOrDefault(T, Func1)類似,如果沒有資料滿足條件,返回預設值;如果有多個數據滿足條件,以錯誤通知終止。
示例程式碼:
//只發射第一個資料1
Observable.just(1, 2, 3)
.first()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "Next1: " + integer);
}
});
Observable.just(1, 2, 3)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//只發射第一個大於2的資料
return integer>2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "Next2: " + integer);
}
});
Observable.just(1, 2, 3)
.firstOrDefault(10, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//只發射第一個大於9的資料,如果沒有傳送預設值10
return integer>9;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "Next3: " + integer);
}
});
輸出:
Next1: 1
Next2: 3
Next3: 10
Last
只發射最後一項(或者滿足某個條件的最後一項)資料。
last():只發射最後一項 資料
last(Func1): 接受一個謂詞函式,返回一個發射原始Observable中滿足條件的最後一項資料的Observable(發射滿足條件的最後一個數據)
lastOrDefault(T):lastOrDefault與last類似,不同的是,如果原始Observable沒有發射任何值,它發射你指定的預設值。
lastOrDefault(T, Func1): 接受一個謂詞函式,如果有資料滿足條件,返回的Observable就發射原始Observable滿足條件的最後一項資料,否則發射預設值
示例程式碼:
//只發射最後一個數據
Observable.just(1, 2, 3)
.last()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "Next1: " + integer);
}
});
Observable.just(1, 2, 3)
.last(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//只發射大於等於2的最後一個數據
return integer>=2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "Next2: " + integer);
}
});
Observable.just(1, 2, 3)
.lastOrDefault(10, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//只發射大於9的最後一個數據,如果沒有傳送預設值10
return integer>9;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "Next3: " + integer);
}
});
輸出:
Next1: 3
Next2: 3
Next3: 10
IgnoreElements
如果你不關心一個Observable發射的資料,但是希望在它完成時或遇到錯誤終止時收到通知,你可以對Observable使用ignoreElements操作符,它會確保永遠不會呼叫觀察者的onNext()方法。IgnoreElements操作符忽略原始Observable發射的所有資料,只允許它的終止通知(onError或onCompleted)通過。 ignoreElements預設不在任何特定的排程器上執行。
示例程式碼:
//只會呼叫onCompleted或者onError
Observable.just(1, 2, 3)
.ignoreElements()
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError:"+ e.getMessage());
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext:"+integer);
}
});
執行結果如下:
onCompleted
Sample/ThrottleFirst
Sample (別名throttleLast)操作符定時檢視一個Observable,然後發射自上次取樣以來它最近發射的資料。
ThrottleFirst操作符的功能類似,但不是發射取樣期間的最近的資料,而是發射在那段時間內的第一項資料。
注意:如果自上次取樣以來,原始Observable沒有發射任何資料,這個操作返回的Observable在那段時間內也不會發射任何資料。
示例程式碼:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i <= 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).sample(300, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "sample: " + integer);
}
});
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i <= 10; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).throttleFirst(300, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "throttleFirst: " + integer);
}
});
執行結果如下:
sample: 1
sample: 4
sample: 7
sample: 10
throttleFirst: 0
throttleFirst: 3
throttleFirst: 6
throttleFirst: 9
Skip/SkipLast
- skip(int):使用Skip操作符,你可以忽略Observable發射的前N項資料,只保留之後的資料 。
- skipLast(int): 忽略原始Observable發射的後N項資料,只保留之前的資料。注意:這個機制是這樣實現的:延遲原始Observable發射的任何資料項,直到過了發射了N項資料的時間,再開始傳送資料,這樣後面N項資料就沒有時間發射了。
示例程式碼:
Observable.just(0, 1, 2, 3, 4, 5)
.skip(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "skip(int): " + integer);
}
});
//捨棄掉前1000ms內發射的資料,保留後面發射的資料
Observable.interval(100, TimeUnit.MILLISECONDS)
.skip(1000, TimeUnit.MILLISECONDS)
.take(5) //發射5條資料
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
Log.d(TAG,"onCompleted" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG,"onError:" + e.getMessage());
}
@Override
public void onNext(Long aLong) {
Log.d(TAG,"skip(long, TimeUnit):" + aLong);
}
});
Observable.just(0, 1, 2, 3, 4, 5)
.skipLast(4)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "skipLast(int): " + integer);
}
});
輸出:
skip(int): 3
skip(int): 4
skip(int): 5
skip(long, TimeUnit):9
skip(long, TimeUnit):10
skip(long, TimeUnit):11
skip(long, TimeUnit):12
skip(long, TimeUnit):13
onCompleted
skipLast(int): 0
skipLast(int): 1
Take/TakeLast
- take(int):只發射前面的N項資料,然後發射完成通知,忽略剩餘的資料。
注意: 如果你對一個Observable使用take(n)(或它的同義詞limit(n))操作符,而那個Observable發射的資料少於N項,那麼take操作生成的Observable不會拋異常或發射onError通知,在完成前它只會發射相同的少量資料。
- takeLast(int): 只發射原始Observable發射的後N項資料,忽略之前的資料。
注意:這會延遲原始Observable發射的任何資料項,直到它全部完成。
- takeLastBuffer: 它和takeLast類似,唯一的不同是它把所有的資料項收集到一個List再發射,而不是依次發射一個。
示例程式碼:
//只發射前面3個數據
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG,"take(int): " + integer);
}
});
//只發射後面3個數據
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.takeLast(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG,"takeLast(int): " + integer);
}
});
//只發射後面3個數據
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.takeLastBuffer(3)
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
Log.d(TAG,"takeLastBuffer(int): " + integers);
}
});
輸出:
take(int): 1
take(int): 2
take(int): 3
takeLast(int): 6
takeLast(int): 7
takeLast(int): 8
takeLastBuffer(int): [6, 7, 8]