Android RxJava操作符的學習---建立操作符
阿新 • • 發佈:2018-11-09
RxJava
如此受歡迎的原因,在於其提供了豐富 & 功能強大的操作符,幾乎能完成所有的功能需求
1. 簡介
RxJava
操作符的具體簡介如下:
2. 型別
RxJava
功能強大,所以其對應的操作符的型別也非常多- 根據功能作用的不同,RxJava的操作符分類如下:
3. 操作符詳解
注:在使用RxJava 2
操作符前,記得在專案的Gradle
中新增依賴:
dependencies { compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxjava:2.0.7' // 注:RxJava2 與 RxJava1 不能共存,即依賴不能同時存在 }
3.1 建立操作符
-
作用
建立 被觀察者(Observable
) 物件 & 傳送事件。 -
應用場景
- 型別
根據上述應用場景,建立操作符的型別包括:
3.1.1 基本建立
-
需求場景
完整的建立被觀察者物件 -
對應操作符型別
create()
- 作用
完整建立1個被觀察者物件(Observable
)
RxJava
中建立被觀察者物件最基本的操作符
- 具體使用
-
/ ** * 1. 通過creat()建立被觀察者 Observable 物件 */ Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { // 傳入引數: OnSubscribe 物件 // 當 Observable 被訂閱時,OnSubscribe 的 call() 方法會自動被呼叫,即事件序列就會依照設定依次被觸發 // 即觀察者會依次呼叫對應事件的複寫方法從而響應事件 // 從而實現由被觀察者向觀察者的事件傳遞 & 被觀察者呼叫了觀察者的回撥方法 ,即觀察者模式 / ** * 2. 在複寫的subscribe()裡定義需要傳送的事件 */ @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { // 通過 ObservableEmitter類物件 產生 & 傳送事件 // ObservableEmitter類介紹 // a. 定義:事件發射器 // b. 作用:定義需要傳送的事件 & 向觀察者傳送事件 // 注:建議傳送事件前檢查觀察者的isUnsubscribed狀態,以便在沒有觀察者時,讓Observable停止發射資料 if (!observer.isUnsubscribed()) { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } emitter.onComplete(); } }); // 至此,一個完整的被觀察者物件(Observable)就建立完畢了。
在具體使用時,一般採用 鏈式呼叫 來建立
// 1. 通過creat()建立被觀察者物件 Observable.create(new ObservableOnSubscribe<Integer>() { // 2. 在複寫的subscribe()裡定義需要傳送的事件 @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } // 至此,一個被觀察者物件(Observable)就建立完畢 }).subscribe(new Observer<Integer>() { // 以下步驟僅為展示一個完整demo,可以忽略 // 3. 通過通過訂閱(subscribe)連線觀察者和被觀察者 // 4. 建立觀察者 & 定義響應事件的行為 @Override public void onSubscribe(Disposable d) { Log.d(TAG, "開始採用subscribe連線"); } // 預設最先呼叫複寫的 onSubscribe() @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "對Error事件作出響應"); } @Override public void onComplete() { Log.d(TAG, "對Complete事件作出響應"); } }); }
具體例子見前文
3.1.2 快速建立 & 傳送事件
-
需求場景
快速的建立被觀察者物件 -
對應操作符型別
just()
- 作用
- 快速建立1個被觀察者物件(
Observable
) - 傳送事件的特點:直接傳送 傳入的事件
- 快速建立1個被觀察者物件(
注:最多隻能傳送10個引數
-
應用場景
快速建立 被觀察者物件(Observable
) & 傳送10個以下事件 -
具體使用
// 1. 建立時傳入整型1、2、3、4
// 在建立後就會發送這些物件,相當於執行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
Observable.just(1, 2, 3,4)
// 至此,一個Observable物件建立完畢,以下步驟僅為展示一個完整demo,可以忽略
// 2. 通過通過訂閱(subscribe)連線觀察者和被觀察者
// 3. 建立觀察者 & 定義響應事件的行為
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
// 預設最先呼叫複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
}
- 測試結果
fromArray()
- 作用
- 快速建立1個被觀察者物件(
Observable
) - 傳送事件的特點:直接傳送 傳入的陣列資料
- 快速建立1個被觀察者物件(
會將陣列中的資料轉換為
Observable
物件
-
應用場景
- 快速建立 被觀察者物件(
Observable
) & 傳送10個以上事件(陣列形式) - 陣列元素遍歷
- 快速建立 被觀察者物件(
-
具體使用
// 1. 設定需要傳入的陣列
Integer[] items = { 0, 1, 2, 3, 4 };
// 2. 建立被觀察者物件(Observable)時傳入陣列
// 在建立後就會將該陣列轉換成Observable & 傳送該物件中的所有資料
Observable.fromArray(items)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
}
// 注:
// 可傳送10個以上引數
// 若直接傳遞一個list集合進去,否則會直接把list當做一個數據元素髮送
/*
* 陣列遍歷
**/
// 1. 設定需要傳入的陣列
Integer[] items = { 0, 1, 2, 3, 4 };
// 2. 建立被觀察者物件(Observable)時傳入陣列
// 在建立後就會將該陣列轉換成Observable & 傳送該物件中的所有資料
Observable.fromArray(items)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "陣列遍歷");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "陣列中的元素 = "+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "遍歷結束");
}
});
- 測試結果
fromIterable()
- 作用
- 快速建立1個被觀察者物件(
Observable
) - 傳送事件的特點:直接傳送 傳入的集合
List
資料
- 快速建立1個被觀察者物件(
會將陣列中的資料轉換為
Observable
物件
-
應用場景
- 快速建立 被觀察者物件(
Observable
) & 傳送10個以上事件(集合形式) - 集合元素遍歷
- 快速建立 被觀察者物件(
-
具體使用
/*
* 快速傳送集合
**/
// 1. 設定一個集合
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
// 2. 通過fromIterable()將集合中的物件 / 資料傳送出去
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
/*
* 集合遍歷
**/
// 1. 設定一個集合
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
// 2. 通過fromIterable()將集合中的物件 / 資料傳送出去
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "集合遍歷");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "集合中的資料元素 = "+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "遍歷結束");
}
});
- 測試結果
額外
// 下列方法一般用於測試使用
<-- empty() -->
// 該方法建立的被觀察者物件傳送事件的特點:僅傳送Complete事件,直接通知完成
Observable observable1=Observable.empty();
// 即觀察者接收後會直接呼叫onCompleted()
<-- error() -->
// 該方法建立的被觀察者物件傳送事件的特點:僅傳送Error事件,直接通知異常
// 可自定義異常
Observable observable2=Observable.error(new RuntimeException())
// 即觀察者接收後會直接呼叫onError()
<-- never() -->
// 該方法建立的被觀察者物件傳送事件的特點:不傳送任何事件
Observable observable3=Observable.never();
// 即觀察者接收後什麼都不呼叫
3.1.3 延遲建立
- 需求場景
- 定時操作:在經過了x秒後,需要自動執行y操作
- 週期性操作:每隔x秒後,需要自動執行y操作
defer()
- 作用
直到有觀察者(Observer
)訂閱時,才動態建立被觀察者物件(Observable
) & 傳送事件
- 通過
Observable
工廠方法建立被觀察者物件(Observable
)- 每次訂閱後,都會得到一個剛建立的最新的
Observable
物件,這可以確保Observable
物件裡的資料是最新的
-
應用場景
動態建立被觀察者物件(Observable
) & 獲取最新的Observable
物件資料 -
具體使用
-
<-- 1. 第1次對i賦值 ->> Integer i = 10; // 2. 通過defer 定義被觀察者物件 // 注:此時被觀察者物件還沒建立 Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> call() throws Exception { return Observable.just(i); } }); <-- 2. 第2次對i賦值 ->> i = 15; <-- 3. 觀察者開始訂閱 ->> // 注:此時,才會呼叫defer()建立被觀察者物件(Observable) observable.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "開始採用subscribe連線"); } @Override public void onNext(Integer value) { Log.d(TAG, "接收到的整數是"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "對Error事件作出響應"); } @Override public void onComplete() { Log.d(TAG, "對Complete事件作出響應"); } });
- 測試結果
-
因為是在訂閱時才建立,所以i值會取第2次的賦值
timer()
- 作用
- 快速建立1個被觀察者物件(
Observable
) - 傳送事件的特點:延遲指定時間後,傳送1個數值0(
Long
型別)
- 快速建立1個被觀察者物件(
本質 = 延遲指定時間後,呼叫一次
onNext(0)
- 應用場景
延遲指定事件,傳送一個0,一般用於檢測
- 具體使用
// 該例子 = 延遲2s後,傳送一個long型別數值
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
// 注:timer操作符預設執行在一個新執行緒上
// 也可自定義執行緒排程器(第3個引數):timer(long,TimeUnit,Scheduler)
- 測試結果
interval()
- 作用
- 快速建立1個被觀察者物件(
Observable
) - 傳送事件的特點:每隔指定時間 就傳送 事件
- 快速建立1個被觀察者物件(
傳送的事件序列 = 從0開始、無限遞增1的的整數序列
- 具體使用
// 引數說明:
// 引數1 = 第1次延遲時間;
// 引數2 = 間隔時間數字;
// 引數3 = 時間單位;
Observable.interval(3,1,TimeUnit.SECONDS)
// 該例子傳送的事件序列特點:延遲3s後傳送事件,每隔1秒產生1個數字(從0開始遞增1,無限個)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
// 預設最先呼叫複寫的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
// 注:interval預設在computation排程器上執行
// 也可自定義指定執行緒排程器(第3個引數):interval(long,TimeUnit,Scheduler)
- 測試結果
intervalRange()
- 作用
- 快速建立1個被觀察者物件(
Observable
) - 傳送事件的特點:每隔指定時間 就傳送 事件,可指定傳送的資料的數量
- 快速建立1個被觀察者物件(
a. 傳送的事件序列 = 從0開始、無限遞增1的的整數序列
b. 作用類似於interval()
,但可指定傳送的資料的數量
- 具體使用
// 引數說明:
// 引數1 = 事件序列起始點;
// 引數2 = 事件數量;
// 引數3 = 第1次事件延遲傳送時間;
// 引數4 = 間隔時間數字;
// 引數5 = 時間單位
Observable.intervalRange(3,10,2, 1, TimeUnit.SECONDS)
// 該例子傳送的事件序列特點:
// 1. 從3開始,一共傳送10個事件;
// 2. 第1次延遲2s傳送,之後每隔2秒產生1個數字(從0開始遞增1,無限個)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
// 預設最先呼叫複寫的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
range()
- 作用
- 快速建立1個被觀察者物件(
Observable
) - 傳送事件的特點:連續傳送 1個事件序列,可指定範圍
- 快速建立1個被觀察者物件(
a. 傳送的事件序列 = 從0開始、無限遞增1的的整數序列
b. 作用類似於intervalRange()
,但區別在於:無延遲傳送事件
- 具體使用
// 引數說明:
// 引數1 = 事件序列起始點;
// 引數2 = 事件數量;
// 注:若設定為負數,則會丟擲異常
Observable.range(3,10)
// 該例子傳送的事件序列特點:從3開始傳送,每次傳送事件遞增1,一共傳送10個事件
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe連線");
}
// 預設最先呼叫複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
- 測試結果
rangeLong()
- 作用:類似於
range()
,區別在於該方法支援資料型別 =Long
- 具體使用
與range()
類似,此處不作過多描述
總結