1. 程式人生 > >【android】RxJava1原理解析

【android】RxJava1原理解析

寫在前面

讀了大神的部落格,為了學習,跪著看完再整理了出來,以便加深印象。

RxJava 是什麼,能解決什麼問題

  • github 官方介紹

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

一個執行在 JVM 上的庫,通過可觀測的序列來組成非同步的,基於事件的程式。

  • 解決問題

    讓複雜的程式邏輯迴歸簡單、清晰。​

API介紹和原理簡析

1、觀察者模式

  • RxJava 的四個基本概念
    • Observable 可觀察者、被觀察者
    • Observer 觀察者、訂閱者
    • subscribe 訂閱
    • Event 事件

Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer。

與傳統觀察者模式不同,RxJAva的事件回撥方法除了普通事件 onNext()(相當於onClick()/onEvent())之外,還定義了兩個特殊的事件:onCompleted()onError()

  • onCompleted()

    : 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。

  • onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。

  • 在一個正確執行的事件序列中, onCompleted()onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted()onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。

這裡寫圖片描述

2、基本實現

基於以上的概念,RxJava的基本實現主要有三點:

(1)建立Observer

Observer<String> observer = new Observer<String>() {
        @Override
        public void onCompleted() {
            Log.d(TAG,"Completed!");
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG,"Error!");
        }

        @Override
        public void onNext(String s) {
            Log.d(TAG,"Item:" + s);
        }
    };

除了Observer介面外,RxJava還內建了一個實現了Observer的抽象類:Subscriber。Subscriber堆Observer介面進行了一些擴充套件,但是它們的基本使用方式是完全一樣的。不僅基本使用方式一樣,實質上,在RxJava的subscribe過程中,Observer也總是會先被轉換成一個Subscriber再使用。它們區別對於使用者來說主要有兩點:

  1. onStart():這是 Subscriber增加的方法。它會在subscribe剛開始,而事件還未傳送之前被呼叫,可以用於做一些準備工作,例如資料的清零或重置。需要注意的是:如果準備工作的執行緒有要求,onStart()就不適用了,因為它總是在subscribe所發生的執行緒被呼叫,而不能指定執行緒。要再指定的執行緒來你做準備工作,可以適用doOnSubscribe()方法。
  2. unsubscribe():這是Subscriber所實現的另一個介面Subscription的方法,用於取消訂閱。在這個方法被呼叫後,Subscriber將不再接收事件。避免記憶體洩漏。

(2)建立Observable

RxJava適用create()方法來建立一個Observable,併為它定義事件觸發規則:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("I'm a");
            subscriber.onNext("I'm b");
            subscriber.onNext("I'm c");
            subscriber.onCompleted();
        }
    });

可以看到,這裡傳入了一個OnSubscribe物件作為引數。OnSubscribe會被儲存在返回的Observable物件中,它的作用相當於一個計劃表,當Observable被訂閱的時候,OnSubscribe的call()方法會自動被呼叫。這樣,由被觀察者呼叫了觀察者的回掉方法,就能實現由被觀察者向觀察者的事件傳遞,即觀察者模式。

create()方法是RxJava最基本的創造事件序列的方法。基於這個方法,RxJava還提供了以下方法用來快捷建立事件佇列,例如:

  • just(T...):將傳入的引數一次傳送出來。

    Observable observable = Observable.just("I'm a","I'm b","I'm c");
    //會依次呼叫 
    // onNext("I'm a");
    // onNext("I'm ");
    // onNext("I'm c");
    // onCompleted();
  • from(T[])/from(Iterable<? extends T>):將傳入的陣列或者Iterable拆分成具體物件後,依次傳送出來。

    String[] words = {"I'm a","I'm b","I'm c"};
     Observable observable = Observable.from(words);

(3)Subscribe(訂閱)

建立了觀察者和被觀察者之後,再用subscribe()方法將它們聯合起來,整條鏈子就可以工作了。

observable.subscribe(observer);

Observable.subscribe(Subscriber)的內部是顯示這樣的(僅核心程式碼):

