1. 程式人生 > >RxJava操作符一覽

RxJava操作符一覽



把現在接觸到的操作符全部整理進來,方便查閱,遇到新的也會新增進來。和RxJavaLearn 的README.md同步更新。

操作符決策樹

直接建立一個Observable(建立操作)
組合多個Observable(組合操作)
對Observable發射的資料執行變換操作(變換操作)
從Observable發射的資料中取特定的值(過濾操作)
轉發Observable的部分值(條件/布林/過濾操作)
對Observable發射的資料序列求值(算術/聚合操作)
建立操作

用於建立Observable的操作符

Create 
通過呼叫觀察者的方法從頭建立一個Observable

Empty/Never/Throw 
建立行為受限的特殊Observable

Observable.empty() 
Returns an Observable that emits no items to the Observer and immediately invokes its onCompleted method. 直接呼叫onComplete(),在zip()操作符中也是。

Defer 
在觀察者訂閱之前不建立這個Observable,為每一個觀察者建立一個新的Observable

Just 
將物件或者物件集合轉換為一個會發射這些物件的Observable

From 
將其它的物件或資料結構轉換為Observable

Range 
建立發射指定範圍的整數序列的Observable,range操作符,發射從start開始的count個數

Interval 
間隔一定時間傳送一個數字,從0開始.本身執行在Schedulers.computation() 執行緒內

Repeat 
Repeat作用在Observable上,會對其重複發射count次

Timer 
Timer會在指定時間後發射一個數字0,注意其也是執行在computation Scheduler

變換操作

對Observable發射的資料進行變,能將資料轉化為我們想要的格式,

Buffer 
快取,可以簡單的理解為快取,它定期從Observable收集資料到一個集合,然後把這些資料集合打包發射,而不是一次發射一個

FlatMap 
扁平對映,將Observable發射的資料變換為Observables集合,然後將這些Observable發射的資料平坦化的放進一個單獨的Observable,可以認為是一個將巢狀的資料結構展開的過程。

GroupBy 
分組,將原來的Observable分拆為Observable集合,將原始Observable發射的資料按Key分組,每一個Observable發射一組不同的資料

Map 
對映,通過對序列的每一項都應用一個函式變換Observable發射的資料,實質是對序列中的每一項執行一個函式,函式的引數就是這個資料項

Scan 
掃描, 連續地對資料序列的每一項應用一個函式,然後連續發射結果,每一項結果基於之前的結果.累加器函式.

Window 
視窗,定期將來自Observable的資料分拆成一些Observable視窗,然後發射這些視窗,而不是每次發射一項。類似於Buffer,但Buffer發射的是資料,Window發射的是Observable,每一個Observable發射原始Observable的資料的一個子集

toList 
Returns an Observable that emits a single item, a list composed of all the items emitted by the source Observable. 將一個Observable轉換為一個List.

過濾操作

濾掉資料集合中我們不想要的資料。用於從Observable發射的資料中進行選擇。

throttleWithTimeout 
源Observable每次發射出來一個數據後就會進行計時,如果在設定好的時間結束前源Observable有新的資料發射出來,這個資料就會被丟棄,同時重新開始計時。

Debounce 
只有在空閒了一段時間後才發射資料,通俗的說,就是如果一段時間沒有操作,就執行一次操作.還可以根據一個函式來進行限流。這個函式的返回值是一個臨時Observable, 
如果源Observable在發射一個新的資料的時候,上一個資料根據函式所生成的臨時Observable還沒有結束,沒有呼叫onComplete,那麼上一個資料就會被過濾掉。如果是最後一個,還是會發射.

Distinct 
去重,過濾掉所有的重複資料項

DistinctUtilChanged 
過濾掉相鄰的重複項

ElementAt 
取值,取特定位置的資料項

Filter 
過濾,過濾掉沒有通過謂詞測試的資料項,只發射通過測試的. 
和zip()連用時,只要有一個沒通過filter,直接呼叫onComplete(),然不是指將這一個Observable過濾掉。

First 
取滿足條件的第一個,如無滿足條件資料丟擲異常.可以使用takeFisrt(),僅會調動onComplete. 
有null資料時會拋空指標異常,要判空處理. 
只取滿足條件的第一個資料.可以和BlockingObservable連用。可以Observable.toBlocking或者BlockingObservable.from方法來將一個Observable物件轉化為BlockingObservable物件

Last 
末項,只發射最後一條資料.

