1. 程式人生 > >RxJava學習 筆記

RxJava學習 筆記

RxJava學習筆記

#### 1.什麼是RxJava

一個實現非同步操作的庫

RxJava依賴:

   compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

2.RxJava的好處

隨著程式邏輯變得越來越複雜,它依然能夠保持簡潔。

3.RxJava 的觀察者模式

​ RxJava有四個觀察者模式 :

3.1基本概念

Observable (被觀察者)

​  Observer (觀察者)

​  subscribe (訂閱)、事件。

​ Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 

Observer`。

3.2事件回撥的三種方式

​ onNext() 普通的回撥方法

​ onCompleted() 發出事件的集合

​ onError() 事件中的異常時呼叫 同時會終止事件傳送

    總結: onCompleted()和onError()在一個佇列中只能存在其中一種,並且在佇列末端.觀察者接收到其中一個回撥之後就會停止接收事件.

3.3 Observer的基本使用

​ 除了Observer的介面外,另外還有一個抽象類對其進行擴充套件:Subscriber 基本使用都一樣

    建立方法: 

​ Subscriber subscriber = new Subscriber() {
​ public void onNext(String s) {

}


​ public void onCompleted() {

}

     public void onError(Throwable e) {

          }

};

  • 帶有一個Consumer引數的方法表示下游只關心onNext事件, 其他的事件我假裝沒看見, 因此我們如果只需要onNext事件可以這麼寫:

    //訂閱觀察者
    subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  Log.d(TAG, "onNext: " + integer);
              }
  • Subscriber的不同

    onStart() 在事件開始之前呼叫 可以用作資料的清理和重置 但不能做耗時操作, 可以使用 doOnsubscrible方法進行耗時操作

    unsubscribe() 取消訂閱的方法, 在此方法呼叫前可以使用isUnsubscribe進行狀態判斷,要注意關閉訂閱,以免發生記憶體洩露

    3.4 Observable的基本使用

    Observable observable = Observable.create(new Observable.OnSubscribe() {
    ​ @Override
    ​ public void call(Subscriber

//字串的傳遞
    //建立一個被觀察者          
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        //需要傳遞的物件
        observer.onNext("cccc");
    }
    //訂閱觀察者
}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        //接收傳遞的物件
    }
});

上面兩個例子都是在同一執行緒中執行.

4.執行緒控制器 Scheduler

​ RxJava通過它來指定每一段程式碼應該執行在怎樣的執行緒,RxJava內建一下幾種控制器:

  • Schedulers.immediate(): 直接在當前執行緒執行,相當於不指定執行緒。這是預設的 Scheduler

  • Schedulers.newThread(): 總是啟用新執行緒,並在新執行緒執行操作

  • Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 

  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

  • 另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主執行緒執行。

    有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對執行緒進行控制了。

    subscribeOn(): 指定 subscribe() 所發生的執行緒,即 Observable.OnSubscribe被啟用時所處的執行緒。或者叫做事件產生的執行緒。

    observeOn(): 指定 Subscriber 所執行在的執行緒。或者叫做事件消費的執行緒。

    //執行緒切換演示
    Observable.just(1,2,3)
          .subscribeOn(Schedulers.io())   //讓事件在子執行緒在子執行緒執行
          .observeOn(AndroidSchedulers.mainThread())      //觀察者返回主執行緒執行
          .subscribe(new Action1<Integer>() {
              @Override
              public void call(Integer integer) {
                  Toast.makeText(MainActivity.this, "出來了", Toast.LENGTH_SHORT).show();
              }
          });

4.1 操作符

所謂變換,就是將事件序列中的物件或整個序列進行加工處理,轉換成不同的事件或事件序列

  • map轉化符 : 可以轉換髮送資料的型別.
//變換的演示
Observable.just("圖片地址")
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String s) {
                //呼叫方法把字串轉化圖片
                return ;
            }
        })
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                //顯示圖片
            }
        });
  • flatMap :將一個傳送事件的上游Observable變換為多個傳送事件的Observables,然後將它們發射的事件合併後放進一個單獨的Observable裡.
  • Zip通過一個函式將多個Observable傳送的事件結合到一起,然後傳送這些組合到一起的事件. 它按照嚴格的順序應用這個函式。它只發射與發射資料項最少的那個Observable一樣多的資料。
  • sample : 個操作符每隔指定的時間就從上游中取出一個事件傳送給下游. 這裡我們讓它每隔2秒取一個事件給下游, 來看看這次的執行結果吧:

5 .Disposable物件

​ 當呼叫它的dispose()方法時, 它就會將兩根管道切斷, 從而導致下游收不到事件.

​ 注意: 呼叫dispose()並不會導致上游不再繼續傳送事件, 上游會繼續傳送剩餘的事件

​ 那如果有多個Disposable 該怎麼辦呢, RxJava中已經內建了一個容器CompositeDisposable, 每當我們得到一個Disposable時就呼叫CompositeDisposable.add()將它新增到容器中, 在退出的時候, 呼叫CompositeDisposable.clear() 即可切斷所有的水管.