// 注意:這不是 subscribe() 的原始碼,而是將原始碼中與效能、相容性、擴充套件性有關的程式碼剔除後的核心程式碼。
// 如果需要看原始碼,可以去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);//在RxJava中,Observable並不是在建立的時候就立即開始傳送事件,而是在它被訂閱的時候,即當subscribe()方法執行的時候。
    return subscriber;//作為Subscription返回。這是為了方便unsubscribe();
}

除了subscribe(Observer)subscribe(Subscriber),subscribe()還支援不完整定義的回掉,RxJava會自動根據定義創建出Subscriber。形式如下:

//Action1也是一個介面,,它同樣只有一個方法 call(T param),這個方法也無返回值,但有一個引數;與 Action0 同理,由於 onNext(T obj) 和 onError(Throwable error) 也是單引數無返回值的,因此 Action1 可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現不完整定義的回撥。
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
      Log.d(TAG,s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {

    }
};
//Action0使RxJava的一個介面,它只有一個方法call(),這個方法是無參無返回值的;由於 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當成一個包裝物件,將 onCompleted() 的內容打包起來將自己作為一個引數傳入 subscribe() 以實現不完整定義的回撥。這樣其實也可以看做將 onCompleted() 方法作為引數傳進了 subscribe(),相當於其他某些語言中的『閉包』。
Action0 onCompletedAction = new Action0() {
    @Override
    public void call() {
      Log.d(TAG,"completed");
    }
};
//自動建立 Subscriber,並使用 onNextAction 來定義 onNext() 
observable.subscribe(onNextAction);
//自動建立 Subscriber,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction,onErrorAction);
//自動建立 Subscriber,並使用 onNextAction 、 onErrorAction 和 onCompletedAction 來定義
// onNext() 、 onError() 和 onCompleted()
observable.subscribe(onNextAction,onErrorAction,onCompletedAction);

然而,上面都是同步的,要實現非同步,則需要用到RxJava的另一個概念:Scheduler。

3、執行緒控制–Scheduler(一)

在不指定執行緒的情況下,RxJava遵循的是執行緒不變的原則,即在那個執行緒呼叫subscribe(),就在那個執行緒生產時間;在那個執行緒生產事件,就在那個執行緒消費事件。如果需要切換執行緒,就需要用到Scheduler(排程器)。

(1)Scheduler的API(一)

在RxJava中,Scheduler———排程器,相當於執行緒控制器,RxJava通過它來指定每一段程式碼應該執行在什麼執行緒。RxJava已經內建了幾個Scheduler,它們已經適合大多數的使用場景:

  • Scheduler.immediate():直接在當前執行緒執行,相當於不指定執行緒。這也是預設的。
  • Scheduler.newThread():總是啟用新執行緒,並在新執行緒執行操作。
  • Scheduler.io():IO操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的Scheduler。行為模式和 newThread() 差不多,區別在於 io()的內部實現是使用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下 io()newThread()更有效率。不要把計算工作放在 io()中,可以避免建立不必要的執行緒。
  • Scheduler.computation():計算所使用的Scheduler。這個計算指的是CPU密集型計算,即不會被IO等操作限制性能的操作,例如圖形的計算。這個Scheduler所使用的固定的執行緒池,大小為CPU核數。不要把IO操作放在computation()中,否則IO操作的等待事件會浪費CPU。
  • 另外,Android還有一個專用的AndroidSchedulers.mainThread(),它指定的操作將在Android主執行緒執行。

有了這幾個Scheduler,就可以使用subscribeOn()observeOn()兩個方法來對執行緒進行控制了。

  • subscribeOn():指定subscribe()所發生的執行緒,即Observable.OnSubscribe()被啟用時所處的執行緒,或者叫做事件產生的執行緒。
  • observeOn():指定Subscriber所執行在的執行緒。或者叫做事件消費的執行緒。

上程式碼:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 執行緒,意思是被建立的事件內容1,2,3,4會在io執行緒發出。
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主執行緒,意思是subscriber數字的列印將發生在主執行緒。
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