IgnoreElements 
忽略所有的資料,只保留終止通知(onError或onCompleted)

Sample 
取樣,定期發射最新的資料,等於是資料抽樣,有的實現裡叫ThrottleFirst

throttleFirst 
會定期發射這個時間段裡源Observable發射的第一個資料

Skip 
跳過前面的若干項資料

SkipLast 
跳過後面的若干項資料

Take 
只保留前面的若干項資料

TakeLast 
只保留後面的若干項資料

takeFirst 
有null資料時會拋空指標異常,要判空處理 
和first一樣,但在所有資料不滿足條件時不會丟擲異常,僅僅呼叫onComplete.

組合操作

And/Then/When 
通過模式(And條件)和計劃(Then次序)組合兩個或多個Observable發射的資料集

CombineLatest 
當兩個Observables中的任何一個發射了一個數據時,通過一個指定的函式組合每個Observable發射的最新資料(一共兩個資料),然後發射這個函式的結果

必須滿足兩個條件: 
1)所有的Observable都發射過資料。 
2)滿足條件1的時候任何一個Observable發射一個數據,就將所有Observable最新發射的資料按照提供的函式組裝起來發射出去。

在這兩個條件下,可能會忽略掉一些發射的資料.

Join 
無論何時,如果一個Observable發射了一個數據項,只要在另一個Observable發射的資料項定義的時間視窗內,就將兩個Observable發射的資料合併發射

引數說明: 
1)源Observable所要組合的目標Observable 
2)一個函式,就收從源Observable發射來的資料,並返回一個Observable,這個Observable的生命週期決定了源Observable發射出來資料的有效期 
3)一個函式,就收從目標Observable發射來的資料,並返回一個Observable,這個Observable的生命週期決定了目標Observable發射出來資料的有效期 
4)一個函式,接收從源Observable和目標Observable發射來的資料,並返回最終組合完的資料。

Merge 
將兩個Observable發射的資料組合併成一個 
Merge可能會讓合併的Observables發射的資料交錯(可以使用Concat操作符,不會讓資料交錯,它會按順序一個接著一個發射多個Observables的發射物)。

StartWith 
在發射原來的Observable的資料序列之前,先發射一個指定的資料序列或資料項

在資料序列的開頭插入一條指定的項 
你也可以傳遞一個Observable給startWith, 
它會將那個Observable的發射物插在原始Observable發射的資料序列之前.這可以看作是Concat的反轉。

Switch,在RxJava的實現為SwitchOnNext 
將一個發射Observable序列的Observable轉換為這樣一個Observable:它逐個發射那些Observable最近發射的資料 
用來將一個發射多個小Observable的源Observable轉化為一個Observable,然後發射這多個小Observable所發射的資料。 
需要注意的就是,如果一個小的Observable正在發射資料的時候,源Observable又發射出一個新的小Observable,則前一個Observable發射的資料會被拋棄,直接發射新的小Observable所發射的資料。

Zip

zip
zipWith 
打包,使用一個指定的函式將多個Observable發射的資料組合在一起,然後將這個函式的結果作為單項資料發射 
Zip操作符將多個Observable發射的資料按順序組合起來,每個資料只能組合一次,而且都是有序的。 
最終組合的資料的數量由發射資料最少的Observable來決定。
錯誤處理

這些操作符用於從錯誤通知中恢復

Catch 
捕獲,繼續序列操作,將錯誤替換為正常的資料,從onError通知中恢復

onErrorReturn 
當發生錯誤的時候,讓Observable發射一個預先定義好的資料並正常地終止,不會丟擲異常
onErrorResumeNext 
當發生錯誤的時候,由另外一個Observable來代替當前的Observable並繼續發射資料
onExceptionResumeNext 
類似於OnErrorResume,不同之處在於其會對onError丟擲的資料型別做判斷, 
如果是Exception,也會使用另外一個Observable代替原Observable繼續發射資料, 
否則會將錯誤分發給Subscriber。
Retry 
重試,如果Observable發射了一個錯誤通知,重新訂閱它,期待它正常終止

retry 
Retry操作符在發生錯誤的時候會重新進行訂閱,而且可以重複多次, 
所以發射的資料可能會產生重複。如果重複指定次數還有錯誤的話就會將錯誤返回給觀察者,會掉onError
retryWhen 
當錯誤發生時,retryWhen會接收onError的throwable作為引數,並根據定義好的函式返回一個Observable,如果這個Observable發射一個數據,就會重新訂閱。 
需要注意的是使用retryWhen的時候,因為每次重新訂閱都會產生錯誤,所以作為引數的obserbvable會不斷地發射資料,使用zipWith操作符可以限制重新訂閱的次數,否則會無限制地重新訂閱。 
會正常結束,呼叫onCompleted
輔助操作

