1. 程式人生 > >RxJava2詳解(二)--操作符

RxJava2詳解(二)--操作符

操作符簡介

Observable和Observer只是ReactiveX的開始,他們自己只不過是標準觀察者模式的輕微擴充套件,更適合處理事件序列而不是單個回撥。
ReactiveX真正強大的是那些讓你可以隨意變換、組合、操作Observable發射的資料序列的操作符(Operators),這些操作符可以讓你宣告式地組合非同步序列,同時具備回撥的所有效率優勢,但沒有傳統非同步系統的巢狀回撥處理的缺點。

操作符分類

ReactiveX為了能夠更好進行非同步處理操作,定義了非常多的操作符,每個平臺實現可以根據需要實現,也可以自定義更多的操作符:

建立Observable(Creating Observables)
Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, and Timer
變換Observable的Item(Transforming Observable Items)
Buffer, FlatMap, GroupBy, Map, Scan, and Window
過濾Observable(Filtering Observables)
Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip
, SkipLast, Take, and TakeLast
組合多個Observable(Combining Observables)
And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, and Zip
錯誤處理(Error Handling Operators)
Catch and Retry
Observable工具(Utility Operators)
Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn
, TimeInterval, Timeout, Timestamp, and Using
條件及布林判斷(Conditional and Boolean Operators)
All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, and TakeWhile
數學及集合操作符(Mathematical and Aggregate Operators)
Average, Concat, Count, Max, Min, Reduce, and Sum
轉換Observable(Converting Observables)
To
Connectable Observable操作符(Connectable Observable Operators)
Connect, Publish, RefCount, and Replay
背壓操作符(Backpressure Operators)
一些可以進行事件/資料流控制的操作符

操作符的鏈式呼叫

很多操作符都作用於Observable並返回一個Observable,這就意味著你可以一個接一個的鏈式使用這些操作符,鏈中的每個操作符都會修改之前操作符操作產生的Observable。
其它的鏈式呼叫模式,像Builder模式,也可以連續的呼叫一系列操作方法。Builder模式一般都是鏈式地修改同一個例項的屬性,所以操作方法的呼叫順序一般並沒有什麼影響,但是Observable操作符的使用順序卻很重要,因為每個操作符操作Observable後都會馬上將新生成的Observable交給下一個操作符去處理。

一些“核心”的操作符

Create

建立Observable:
create

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
        e.onNext("one");
        e.onNext("two");
        e.onNext("three");
        e.onComplete();
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        System.out.println("accept: " + s);
    }
});

Defer

Defer操作符會等到有一個Observer訂閱才會生成一個Observable,也就是說每個訂閱者都有自己序列,這可以確保最後一刻(訂閱時)建立的Observable包含最新的資料。
defer

Observable<String> defer = Observable.defer(new Callable<Observab
    @Override
    public ObservableSource<? extends String> call() throws Excep
        Object o = new Object();
        System.out.println("emit: " + "object" + o.hashCode());
        return Observable.just("object" + o.hashCode());
    }
});
Consumer<String> consumer0 = new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        System.out.println("accept: " + s);
    }
};
Consumer<String> consumer1 = new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        System.out.println("accept: " + s);
    }
};
defer.subscribe(consumer0);
defer.subscribe(consumer1);
emit: object124101560
accept: object124101560
emit: object913896849
accept: object913896849

From

把其他物件或資料結構轉成Observable
from
RxJava2的實現為fromArrayfromCallablefromFuturefromIterablefromPublisher等方法。

Just

把一個item轉換成發射這個item的Observable。Just和From類似,From會把陣列或iterable或其它有序東西內部的所有item取出來發射,而Just只會簡單地將陣列或iterable或者其它原來的東西不做任何更改地作為一個item發射。
Just

Interval

建立一個每隔給定的時間間隔發射一個遞增整數的Observable
Interval

Timer

建立一個給定延遲後發射一個item(0L)的Observable
Timer

Range

建立發射給定範圍的連續整數的Observable
Range

Map