上面這段程式碼中,適用於多數的後臺執行緒讀取資料,主執行緒顯示的策略。

(2)Scheduler的原理(一)

放在後面,它的原理是以變換的原理作為基礎的。

4、變換

RxJava提供了對事件序列進行變換的支援,這是它的核心功能之一。所謂變換,就是將事件序列中的物件或整個序列進行加工處理,轉換成不同的事件或事件序列。

(1)API

首先看一個map()的例子:

observable.just("images/logo.png")
                .map(new Func1<String, Bitmap>() {
                    @Override
                    public Bitmap call(String s) {
                        return getBitmapFromPath(s);
                    }
                })
                .subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) {
                        showBitmap(bitmap);
                    }
                });

這裡出現了一個叫做Func1的類。它和Action1非常相似,也是RxJava的一個藉口,用於包裝含有一個引數的方法。Func1和Action1的區別在於,Func1包裝的是有返回值的方法。另外,和ActionX一樣,FuncX也有多個,用於不同引數個數的方法。

可以看到,map()方法將引數中的String物件轉換成一個Bitmap物件後返回,而在經過map()方法後,事件的引數型別也有String轉換為了Bitmap。這種直接變換物件並返回的,是最常見的也是最容易理解的變換。不過RxJava的比那還遠不止這樣,它不僅可以針對事件物件,還可以針對整個事件佇列,這使得RxJava變得非常靈活。列舉幾個常用的變換:

  • map():如上,事件物件的直接變換。

