1. 程式人生 > >Android RxJava操作符的學習---建立操作符

Android RxJava操作符的學習---建立操作符

  • RxJava如此受歡迎的原因,在於其提供了豐富 & 功能強大的操作符,幾乎能完成所有的功能需求

1. 簡介

RxJava 操作符的具體簡介如下:

2. 型別

  • RxJava功能強大,所以其對應的操作符的型別也非常多
  • 根據功能作用的不同,RxJava的操作符分類如下:

3. 操作符詳解

注:在使用RxJava 2操作符前,記得在專案的Gradle中新增依賴:


dependencies {
      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
      compile 'io.reactivex.rxjava2:rxjava:2.0.7'
      // 注:RxJava2 與 RxJava1 不能共存,即依賴不能同時存在
}

3.1 建立操作符

  • 作用
    建立 被觀察者( Observable) 物件 & 傳送事件。

  • 應用場景

  • 型別
    根據上述應用場景,建立操作符的型別包括:

 

3.1.1 基本建立

  • 需求場景
    完整的建立被觀察者物件

  • 對應操作符型別

create()

  • 作用
    完整建立1個被觀察者物件(Observable

RxJava 中建立被觀察者物件最基本的操作符

  • 具體使用
  • / **
       * 1. 通過creat()建立被觀察者 Observable 物件
       */ 
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
              // 傳入引數: OnSubscribe 物件
              // 當 Observable 被訂閱時,OnSubscribe 的 call() 方法會自動被呼叫,即事件序列就會依照設定依次被觸發
              // 即觀察者會依次呼叫對應事件的複寫方法從而響應事件
              // 從而實現由被觀察者向觀察者的事件傳遞 & 被觀察者呼叫了觀察者的回撥方法 ,即觀察者模式
    / **
       * 2. 在複寫的subscribe()裡定義需要傳送的事件
       */ 
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    // 通過 ObservableEmitter類物件 產生 & 傳送事件
                    // ObservableEmitter類介紹
                        // a. 定義:事件發射器
                        // b. 作用:定義需要傳送的事件 & 向觀察者傳送事件
                       // 注:建議傳送事件前檢查觀察者的isUnsubscribed狀態,以便在沒有觀察者時,讓Observable停止發射資料
                        if (!observer.isUnsubscribed()) {
                               emitter.onNext(1);
                               emitter.onNext(2);
                               emitter.onNext(3);
                    }
                    emitter.onComplete();
                }
            });
    
    // 至此,一個完整的被觀察者物件(Observable)就建立完畢了。
    
    
    

    在具體使用時,一般採用 鏈式呼叫 來建立

            // 1. 通過creat()建立被觀察者物件
            Observable.create(new ObservableOnSubscribe<Integer>() {
    
                // 2. 在複寫的subscribe()裡定義需要傳送的事件
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
    
                    emitter.onComplete();
                }  // 至此,一個被觀察者物件(Observable)就建立完畢
            }).subscribe(new Observer<Integer>() {
                // 以下步驟僅為展示一個完整demo,可以忽略
                // 3. 通過通過訂閱(subscribe)連線觀察者和被觀察者
                // 4. 建立觀察者 & 定義響應事件的行為
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "開始採用subscribe連線");
                }
                // 預設最先呼叫複寫的 onSubscribe()
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "接收到了事件"+ value  );
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "對Error事件作出響應");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "對Complete事件作出響應");
                }
    
            });
        }
    

    具體例子見前文

3.1.2 快速建立 & 傳送事件

  • 需求場景
    快速的建立被觀察者物件

  • 對應操作符型別

just()

  • 作用
    1. 快速建立1個被觀察者物件(Observable
    2. 傳送事件的特點:直接傳送 傳入的事件

注:最多隻能傳送10個引數

  • 應用場景
    快速建立 被觀察者物件(Observable) & 傳送10個以下事件

  • 具體使用

        // 1. 建立時傳入整型1、2、3、4
        // 在建立後就會發送這些物件,相當於執行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
        Observable.just(1, 2, 3,4)   
            // 至此,一個Observable物件建立完畢,以下步驟僅為展示一個完整demo,可以忽略
            // 2. 通過通過訂閱(subscribe)連線觀察者和被觀察者
            // 3. 建立觀察者 & 定義響應事件的行為
         .subscribe(new Observer<Integer>() {
            
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始採用subscribe連線");
            }
            // 預設最先呼叫複寫的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }

        });
    }
  • 測試結果

