RxJava 2.0操作符記錄
建立操作符
用於建立被觀察者物件(Observable)物件和傳送事件
create | 建立一個最基本的被觀察者物件Observable |
just |
將物件或者物件集合轉換成一個會發射這些物件的Observable |
defer |
在觀察者訂閱之前不建立這個Observable,為每一個觀察者建立一個新的Observable |
fromeArray | 建立一個可以將一個數組轉化為可被觀察的序列,並且將它的資料逐個發射的Observable |
fromIterable |
建立一個可以接收一個 Iterable 容器作為輸入,並且將它的資料逐個發射的Observable |
延時建立操作符 |
|
timer |
建立一個在指定的延遲之後發射單個數據的Observable |
interval |
建立一個定時發射整數序列的Observable |
intervalRange | 建立一個指定範圍內定時發射整數序列的Observable |
range | 建立一個發射指定範圍的整數序列的Observable |
rangeLong | 建立一個發射指定範圍的長整數序列的Observable |
特殊建立操作符 |
|
empty |
建立一個不發射任何資料,直接通知完成後終止的Observable |
error |
建立一個不發射任何資料,直接通知錯誤後終止的Observable |
never | 建立一個不發射任何資料,也不終止的Observable |
變換操作符
用於對Observable發射的資料進行變換。將Observable發射的資料按照一定的規則做一些變換,然後再將變換的資料發射出去
map | 對Observable發射的每一項資料都應用一個函式來變換 |
flatMap |
使用一個指定的函式對原始Observable發射的每一項資料執行變換操作,這個函式返回一個本身也發射資料的Observable,然後FlatMap合併這些Observables發射的資料,最後將合併後的結果當做它自己的資料序列發射。 |
flatMapIterable |
和flatMap的作用一樣,只不過返回的是Iterable而不是Observable |
concatMap |
它類似於flatMap,但是它最終輸出的資料序列和原資料序列是一致,它是按順序連結Observables,而不是合併(flatMap用的是合併) |
switchMap |
它和flatMap很像,除了一點,當原始Observable發射一個新的資料(Observable)時,它將取消訂閱並停止監視產生之前那個資料的Observable,只監視當前這一個 |
cast | 在發射之前強制將Observable發射的所有資料轉換為指定型別 |
scan |
對Observable發射的每一項資料應用一個函式,然後按順序依次發射這些值 |
buffer | 可以簡單的理解為快取,它定期從Observable收集資料到一個集合,然後把這些資料集合打包發射,而不是一次發射一個 |
groupBy |
將原來的Observable拆分為Observable集合,將原始Observable發射的資料按Key分組,每一個Observable發射一組不同的資料 |
window |
定期將來自Observable的資料拆分成一些Observable視窗,然後發射這些視窗,而不是每次發射一項。類似於Buffer,但buffer發射的是資料,window發射的是Observable,每一個Observable發射原始Observable的資料的一個子集 |
過濾操作符
這些操作符用於從Observable發射的資料中進行選擇,讓Observable返回我們所需要的資料
根據指定條件過濾事件 | |
filter | 過濾特定條件的事件,不滿足條件的事件將被過濾不發射 |
ofType | 過濾指定資料型別的事件 |
distinct | 過濾事件序列中重複的事件 |
distinctUntilChanged | 過濾事件序列中連續重複的事件 |
根據指定數量過濾事件 | |
take | 只發射開始的N項事件或者一定時間內的事件 |
takeLast | 只發射最後的N項事件或者一定時間內的事件 |
skip | 跳過開始的N項事件或者一定時間內的事件 |
skipLast | 跳過最後的N項事件或者一定時間內的事件 |
根據指定時間過濾事件 | |
sample | 在定期的時間內,只發送該段時間內最後一項事件,與 throttleLast() 操作符類似 |
debounce | 傳送資料事件時,若2次傳送事件的間隔<指定時間,就會丟棄前一次的資料重新計算時間,直到指定時間內都沒有新資料發射時才會傳送最後一次的資料事件 |
throttleFirst | 在定期的時間內,只發送該段時間內第一項事件 |
throttleLast | 在定期的時間內,只發送該段時間內最後一項事件 |
throttleWithTimeout | 傳送資料事件時,若2次傳送事件的間隔<指定時間,就會丟棄前一次的資料重新計算時間,直到指定時間內都沒有新資料發射時才會傳送最後一次的資料事件 |
timeout |
如果原始Observable在指定的時間內沒有發射任何資料,就發射一個onError通知終止這個Observable,或繼續執行一個備用的Observable |
根據指定位置過濾事件 | |
firstElement | 僅選取第一個資料事件 |
lastElement | 僅選取最後一個數據事件 |
ignoreElements |
丟棄所有資料,只保留髮射錯誤或正常終止的通知(onError或onCompleted) |
elementAt | 指定接收某個元素(通過索引值確定)允許越界,即獲取的位置索引 > 傳送事件序列長度,可設定一個預設值當發生越界時將發射預設值 |
elementAtOrError | 在elementAt() 的基礎上,當出現越界情況(即獲取的位置索引 > 傳送事件序列長度)時,即丟擲異常 |
條件/布林操作符
用於根據條件判斷被觀察者(Observable
)發射的事件是否符合條件或者對它們做布林運算
all |
判斷髮射的所有資料,是否都滿足設定的函式條件 |
contains | 判斷髮射的所有資料中,是否包含一個特定的值 |
isEmpty | 用於判斷Observable發射完畢時,有沒有發射資料。有資料false,如果只收到了onComplete通知則為true |
amb | 傳遞兩個或多個Observable,它只發射其中首先發射資料或通知(onError或onCompleted)的那個Observable的所有資料,而其他所有的Observable的發射物將被丟棄 |
takeWhile | 當發射的資料滿足某個條件時(不發射條件資料),Observable終止發射資料 |
takeUntil |
當發射的資料滿足某個條件後(發射條件資料),或者提供第二個Observable,如果第二個Observable發射了一項資料或者發射了一個終止通知(onError或onCompleted),會停止發射原始Observable資料並終止 |
skipWhile | SkipWhile訂閱原始的Observable,但是忽略它的發射物,直到你指定的某個條件變為false的那一刻,它開始發射原始Observable(發射條件資料) |
skipUntil | SkipUntil訂閱原始的Observable,但是忽略它的發射物,直到第二個Observable發射了一項資料那一刻(發射資料後被終止)或發射完成通知(onComplete),它開始發射原始Observable |
defaultIfEmpty | 如果原始Observable正常終止後仍然沒有發射任何資料,就發射一個預設值 |
switchIfEmpty | 如果原始Observable正常終止後仍然沒有發射任何資料,就使用備用的Observable |
sequenceEqual | 判定兩個Observables是否發射相同的資料序列。 傳遞兩個Observable給SequenceEqual操作符,它會比較兩個Observable的發射物,如果兩個序列是相同的(相同的資料,相同的順序,相同的終止狀態),它就發射true,否則發射false |
合併操作符
用於將多個被觀察者(Observable
)組合成一個單一的Observable和合並需要傳送的事件
zip | 合併多個被觀察者(Observable)傳送的事件,生成一個新的事件序列(即組合過後的事件序列),並最終傳送 |
concat |
組合多個被觀察者(Observable <4個)一起傳送資料,合併後按傳送順序序列執行,前一個沒有發射完,是不能發射後面的,需要注意的是Observable.concat(a,b)等價於a.concatWith(b) |
concatArray | 組合多個被觀察者(Observable)一起傳送資料,合併後按傳送順序序列執行 |
concatDelayError | 使用concat操作符時,若其中一個被觀察者發出onError事件,則會終止其他被觀察者繼續傳送事件,若希望onError事件推遲到其他被觀察者傳送完事件之後再觸發,即需要使用對應的concatDelayError操作符 |
concatArrayDelayError | 與concatDelayError() 操作符同理 |
merge | 組合多個被觀察者(Observable <4個)一起傳送資料,合併後按時間線並行執行,需要注意的是Observable.mergeWith(a,b)等價於a.mergeWith(b) |
mergeArray | 組合多個被觀察者(Observable)一起傳送資料,合併後按時間線並行執行 |
mergeDelayError | 使用merge操作符時,若其中一個被觀察者發出onError事件,則會終止其他被觀察者繼續傳送事件,若希望onError事件推遲到其他被觀察者傳送完事件之後再觸發,即需要使用對應的mergeDelayError操作符 |
mergeArrayDelayError | 與mergeDelayError() 操作符同理 |
combineLatest | 當兩個Observable 中的任何一個傳送了資料後,將先發送了資料的Observable 的最新(最後)一個數據 與 另外一個Observable 傳送的每個資料結合,最終基於該函式的結果傳送資料 |
combineLatestDelayError | 作用類似於concatDelayError()和 mergeDelayError()操作符,即錯誤處理 |
startWith | 在發射原來的Observable的資料序列之前,追加發射一個指定的資料序列或資料項 |
startWithArray | 在發射原來的Observable的資料序列之前,追加發射多個指定的資料序列或資料項 |
算術/聚合操作符
算術操作符是屬於可選的rxjava-math模組,這個模組需要導下面這個包,不支援Rxjava2所以Rxjava2用不了:
compile 'io.reactivex:rxjava-math:1.0.0'
用於對整個資料序列執行演算法操作或其它操作,由於這些操作必須等待資料發射完成(通常也必須快取這些資料),它們對於非常長或者無限的序列來說是危險的
算術操作符 | |
averageInteger | 求序列平均數併發射 |
averageLong | 求序列平均數併發射 |
averageFloat | 求序列平均數併發射 |
averageDouble | 求序列平均數併發射 |
max | 求序列最大值併發射 |
maxBy | 求最大key對應的值併發射 |
min | 求最小值併發射 |
minBy | 求最小Key對應的值併發射 |
sumInteger | 求和併發射 |
sumLong | 求和併發射 |
sumFloat | 求和併發射 |
sumDouble | 求和併發射 |
聚合操作符 | |
count | 計算資料項的個數併發射結果 |
reduce |
按順序對Observable發射的每項資料應用一個函式併發射最終的值,跟scan操作符很類似,只是scan會輸出每次計算的結果,而reduce只會輸出最後的結果。 |
collect | 將原始Observable發射的資料放到一個單一的可變的資料結構中,然後返回一個發射這個資料結構的Observable |
toList | 收集原始Observable發射的所有資料到一個列表,然後返回這個列表 |
toSortedList | 收集原始Observable發射的所有資料到一個有序列表,然後返回這個列表 |
toMap | 將序列資料轉換為一個Map,Map的key是根據一個函式計算的 |
toMultimap | 將序列資料轉換為一個列表,同時也是一個Map,Map的key是根據一個函式計算的 |
連線操作符
ConnectableObservable與普通的Observable差不多,但是可連線的Observable在被訂閱時並不開始發射資料,只有在它的connect()被呼叫時才開始
connect |
讓一個可連線的Observable開始發射資料(即使沒有任何訂閱者訂閱這個Observable,呼叫connect都會開始發射資料),connect方法返回一個Subscription物件,可以呼叫它的unsubscribe方法讓Observable停止發射資料給觀察者 |
publish |
將一個Observable轉換為一個可連線的Observable,如果一個ConnectableObservable已經開始發射資料,再對其進行訂閱只能接受之後發射的資料,訂閱之前已經發射過的資料就丟失了 |
replay | 使用Replay操作符返回的ConnectableObservable 會快取訂閱者訂閱之前已經發射的資料,這樣即使有訂閱者在其發射資料開始之後進行訂閱也能收到之前發射過的資料。Replay操作符能指定快取的大小或者時間,這樣能避免耗費太多記憶體 |
refCount |
RefCount操作符可以看做是Publish的逆向,它能將一個ConnectableObservable物件再重新轉化為一個普通的Observable物件,如果轉化後有訂閱者對其進行訂閱將會開始發射資料,後面如果有其他訂閱者訂閱,將只能接受之後發射的資料(這也是轉化之後的Observable 與普通的Observable的一點區別) |
錯誤/重試操作符
被觀察者(Observable
) 在傳送事件時出現錯誤時處理或重試,並且可以設定重試的時間間隔
onErrorResumeNext |
當原始Observable在遇到錯誤時,使用備用Observable |
onExceptionResumeNext | 當原始Observable在遇到異常時,使用備用的Observable。與onErrorResumeNext類似,區別在於onErrorResumeNext可以處理所有的錯誤,onExceptionResumeNext只能處理異常 |
onErrorReturn | 當原始Observable在遇到錯誤時發射一個特定的資料並正常終止 |
retry |
當原始Observable在遇到錯誤時發射錯誤通知(onError)事件後觸發重新訂閱 |
retryWhen | 當原始Observable在遇到錯誤時發射錯誤通知(onError)時,將發生的錯誤傳遞給一個新的被觀察者(Observable),並決定是否需要重新訂閱原始被觀察者(Observable)和傳送事件 |
retryUntil | 當原始Observable在遇到錯誤時發射錯誤通知(onError)時,判斷是否需要重新發送資料,具體使用類似於retry(Predicate predicate),唯一區別:返回 true 則不重新發送資料事件 |
repeat |
當原始Observable發射完成通知(onCompleted)事件後觸發重新訂閱 |
repeatWhen |
將原始Observable發射完成通知(onCompleted)轉換成1個 Object 型別資料傳遞給1個新被觀察者(Observable),以此決定是否重新訂閱和傳送原來的 Observable,如果返回的是onNext則觸發重訂閱,而返回的是onComplete/onError則不會觸發重訂閱 |
repeatUntil | 當原始Observable發射完成通知(onCompleted)時動態控制重複次數,如果返回true則不重複了,否則接著重複傳送事件 |
阻塞操作符
在Rxjava1中的BlockingObservable已經在Rxjava2中去掉了,在Rxjava2中已經整合到了Observable中
blockingForEach | 對Observable發射的每一項資料呼叫一個方法,會阻塞直到Observable完成 |
blockingFirst | 阻塞直到Observable發射了一個數據,然後返回第一項資料 |
blockingMostRecent | 返回一個總是返回Observable最近發射的資料的iterable |
blockingLatest | 返回一個iterable,會阻塞直到或者除非Observable發射了一個iterable沒有返回的值,然後返回這個值 |
blockingNext |
返回一個Iterable,阻塞直到Observable發射了另一個值,然後返回那個值 |
blockingLast | 阻塞直到Observable終止,然後返回最後一項資料 |
blockingIterable | 將Observable轉換返回一個iterable |
blockingSingle | 如果Observable終止時只發射了一個值,返回那個值,否則丟擲異常 |
blockingSubscribe | 在當前執行緒訂閱,和blockingForEach類似 |
功能性操作符
輔助被觀察者(Observable
) 在傳送事件時實現一些功能性需求
subscribe | 訂閱,即連線觀察者和被觀察者形成訂閱關係 |
delay |
使被觀察者在發射每項資料之前都延遲一段時間再發送事件,注意:delay不會平移onError通知,它會立即將這個通知傳遞給訂閱者,同時丟棄任何待發射的onNext通知。但是它會平移一個onCompleted通知 |
delaySubscription | 延遲訂閱原始Observable |
materialize | 將來自原始Observable的通知(onNext/onError/onComplete)都轉換為一個Notification物件,然後再按原來的順序發射出去 |
dematerialize | 與materialize操作符作用相反,將通知逆轉回一個Observable |
timestamp | 給Observable發射的每個資料項新增一個時間戳 |
timeInterval | 給Observable發射的兩個資料項間新增一個時間差,實現在OperatorTimeInterval中timeInterval |
cache | 快取Observable發射的資料序列併發射相同的資料序列給後續的訂閱者 |
using | 指示Observable建立一個只在它的生命週期記憶體在的資源,當Observable終止時這個資源會被自動釋放 |
執行緒排程 | |
subscribeOn | 指定被觀察者(Observable)自身在哪個排程器(執行緒)上執行,跟呼叫的位置沒有關係,而且多次呼叫並不會起作用,只有第一次呼叫時指定的排程器有效。但是有一種情況特殊,就是在doOnSubscribe操作符之後呼叫,可以使doOnSubscribe在指定的排程器中執行。 |
observeOn | 指定一個觀察者在哪個排程器(執行緒)上觀察這個Observable,當每次呼叫observeOn操作符時,之後的觀察者都會在選擇的排程器上進行觀察 |
do操作符在事件的生命週期中操作 | |
doOnNext | 在每次發射元素之前(onNext)呼叫 |
doOnComplete | 在發射完成通知之前(onCompleted)呼叫 |
doOnError | 在發射錯誤通知之前(onError )呼叫 |
doOnTerminate | Observable傳送終止事件通知之前呼叫,無論正常傳送完成 / 錯誤終止(onError / onCompleted) |
doOnEach | 對發射元素和發射通知進行了統一的封裝,用於在發射資料事件之前呼叫(在doOnNext之前呼叫) |
doAfterNext | 在每次發射元素之後(onNext)呼叫 |
doAfterTerminate | Observable傳送終止事件通知之後呼叫,無論正常傳送完成 / 錯誤終止(onError / onCompleted) |
doOnSubscribe | 觀察者訂閱時呼叫 |
doFinally | 當所有的事件發射完畢Observable終止之後會被呼叫,無論正常傳送完成 / 錯誤終止(onError / onCompleted)(在doAfterTerminate之前呼叫) |
doOnDispose | 當呼叫Disposable的dispose()之後呼叫 |
doOnLifecycle | Observable的生命週期監聽回撥,第一個引數的回撥方法在回撥onSubscribe()方法之前回調,第二個引數的回撥方法在呼叫Disposable的dispose()方法之後被回撥 |