這裡寫圖片描述

  • flatMap():這是一個很有用但是非常難理解的變換。首先假設由這麼一種需求:假設有一個數據結構【學生】,現在需要打印出一組學生的名字。實現方式非常簡單:

        Student[] students = ...;
          Subscriber<String> subscriber = new Subscriber<String>() {
              @Override
              public void onNext(String s) {
                  Log.d(tag,s);
              }
              ...
          };
          Observable.from(students)
                  .map(new Func1<Student, String>() {
                      @Override
                      public String call(Student student) {
                          return student.getName();
                      }
                  })
                  .subscribe(subscriber);

    很簡單,再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區別在於,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現:

        Student[] students = ...;
          Subscriber<Student> subscriber = new Subscriber<Student>() {
              @Override
              public void onNext(Student student) {
                  List<Course> courses = student.getCourses();
                  for (int i = 0; i < courses.size(); i++) {
                      Course course = courses.get(i);
                      Log.d(tag, course.getName());
                  }
              }
            ...
          };
          Observable.from(students)
                  .subscribe(subscriber);
      }

    依然很簡單,那麼如果我不想在Subscriber中使用for迴圈,而是希望Subscriber中直接傳入單個的Course物件呢(這對於程式碼複用很重要)。用map()顯然是不行的,因為map()是一對一的轉換。那怎麼才能把一個Student轉化成多個Course呢?

    這時候,就要用到flatMap()了:

        Student[] students = ...;
          Subscriber<Course> subscriber = new Subscriber<Course>() {
              @Override
              public void onCompleted() {
    
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onNext(Course course) {
                  Log.d(TAG,course.getName());
              }
          };
          Observable.from(students)
                  .flatMap(new Func1<Student, Observable<Course>>() {
                      @Override
                      public Observable<Course> call(Student student) {
                          return Observable.from(student.getCourses());
                      }
                  })
                  .subscribe(subscriber);

    從上面的程式碼可以看出,flatMap()map()有一個相同點:它也是把傳入的引數轉化之後返回另一個物件。但需要注意的,和map()不同的是,flatMap()中返回的是個Observable物件,並且這個Observable物件並不是北直接傳送到Subscriber的回撥方法中。

    flatMap()的原理是這樣的:

    1. 使用傳入的事件物件建立一個Observable物件;
    2. 並不傳送這個Observable物件,而是將它啟用,於是它開始傳送事件;
    3. 每一個創建出來的Observable傳送的事件,都被匯入同一個Observable,而這個Observable負責將這些事件統一交給Subscriber的回掉方法。

    這三個步驟,把事件拆分成兩級,通過一組新建立的Observable將初始的物件【鋪平】之後通過統一路徑分發了下去。而這個【鋪平】就是flatMap()所謂的flat。

    flatMap()示意圖:

這裡寫圖片描述

擴充套件:由於可以在巢狀的Observable中新增非同步程式碼,flatMap()也常用語巢狀的非同步操作,例如巢狀的網路請求。示例程式碼(Retrofit+RxJava):

java
networkClient.token()
.flatMap(new Func1<String,Observable<Messages>>(){
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在訂閱時請求訊息列表,並在響應後傳送請求到的訊息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>(){
@Override
public void call(Messages messages) {
// 處理顯示訊息列表
showMessages(messages);
}
});

傳統的巢狀請求需要使用巢狀的Callback來實現。而通過flatMap(),可以把巢狀的請求寫在一條鏈中,從而保持程式邏輯的清晰。

  • throttleFirst():在每次事件觸發後的一定時間間隔內丟棄新的事件。常用作去抖動過濾,例如按鈕的點選監聽器:RxJava.clickEvents(button) //RxBinding程式碼。 .throttleFirst(500,Timeunit.MILLISECONDS)//設定防抖間隔為500ms .subscribe(subscriber)。不必再但因使用者手都點來兩個重複的介面了。

    RxJava還提供很多便捷的方法來實現事件序列的變換。就不一一列舉了。

(2)變換的原理:lift()

這些變換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在RxJava內部,它們是基於同一個基礎的變換方法lift(Operator)。首先看一下lift()的內部實現(僅核心程式碼):

//注意:這裡不是lift()的原始碼,而是將原始碼中與效能、相容性、擴充套件性有關的程式碼剔除後的核心程式碼。
public <R> Observable<R> lift(Operator<? extends R,? super T> operator){
  return Observable.create(new onSubscribe<R>(){
    @Override
    public void call(Subscriber subscriber){
      Subscriber newSubscriber = operator.call(subscriber);
      newSubscriber.onStart();
      onSubscribe.call(newSubscriber);
    }
  });
}

這段程式碼很有意思:它生成了一個新的Observable並返回,而且建立新的Observable所用的引數OnSubscribe的回撥方法call()的實現竟然和前面說過的Observable.subscribe()一樣!然而它們並不一樣哦,不一樣的地方關鍵就在於第二行onSubscribe.call(subscriber) 中的OnSubscribe**所指代的物件不同**

  • subscribe()中這句話的 OnSubscribe 指的是 Observable 中的 onSubscribe 物件,這個沒有問題,但是lift()之後的情況就複雜了點。

  • 當有lift()時:

    1. lift()建立了一個 Observable 後,加上之前原始的 Observable,已經有兩個 Observable 了;
    2. 而同樣的,新 Observable 裡的新 OnSubscribe 加上之前的原始的 Observable 中的原始 OnSubscribe ,也就有了兩個 OnSubscribe;
    3. 當用戶呼叫經過 lift()後的 Observable 的 subscribe()的時候,使用的是lift()所返回的新的 Observable,於是它觸發的onSubscribe.call(subscriber),也是用的新 OnSubscribe,即在lift()中生成的那個 OnSubscribe;
    4. 而這個新的 OnSubscribe 的 call()方法中的 onSubscribe,就是指的原始 Observable 中的原始 OnSubscribe,在這個call()方法裡,新 OnSubscribe 利用operator.call(subscriber)生成了一個新的 Subscriber( Operator 就是在這裡,通過自己的call()方法將新 Subscriber 和原始 Subscriber 進行關聯,並插入自己的【變換】程式碼以實現變換),然後利用這個新的 Subscriber 向原始 Observable 進行訂閱。

    這樣就實現了lift()過程,有點像一種代理機制,通過事件攔截和處理實現事件序列的變換。

精簡掉細節的話,也可以這麼說:在 Observable 執行了 lift(Operator)方法之後,會返回一個新的 Observable,這個新的 Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,並在處理之後傳送給 Subscriber。

如圖:

這裡寫圖片描述

兩次和多次的lift()同理,如下圖:

這裡寫圖片描述

舉一個具體的 Operator 的實現。下面這是一個將事件中的 Integer 物件轉換成 String 的例子,僅供參考:

Observable.lift(new Observable.Operator<String,Integer>(){
  @Override
  public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber){
    //將事件序列中的Integer物件轉換為String物件
    return new Subscriber<Integer>{
      @Override
      public void onNext(Integer integer){
        subscriber.onNext("" + integer);
      }

      @Override
      public void onCompleted(){
        subscriber.onCompleted();
      }

      @Override
      public void onError(Throwable e){
        subscriber.onError(e);
      }
    };
  }
});

講述lift()的原理只是為了更好的瞭解RxJAva,從而可以更好的使用它。然而不管是否瞭解lift()的原理,RxJava都不建議開發者自定義Operator來直接使用lift(),而是建議使用已有的lift()包裝方法(如map() flatMap()等)進行組合來實現需求,因為直接使用lift()非常容易發生一些難以發現的錯誤。

(3)compose: 對Observable整體的變換·

除了lift()之外,Observable還有一個變換方法叫做compose(Transformer)。它和lift()的區別在於,lift()是針對事件項和事件序列的,而compose()是針對Observable自身進行變換。舉個例子,假設在程式中有多個Observable,並且他們都需要應用一組相同的lift()變換。可以這麼寫:

observable1
  .lift1()
  .lift2()
  .lift3()
  .lift4()
  .subscribe(subscriber1);
observable2
  .lift1()
  .lift2()
  .lift3()
  .lift4()
  .subscribe(subscriber2);
observable3
  .lift1()
  .lift2()
  .lift3()
  .lift4()
  .subscribe(subscriber3);
observable4
  .lift1()
  .lift2()
  .lift3()
  .lift4()
  .subscribe(subscriber4);

這樣太不軟體工程了,於是你改成了這樣:

private Observable liftAll(Observable observable){
  return observable
        .lift1()
        .lift2()
        .lift3()
        .lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);

可讀性、可維護性都提高了。可是Observable被一個方法包起來,這種方式對於Observable的靈活性似乎還是增添了那麼點限制。怎麼辦?這個時候,就應該用compose()來解決了:

public class LiftAllTransfomer implements Observable.Transformer<Integer,String>{
  @Override
  public Observable<String> call(Observable<Integer> observable){
    return observable
      .lift1()
      .lift2()
      .lift3()
      .lift4();
  } 
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

像上面這樣,使用compose()方法,Observable可以利用傳入的Transformer物件的call()方法直接對自身進行處理,也就不必被包在方法裡面了。

5、執行緒控制:Scheduler(二)

除了靈活的變換,RxJava另一個牛逼的地方,就是執行緒的自由控制。

(1)Scheduler的API(二)

前面說過了,可以利用subscribeOn()結合observeOn()來實現執行緒控制,讓事件的產生和消費發生在不同的執行緒。可是在瞭解map() flatMap()等變換方法後,有些人就問了:能不能多切換幾次執行緒?

答案是:能。因為observeOn()指定的是Subscriber的執行緒,而這個Subscriber並不是(嚴格來說應該是【不一定是】,但這裡不妨理解為【不是】)subscribe()引數中的Subscriber,而是observeOn()執行時的當前Observable所對應的Subscriber,即它的直接下級Subscriber。換句話說,observeOn()指定的是它之後的操作所在的執行緒。因此如果有多次執行緒切換的需求,只要在每個想要切換執行緒的位置呼叫依次observeOn()即可。上程式碼:

Observable.just(1,2,3,4)//IO執行緒,由subscribeOn()指定
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.newThread())
  .map(mapOperator)//新執行緒,由observeOn()指定
  .observeOn(Schedulers.io())
  .map(mapOperator2)//IO執行緒,由observeOn()指定
  .observeOn(AndroidSchedulers.mainThread)
  .subscribe(subscriber);//Android組合縣城,由observeOn()指定

如上,通過observeOn()的多次呼叫,程式實現了執行緒的多次切換。

不過,不同於observeOn(),subscribeOn()的位置放在那裡都可以,但它是隻能呼叫一次的。

(2)Scheduler的原理(二)

其實,subscribeOn()observeOn()的內部實現,也是用的lift()。具體看圖(不同顏色的箭頭表示不同的執行緒):

subscribeOn()原理圖:

這裡寫圖片描述

observeOn()原理圖:

這裡寫圖片描述

從圖中可以看出,subscribeOn()observeOn()都做了執行緒切換的工作(圖中的”schedule…”部位)。不同的是,subscribeOn()的執行緒切換髮生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件還沒有開始傳送,因此subscribeOn()的執行緒控制可以從事件發出的開端就造成影響;而observeOn()的執行緒切換則發生在它內建的 Subscriber 中,即發生在它即將給下一級 Subscriber 傳送事件時,因此observeOn()控制的是它後面的執行緒。

最後,用一張圖來解釋當多個subscribeOn()observeOn()混合使用時,執行緒排程時怎麼發生的(由於圖中物件較多,相對於上面的圖對結構做了一些簡化調整):

這裡寫圖片描述

圖中共有5處含有對事件的操作。由圖中可以看出,1和2兩處受第一個subscribeOn()影響,執行在紅色執行緒;3和4受第一個observeOn()的影響,執行在綠色執行緒;5處受第二個onserveOn()的影響,執行在紫色執行緒;而第二個subscribeOn()由於在通知過程就被第一個subscribeOn()截斷,因此對整個流程並沒有任何影響。所以:當使用了多個subscribeOn()的時候,只有第一個起作用。

(3)延伸:doOnSubscribe()

然而,雖然超過一個的subscribeOn()對事件處理的流程沒有影響,但在流程之前卻是可以利用的。

在前面說到Subscriber的時候,提到過Subscriber的onStart()可以用作流程開始前的初始化。然而onStart()由於在subscribe()發生時就被呼叫了,因此不能指定執行緒,而是隻能執行在subscribe()被呼叫的執行緒。這就導致如果onStart()中含有對執行緒有要求的程式碼(例如在介面上顯示一個ProgressBar,這必須在主執行緒中執行),將會有執行緒非法的風險,因為有時你無法預測subscribe()將會在什麼執行緒執行。

而與subscriber.onStart()相對應的,有一個方法Observable.doOnSubscribe()。它和Subscriber.onStart()同樣是在subscribe()呼叫後而且在事件傳送前執行,但區別在於它可以指定執行緒。預設情況下,doOnSubscribe()執行在subscribe()發生的執行緒;而如果在doOnSubscribe()之後有subscribeOn()的話,它將執行在離它最近的subscribeOn()所指定的執行緒。

示例程式碼:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主執行緒執行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主執行緒
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

如上,在doOnSubscribe()的後面跟一個subscribeOn(),就能指定準備工作的執行緒了。

RxJava的適用場景和使用方式

1、與Retrofit的結合

Retrofit 是 Square 的一個著名的網路請求庫。

Retrofit 除了提供了傳統的 Callback 形式的API,還有 RxJava 版本的 Observable 形式的API。下面用對比的方式來介紹 Retrofit 的 RxJava 版 API 和傳統版本的區別。

以獲取一個 User 物件的介面作為例子。使用 Retrofit 的傳統 API,可以用這樣的方式定義請求:

@GET("/user")
public void getUser(@Query("userId") String userId,Callback<user> callback);

在程式的構建過程中,Retrofit 會自動把方法實現並生成程式碼,然後開發者可以利用下面的方法來獲取特定使用者並處理響應:

getUser(userId,new Callback<User>(){
  @Override
  public void success(User user){
    userView.setUser(user);
  }
  @Override
  public void failure(RetrofitError error){
    //error handing
    ...
  }
});

而使用RxJava形式的API,定義同樣的請求是這樣的:

@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);