fromArray()

  • 作用
    1. 快速建立1個被觀察者物件(Observable
    2. 傳送事件的特點:直接傳送 傳入的陣列資料

會將陣列中的資料轉換為Observable物件

  • 應用場景

    1. 快速建立 被觀察者物件(Observable) & 傳送10個以上事件(陣列形式)
    2. 陣列元素遍歷
  • 具體使用

      // 1. 設定需要傳入的陣列
     Integer[] items = { 0, 1, 2, 3, 4 };

        // 2. 建立被觀察者物件(Observable)時傳入陣列
        // 在建立後就會將該陣列轉換成Observable & 傳送該物件中的所有資料
        Observable.fromArray(items) 
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始採用subscribe連線");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }

        });
    }

// 注:
// 可傳送10個以上引數
// 若直接傳遞一個list集合進去,否則會直接把list當做一個數據元素髮送

/*
  * 陣列遍歷
  **/
        // 1. 設定需要傳入的陣列
        Integer[] items = { 0, 1, 2, 3, 4 };

        // 2. 建立被觀察者物件(Observable)時傳入陣列
        // 在建立後就會將該陣列轉換成Observable & 傳送該物件中的所有資料
        Observable.fromArray(items)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "陣列遍歷");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "陣列中的元素 = "+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "遍歷結束");
                    }

                });

  • 測試結果

fromIterable()

  • 作用
    1. 快速建立1個被觀察者物件(Observable
    2. 傳送事件的特點:直接傳送 傳入的集合List資料

會將陣列中的資料轉換為Observable物件

  • 應用場景

    1. 快速建立 被觀察者物件(Observable) & 傳送10個以上事件(集合形式)
    2. 集合元素遍歷
  • 具體使用

/*
 * 快速傳送集合
 **/
// 1. 設定一個集合
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);

// 2. 通過fromIterable()將集合中的物件 / 資料傳送出去
        Observable.fromIterable(list)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }
                });


/*
 * 集合遍歷
 **/
        // 1. 設定一個集合
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);

        // 2. 通過fromIterable()將集合中的物件 / 資料傳送出去
        Observable.fromIterable(list)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "集合遍歷");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "集合中的資料元素 = "+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "遍歷結束");
                    }
                });
  • 測試結果

額外

// 下列方法一般用於測試使用

<-- empty()  -->
// 該方法建立的被觀察者物件傳送事件的特點:僅傳送Complete事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即觀察者接收後會直接呼叫onCompleted()

<-- error()  -->
// 該方法建立的被觀察者物件傳送事件的特點:僅傳送Error事件,直接通知異常
// 可自定義異常
Observable observable2=Observable.error(new RuntimeException())
// 即觀察者接收後會直接呼叫onError()

<-- never()  -->
// 該方法建立的被觀察者物件傳送事件的特點:不傳送任何事件
Observable observable3=Observable.never();
// 即觀察者接收後什麼都不呼叫


3.1.3 延遲建立

  • 需求場景
    1. 定時操作:在經過了x秒後,需要自動執行y操作
    2. 週期性操作:每隔x秒後,需要自動執行y操作

defer()

  • 作用
    直到有觀察者(Observer )訂閱時,才動態建立被觀察者物件(Observable) & 傳送事件
  1. 通過 Observable工廠方法建立被觀察者物件(Observable
  2. 每次訂閱後,都會得到一個剛建立的最新的Observable物件,這可以確保Observable物件裡的資料是最新的
  • 應用場景
    動態建立被觀察者物件(Observable) & 獲取最新的Observable物件資料

  • 具體使用

  •        <-- 1. 第1次對i賦值 ->>
            Integer i = 10;
    
            // 2. 通過defer 定義被觀察者物件
            // 注:此時被觀察者物件還沒建立
            Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
                @Override
                public ObservableSource<? extends Integer> call() throws Exception {
                    return Observable.just(i);
                }
            });
    
            <-- 2. 第2次對i賦值 ->>
            i = 15;
    
            <-- 3. 觀察者開始訂閱 ->>
            // 注:此時,才會呼叫defer()建立被觀察者物件(Observable)
            observable.subscribe(new Observer<Integer>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "開始採用subscribe連線");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "接收到的整數是"+ value  );
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "對Error事件作出響應");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "對Complete事件作出響應");
                }
            });
    
  • 測試結果
  • 因為是在訂閱時才建立,所以i值會取第2次的賦值

 