通過給每個item應用函式來轉換要發射的item,Map操作符將返回發射函式應用結果item的新的Observable
Map

FlatMap

FlatMap操作符會應用你指定的函式到每個源Observable要發射的item,該函式會返回一個自己發射item的Observable,然後FlatMap會merge這些新的Observable,把merge後的item序列作為新Observable的發射序列。由於是merge操作所以item發射順序可能是交錯的,如果想保證嚴格的發射順序可以使用ConcatMap操作符。
FlatMap

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }
}).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
        final List<String> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add("I am value " + integer + "-" + i);
        }
        return Observable.fromIterable(list).delay(50, TimeUnit.MILLISECONDS);
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println("accept: " + s);
    }
});
accept: I am value 1-1
accept: I am value 2-1
accept: I am value 2-2
accept: I am value 1-2
accept: I am value 3-1
accept: I am value 3-2

Filter

只發射Observable中那些通過判定測試(predicate test)的item
Filter

Take

只發射Observable前n個item
Take

Merge

把多個Observable merge為一個Observable,merge發射的item可能是交錯的,且如果任何源Observable出現onError都會馬上終止merge過程並傳給最終Observable。如果想延遲onError到merge結束可以使用MergeDelayError操作符。
Merge

Zip

根據你給定的方法把多個Observable發射的item結合在一起,每組item結合後都作為單個item(給定方法的返回值)發射。Zip操作符嚴格按序應用給定的方法,所以新生成的Observable的第一個item肯定是Observable #1第一個item和Observable #2第一個item的結合(即方法返回值),新生成的Observable的第二個item肯定是Observable #1第二個item和Observable #2第二個item的結合,以此類推,只發射與 發射最少item的源Observable的發射item數 一樣多的item
Zip

Delay

返回一個每發射一個源Observable的item之前都延遲給定時間的Observable,但onError不會被延遲。
Delay

SubscribeOn

指定Observable要操作在哪個Scheduler上。
SubscribeOn

ObserveOn

指定Observer將要在哪個Scheduler上訂閱它的Observable。
ObserveOn

Subscribe

把Observable和Observer連線起來,只有通過Subscribe操作符訂閱Observable才能收到Observable發射的item以及onErroronComplete訊號。

All

判斷Observable發射的所有的item是否都滿足指定條件。當且僅當源Observable正常終止且每個發射的item都被給定的判定函式判斷為true時,All操作符才會返回一個只發射一個true的Observable。如果源Observable發射的任何一個item被給定的判定函式判斷為false,All操作符會返回一個只發射一個false的Observable。
All

Amb

Ambiguous(模稜兩可的)的縮寫。對於給定的兩個或多個源Observable,只發射 第一個發射item或通知(onErroronCompleted)的那個Observable 的所有item及通知,Amb會忽略並丟棄其它源Observable發射的item及通知。
Amb

Contains

判斷Observable是否發射了指定的item,如果源Observable發射了指定的item就返回一個發射true的Observable,如果源Observable直到結束都沒發射指定的item就返回一個發射false的Observable。類似的IsEmpty操作符會在源Observable直到終止都沒發射任何item時返回一個發射true的Observable。
Contains

SkipUntil

在第二個Observable發射一個item之前丟棄源Observable要發射的item,之後會映象發射源Observable的item。
SkipUntil

SkipWhile

在你給定的條件變成false之前丟棄源Observable要發射的item,之後會映象發射源Observable的item。
SkipWhile

TakeUntil

TakeUtil會映象源Observable並監視你給定的第二個Observable,在第二個Observable發射一個item或終止訊號(onErroronCompleted)後丟棄源Observable的任何item(即停止映象源Observable並終止)。
TakeUntil

Concat

簡單地將多個Observable連線(無交錯的)成一個Observable,即只有第一個Observable的item都被髮射完才會發射第二個Observable的item,以此類推。由於Concat會等待訂閱 給定的多個Observable 直到之前的Observable完成,如果你想連線一個"熱Observable"(在被訂閱之前就立即發射item的Observable),Concat將看不到這些也就不會發射任何item。
Concat

References