使用的時候是這樣的:

getUser(userId)
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<User>(){
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
  });

區別就在於Retrofit把請求封裝進Observable,在請求結束後呼叫onNext()或在請求失敗後呼叫onError()。但本質都差不多,而且在細節上 Observable的形式似乎還比Cakkback形式要差一些,那Retrofit為什麼還要提供RxJava的支援呢?

因為它好用啊,從這個例子看不出來因為這只是最簡單的情況。而情況一旦複雜起來,Callback形式馬上就會開始讓人頭疼。

比如,這樣一種情況:你的程式渠道的User並不應該直接顯示,而是需要先於資料庫中的資料進行比對和修正後再顯示。使用Callback方式大概可以這麼寫:

getUser(userId,new Callback<User>(){
    @Override
    public void success(User user) {
        processUser(user); // 嘗試修正 User 資料
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }

});

很簡便,但是不要這樣做,為什麼?因為這樣做會影響效能。資料庫的操作很重,一次讀寫操作花費10ms-20ms是很常見的,這樣的耗時操作容易造成介面卡頓。所以通常情況下,如果可以的話一定要避免在主執行緒中處理資料庫。所以為了提升效能,這段程式碼可以優化以下:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        new Thread() {
            @Override
            public void run() {
                processUser(user); // 嘗試修正 User 資料
                runOnUiThread(new Runnable() { // 切回 UI 執行緒
                    @Override
                    public void run() {
                        userView.setUser(user);
                    }
                });
            }).start();
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
});