Delay 
延遲一段時間發射結果資料 
可以使用Observable.delay(5,TimeUnit.SECONDS)來做延遲發射,預設在Schedule.computation()執行緒上。

DelaySubscription 
延遲註冊到Observer上

Do 
Do操作符就是給Observable的生命週期的各個階段加上一系列的回撥監聽,當Observable執行到這個階段的時候,這些回撥就會被觸發.

doOnEach 
Observable每發射一個數據的時候就會觸發這個回撥,不僅包括onNext還包括onError和onCompleted。

doOnNext 
只有onNext的時候才會被觸發。

doOnSubscribe,doOnUnSubscribe 
會在Subscriber進行訂閱和反訂閱的時候觸發回撥。 
當一個Observable通過OnError或者OnCompleted結束的時候,會反訂閱所有的Subscriber。在Android中和生命週期繫結起來,因為有些Observable執行不完啊.

doOnSubscribeOn 
在開始註冊前做一些工作。處於當前執行緒,而不是subscribeOn指定的執行緒。

DoOnError 
在OnError發生的時候觸發回撥,並將Throwable物件作為引數傳進回撥函式裡。

DoOnComplete 
會在OnCompleted發生的時候觸發回撥。

DoOnTerminate 
會在Observable結束前觸發回撥,無論是正常還是異常終止。

finallyDo,doAfterTerminate 
會在Observable結束後觸發回撥,無論是正常還是異常終止。

Materialize,dematerialize 
Meterialize操作符將OnNext/OnError/OnComplete都轉化為一個Notification物件並按照原來的順序發射出來,dematerialize相反 
使用integerNotification.getValue() +”, ” + integerNotification.getKind()可以看到列印值和型別.

ObserveOn 
指定Subscriber的排程程式(工作執行緒)

SubscribeOn 
指定Observable應該在哪個排程程式上執行

Serialize 
強制Observable按次序發射資料並且功能是有效的

Subscribe 
收到Observable發射的資料和通知後執行的操作

TimeInterval 
將一個Observable轉換為發射兩個資料之間所耗費時間的Observable 
TimeInterval會攔截髮射出來的資料,取代為前後兩個發射兩個資料的間隔時間。對於第一個發射的資料,其時間間隔為訂閱後到首次發射的間隔。

Timeout 
新增超時機制,如果過了指定的一段時間沒有發射資料,就發射一個錯誤通知 
Timeout操作符給Observable加上超時時間,每發射一個數據後就重置計時器,當超過預定的時間還沒有發射下一個資料,就丟擲一個超時的異常。 
Rxjava將Timeout實現為很多不同功能的操作符,比如說超時後用一個備用的Observable繼續發射資料等。

Timestamp 
給Observable發射的每個資料項新增一個時間戳 
TimeStamp會將每個資料項給重新包裝一下,加上了一個時間戳來標明每次發射的時間

Using 
建立一個只在Observable的生命週期記憶體在的一次性資源

Using操作符建立一個在Observable生命週期記憶體活的資源,也可以這樣理解: 
我們建立一個資源並使用它,用一個Observable來限制這個資源的使用時間,當這個Observable終止的時候,這個資源就會被銷燬。 
Using需要使用三個引數,分別是: 
1)建立這個一次性資源的函式 
2)建立Observable的函式 
3)釋放資源的函式

onBackpressureBuffer 
Instructs an Observable that is emitting items faster than its observer can consume them to buffer these items indefinitely until they can be emitted. 
Observable生產資料的速度大於Observer消費的速度,就會丟擲MissingBackpressureException。使用該操作符可以快取資料等待Observer消費。具體參考 RxJava 教程第四部分:併發 之資料流發射太快如何辦

條件和布林操作

這些操作符可用於單個或多個數據項,也可用於Observable

All 
判斷Observable發射的所有的資料項是否都滿足某個條件

All操作符根據一個函式對源Observable發射的所有資料進行判斷,最終返回的結果就是這個判斷結果。 
對發射的所有資料應用這個函式,如果全部都滿足則返回true,否則就返回false。

