RxJava2詳解(二)--操作符
操作符簡介
Observable和Observer只是ReactiveX的開始,他們自己只不過是標準觀察者模式的輕微擴充套件,更適合處理事件序列而不是單個回撥。
ReactiveX真正強大的是那些讓你可以隨意變換、組合、操作Observable發射的資料序列的操作符(Operators),這些操作符可以讓你宣告式地組合非同步序列,同時具備回撥的所有效率優勢,但沒有傳統非同步系統的巢狀回撥處理的缺點。
操作符分類
ReactiveX為了能夠更好進行非同步處理操作,定義了非常多的操作符,每個平臺實現可以根據需要實現,也可以自定義更多的操作符:
- 建立Observable(Creating Observables)
Create
,Defer
,Empty
/Never
/Throw
,From
,Interval
,Just
,Range
,Repeat
,Start
, andTimer
- 變換Observable的Item(Transforming Observable Items)
Buffer
,FlatMap
,GroupBy
,Map
,Scan
, andWindow
- 過濾Observable(Filtering Observables)
Debounce
,Distinct
,ElementAt
,Filter
,First
,IgnoreElements
,Last
,Sample
,Skip
SkipLast
,Take
, andTakeLast
- 組合多個Observable(Combining Observables)
And
/Then
/When
,CombineLatest
,Join
,Merge
,StartWith
,Switch
, andZip
- 錯誤處理(Error Handling Operators)
Catch
andRetry
- Observable工具(Utility Operators)
Delay
,Do
,Materialize
/Dematerialize
,ObserveOn
,Serialize
,Subscribe
,SubscribeOn
TimeInterval
,Timeout
,Timestamp
, andUsing
- 條件及布林判斷(Conditional and Boolean Operators)
All
,Amb
,Contains
,DefaultIfEmpty
,SequenceEqual
,SkipUntil
,SkipWhile
,TakeUntil
, andTakeWhile
- 數學及集合操作符(Mathematical and Aggregate Operators)
Average
,Concat
,Count
,Max
,Min
,Reduce
, andSum
- 轉換Observable(Converting Observables)
To
- Connectable Observable操作符(Connectable Observable Operators)
Connect
,Publish
,RefCount
, andReplay
- 背壓操作符(Backpressure Operators)
- 一些可以進行事件/資料流控制的操作符
操作符的鏈式呼叫
很多操作符都作用於Observable並返回一個Observable,這就意味著你可以一個接一個的鏈式使用這些操作符,鏈中的每個操作符都會修改之前操作符操作產生的Observable。
其它的鏈式呼叫模式,像Builder模式,也可以連續的呼叫一系列操作方法。Builder模式一般都是鏈式地修改同一個例項的屬性,所以操作方法的呼叫順序一般並沒有什麼影響,但是Observable操作符的使用順序卻很重要,因為每個操作符操作Observable後都會馬上將新生成的Observable交給下一個操作符去處理。
一些“核心”的操作符
Create
建立Observable:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("one");
e.onNext("two");
e.onNext("three");
e.onComplete();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("accept: " + s);
}
});
Defer
Defer操作符會等到有一個Observer訂閱才會生成一個Observable,也就是說每個訂閱者都有自己序列,這可以確保最後一刻(訂閱時)建立的Observable包含最新的資料。
Observable<String> defer = Observable.defer(new Callable<Observab
@Override
public ObservableSource<? extends String> call() throws Excep
Object o = new Object();
System.out.println("emit: " + "object" + o.hashCode());
return Observable.just("object" + o.hashCode());
}
});
Consumer<String> consumer0 = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("accept: " + s);
}
};
Consumer<String> consumer1 = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("accept: " + s);
}
};
defer.subscribe(consumer0);
defer.subscribe(consumer1);
emit: object124101560
accept: object124101560
emit: object913896849
accept: object913896849
From
把其他物件或資料結構轉成Observable
RxJava2的實現為fromArray
、fromCallable
、fromFuture
、fromIterable
、fromPublisher
等方法。
Just
把一個item轉換成發射這個item的Observable。Just和From類似,From會把陣列或iterable或其它有序東西內部的所有item取出來發射,而Just只會簡單地將陣列或iterable或者其它原來的東西不做任何更改地作為一個item發射。
Interval
建立一個每隔給定的時間間隔發射一個遞增整數的Observable
Timer
建立一個給定延遲後發射一個item(0L)的Observable
Range
建立發射給定範圍的連續整數的Observable
Map
通過給每個item應用函式來轉換要發射的item,Map操作符將返回發射函式應用結果item的新的Observable
FlatMap
FlatMap操作符會應用你指定的函式到每個源Observable要發射的item,該函式會返回一個自己發射item的Observable,然後FlatMap會merge這些新的Observable,把merge後的item序列作為新Observable的發射序列。由於是merge操作所以item發射順序可能是交錯的,如果想保證嚴格的發射順序可以使用ConcatMap操作符。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer + "-" + i);
}
return Observable.fromIterable(list).delay(50, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept: " + s);
}
});
accept: I am value 1-1
accept: I am value 2-1
accept: I am value 2-2
accept: I am value 1-2
accept: I am value 3-1
accept: I am value 3-2
Filter
只發射Observable中那些通過判定測試(predicate test)的item
Take
只發射Observable前n個item
Merge
把多個Observable merge為一個Observable,merge發射的item可能是交錯的,且如果任何源Observable出現onError
都會馬上終止merge過程並傳給最終Observable。如果想延遲onError
到merge結束可以使用MergeDelayError操作符。
Zip
根據你給定的方法把多個Observable發射的item結合在一起,每組item結合後都作為單個item(給定方法的返回值)發射。Zip操作符嚴格按序應用給定的方法,所以新生成的Observable的第一個item肯定是Observable #1第一個item和Observable #2第一個item的結合(即方法返回值),新生成的Observable的第二個item肯定是Observable #1第二個item和Observable #2第二個item的結合,以此類推,只發射與 發射最少item的源Observable的發射item數 一樣多的item
Delay
返回一個每發射一個源Observable的item之前都延遲給定時間的Observable,但onError
不會被延遲。
SubscribeOn
指定Observable要操作在哪個Scheduler上。
ObserveOn
指定Observer將要在哪個Scheduler上訂閱它的Observable。
Subscribe
把Observable和Observer連線起來,只有通過Subscribe操作符訂閱Observable才能收到Observable發射的item以及onError
、onComplete
訊號。
All
判斷Observable發射的所有的item是否都滿足指定條件。當且僅當源Observable正常終止且每個發射的item都被給定的判定函式判斷為true
時,All操作符才會返回一個只發射一個true
的Observable。如果源Observable發射的任何一個item被給定的判定函式判斷為false,All操作符會返回一個只發射一個false
的Observable。
Amb
Ambiguous(模稜兩可的)的縮寫。對於給定的兩個或多個源Observable,只發射 第一個發射item或通知(onError
或onCompleted
)的那個Observable 的所有item及通知,Amb會忽略並丟棄其它源Observable發射的item及通知。
Contains
判斷Observable是否發射了指定的item,如果源Observable發射了指定的item就返回一個發射true
的Observable,如果源Observable直到結束都沒發射指定的item就返回一個發射false
的Observable。類似的IsEmpty操作符會在源Observable直到終止都沒發射任何item時返回一個發射true
的Observable。
SkipUntil
在第二個Observable發射一個item之前丟棄源Observable要發射的item,之後會映象發射源Observable的item。
SkipWhile
在你給定的條件變成false之前丟棄源Observable要發射的item,之後會映象發射源Observable的item。
TakeUntil
TakeUtil會映象源Observable並監視你給定的第二個Observable,在第二個Observable發射一個item或終止訊號(onError
或onCompleted
)後丟棄源Observable的任何item(即停止映象源Observable並終止)。
Concat
簡單地將多個Observable連線(無交錯的)成一個Observable,即只有第一個Observable的item都被髮射完才會發射第二個Observable的item,以此類推。由於Concat會等待訂閱 給定的多個Observable 直到之前的Observable完成,如果你想連線一個"熱Observable"(在被訂閱之前就立即發射item的Observable),Concat將看不到這些也就不會發射任何item。
References