效能問題解決,但是單嗎就變得比較亂了,蜜汁縮排!

看RxJava的形式:

getUser(userId)
  .doOnNext(new Action1<User>(){
        @Override
        public void call(User user) {
            processUser(user);
        }
  })
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<User>(){
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
  });

2、RxBinding

RxBinding是Jake Wharton 的一個開源庫,它提供了一套在 Android 平臺上的基於 RxJava 的 Binding API。所謂 Binding,就是類似設定 OnClickListener、設定TextWatcher這樣的註冊繫結物件API。

舉個設定點選監聽的例子。使用RxBinding,可以把事件監聽用這樣的方法來設定:

Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式來反饋點選事件
    .subscribe(new Action1<ViewClickEvent>() {
        @Override
        public void call(ViewClickEvent event) {
            // Click handling
        }
    });

看起來除了形式變了沒什麼區別,實質上也是這樣。甚至如果你看一下它的原始碼,你會發現它連實現都沒什麼驚喜:它的內部是直接用一個包裹著的 setOnClickListener() 來實現的。然而,僅僅這一個形式的改變,卻恰好就是 RxBinding 的目的:擴充套件性。通過 RxBinding把點選監聽轉換成 Observable 之後,就有了對它進行擴充套件的可能。擴充套件的方式有很多,根據需求而定。一個例子是前面提到過的 throttleFirst() ,用於去抖動,也就是消除手抖導致的快速連環點選:

RxView.clickEvents(button)
    .throttleFirst(500,TimeUnit.MILLISECONDS)
    .subscribe(clickAvction);

3、各種非同步操作

前面舉的 RetrofitRxBinding 的例子,是兩個可以提供現成的 Observable 的庫。而如果你有某些非同步操作無法用這些庫來自動生成 Observable,也完全可以自己寫。例如資料庫的讀寫、大圖片的載入、檔案壓縮/解壓等各種需要放在後臺工作的耗時操作,都可以用 RxJava 來實現,有了之前幾章的例子,這裡應該不用再舉例了。

4、RxBus

RxBus 名字看起來像一個庫,但它並不是一個庫,而是一種模式,它的思想是使用 RxJava 來實現了 EventBus ,而讓你不再需要使用 Otto 或者 GreenRobot 的 EventBus。至於什麼是 RxBus,可以看這篇文章。順便說一句,Flipboard 已經用 RxBus 替換掉了 Otto ,目前為止沒有不良反應。