Amb 
Amb操作符可以將至多9個Observable結合起來,讓他們競爭。 
哪個Observable首先發射了資料(包括onError和onComplete)就會繼續發射這個Observable的資料,其他的Observable所發射的資料都會別丟棄。

Contains 
判斷Observable是否會發射一個指定的資料項 
Contains操作符用來判斷源Observable所發射的資料是否包含某一個數據,如果包含會返回true,如果源Observable已經結束了卻還沒有發射這個資料則返回false。

IsEmpty 
IsEmpty操作符用來判斷源Observable是否發射過資料,沒有發射過資料返回true. 
Null也是一個數據

DefaultIfEmpty 
發射來自原始Observable的資料,如果原始Observable沒有發射資料,就發射一個預設資料 
會判斷源Observable是否發射資料,如果源Observable發射了資料則正常發射這些資料,如果沒有則發射一個預設的資料

SequenceEqual 
判斷兩個Observable是否按相同的資料序列 
SequenceEqual操作符用來判斷兩個Observable發射的資料序列是否相同(發射的資料相同,資料的序列相同,結束的狀態相同),如果相同返回true,否則返回false

SkipUntil 
SkipUnitl是根據一個標誌Observable來判斷的,當這個標誌Observable沒有發射資料的時候,所有源Observable發射的資料都會被跳過;當標誌Observable發射了一個數據,則開始正常地發射資料。 
一直等到skipUntil發射了資料才能發射源Observable的資料,並忽略了此段時間內的資料

SkipWhile 
SkipWhile則是根據一個函式來判斷是否跳過資料,當函式返回值為true的時候則一直跳過源Observable發射的資料;當函式返回false的時候則開始正常發射資料。

TakeUntil 
和SkipUtil恰好相反,只獲取takeUntil裡的Observable之前的資料

TakeWhile 
和SkipWhile相反,獲取滿足skipWhile的資料

算術和聚合操作

Concat 
將多個Observable結合成一個Observable併發射資料,並且嚴格按照先後順序發射資料,前一個Observable的資料沒有發射完,不發射後面Observable的資料

Count 
Count操作符用來統計源Observable發射了多少個數據,最後將數目給發射出來; 
如果源Observable發射錯誤,則會將錯誤直接報出來;在源Observable沒有終止前,count是不會發射統計資料的。

Reduce 
Reduce操作符應用一個函式接收Observable發射的資料和函式的計算結果作為下次計算的引數,輸出最後的結果。 
跟前面我們瞭解過的scan操作符很類似,只是scan會輸出每次計算的結果,而reduce只會輸出最後的結果。

Collect 
collect用來將源Observable發射的資料給收集到一個數據結構裡面,需要使用兩個引數: 
一個產生收集資料結構的函式 
一個接收第一個函式產生的資料結構和源Observable發射的資料作為引數的函式。

連線操作

一些有精確可控的訂閱行為的特殊Observable

什麼是Connectable Observable: 
就是一種特殊的Observable物件,並不是Subscrib的時候就發射資料,而是隻有對其應用connect操作符的時候才開始發射資料, 
所以可以用來更靈活的控制資料發射的時機。

使用Publish操作符將Observable轉換為Connectable Observable,然後可以通過connect控制何時發射.

Publish 
將一個普通的Observable轉換為可連線的 
Publish操作符就是用來將一個普通的Observable物件轉化為一個Connectable Observable。需要注意的是如果發射資料已經開始了再進行訂閱只能接收以後發射的資料。

Connect 
Connect操作符就是用來觸發Connectable Observable發射資料的。 
應用Connect操作符後會返回一個Subscription物件,通過這個Subscription物件,我們可以呼叫其unsubscribe方法來終止資料的發射。 
另外,如果還沒有訂閱者訂閱的時候就應用Connect操作符也是可以使其開始發射資料的。

RefCount 
RefCount操作符就是將一個Connectable Observable 物件再重新轉化為一個普通的Observable物件,這時候訂閱者進行訂閱時就會觸發資料的發射。

Replay 
Replay操作符返回一個Connectable Observable 物件並且可以快取其發射過的資料,這樣即使有訂閱者在其發射資料之後進行訂閱也能收到其之前發射過的資料。 
不過使用Replay操作符我們最好還是限定其快取的大小,否則快取的資料太多了可會佔用很大的一塊記憶體。 
對快取的控制可以從空間和時間兩個方面來實現。

直接返回一個connectable observable,不用publish

轉換操作

to
自定義操作符

lift
compose