timer()

  • 作用
    1. 快速建立1個被觀察者物件(Observable
    2. 傳送事件的特點:延遲指定時間後,傳送1個數值0(Long型別)

本質 = 延遲指定時間後,呼叫一次 onNext(0)

  • 應用場景
    延遲指定事件,傳送一個0,一般用於檢測
  • 具體使用
        // 該例子 = 延遲2s後,傳送一個long型別數值
        Observable.timer(2, TimeUnit.SECONDS) 
                  .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始採用subscribe連線");
            }

            @Override
            public void onNext(Long value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }

        });

// 注:timer操作符預設執行在一個新執行緒上
// 也可自定義執行緒排程器(第3個引數):timer(long,TimeUnit,Scheduler) 
  • 測試結果

interval()

  • 作用
    1. 快速建立1個被觀察者物件(Observable
    2. 傳送事件的特點:每隔指定時間 就傳送 事件

傳送的事件序列 = 從0開始、無限遞增1的的整數序列

  • 具體使用
       // 引數說明:
        // 引數1 = 第1次延遲時間;
        // 引數2 = 間隔時間數字;
        // 引數3 = 時間單位;
        Observable.interval(3,1,TimeUnit.SECONDS)
                // 該例子傳送的事件序列特點:延遲3s後傳送事件,每隔1秒產生1個數字(從0開始遞增1,無限個)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }
                    // 預設最先呼叫複寫的 onSubscribe()

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }

                });

// 注:interval預設在computation排程器上執行
// 也可自定義指定執行緒排程器(第3個引數):interval(long,TimeUnit,Scheduler)
  • 測試結果

 

intervalRange()

  • 作用
    1. 快速建立1個被觀察者物件(Observable
    2. 傳送事件的特點:每隔指定時間 就傳送 事件,可指定傳送的資料的數量

a. 傳送的事件序列 = 從0開始、無限遞增1的的整數序列
b. 作用類似於interval(),但可指定傳送的資料的數量

  • 具體使用
// 引數說明:
        // 引數1 = 事件序列起始點;
        // 引數2 = 事件數量;
        // 引數3 = 第1次事件延遲傳送時間;
        // 引數4 = 間隔時間數字;
        // 引數5 = 時間單位
        Observable.intervalRange(3,10,2, 1, TimeUnit.SECONDS)
                // 該例子傳送的事件序列特點:
                // 1. 從3開始,一共傳送10個事件;
                // 2. 第1次延遲2s傳送,之後每隔2秒產生1個數字(從0開始遞增1,無限個)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }
                    // 預設最先呼叫複寫的 onSubscribe()

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }

                });
  • 測試結果

range()

  • 作用
    1. 快速建立1個被觀察者物件(Observable
    2. 傳送事件的特點:連續傳送 1個事件序列,可指定範圍

a. 傳送的事件序列 = 從0開始、無限遞增1的的整數序列
b. 作用類似於intervalRange(),但區別在於:無延遲傳送事件

  • 具體使用

// 引數說明:
        // 引數1 = 事件序列起始點;
        // 引數2 = 事件數量;
        // 注:若設定為負數,則會丟擲異常
        Observable.range(3,10)
                // 該例子傳送的事件序列特點:從3開始傳送,每次傳送事件遞增1,一共傳送10個事件
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "開始採用subscribe連線");
                    }
                    // 預設最先呼叫複寫的 onSubscribe()

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對Error事件作出響應");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對Complete事件作出響應");
                    }

                });

  • 測試結果

rangeLong()

  • 作用:類似於range(),區別在於該方法支援資料型別 = Long
  • 具體使用
    range()類似,此處不作過多描述

總結