1. 程式人生 > >深入淺出RxJava就這一篇就夠了

深入淺出RxJava就這一篇就夠了

前言:

第一次接觸RxJava是在前不久,一個新Android專案的啟動,在評估時選擇了RxJava。RxJava是一個基於事件訂閱的非同步執行的一個類庫。聽起來有點複雜,其實是要你使用過一次,就會大概明白它是怎麼回事了!為是什麼一個Android專案啟動會聯絡到RxJava呢?因為在RxJava使用起來得到廣泛的認可,又是基於Java語言的。自然會有善於組織和總結的開發者聯想到Android!沒錯,RxAndroid就這樣在RxJava的基礎上,針對Android開發的一個庫。今天我們主要是來講解一下RxJava,在接下來的幾篇部落格中我會陸續帶大家來認識RxAndroid,Retrofit框架的使用,這些都是目前比較火的一些技術框架!

官方的介紹

1.支援Java6+

2.android 2.3+

3.非同步的

4.基於觀察者設計模式(Observer、Observable)不懂設計模式的可以移步到此:淺談Java設計模式(十五)觀察者模式(Observer)

5.Subscribe (訂閱)

正式使用RxJava

用框架或者庫都是為了簡潔、方便,RxJava也不例外它能使你的程式碼邏輯更加的簡潔。舉個例子之前我們先來引入依賴的 gradle 程式碼:

compile 'io.reactivex:rxjava:1.0.14' 
compile 'io.reactivex:rxandroid:1.0.1' 

既然是基於非同步,當然要在處理比較耗時的操作上才能彰顯它的優勢!現在我們假設有這樣一個需求:
需要實現一個多個下載的圖片並且顯示的功能,它的作用可以新增多個下載操作,由於下載這一過程較為耗時,需要放在後臺執行,而圖片的顯示則必須在 UI 執行緒執行。常用的實現方式有多種,我這裡貼出其中一種:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
裡面的判斷是不是看起來有點暈暈,當然這是我自己寫的,我一眼就能看清楚裡面的邏輯,但是如果換做是別人來閱讀你的程式碼,這就比較的尷尬了!
我們來看看使用RxJava的程式碼:
Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });
是不是明瞭,雖然說算不上簡單,但是習慣了就一如既往了!

如果你使用的AndroidStudio的話,你開啟Java檔案的時候,你會看到被自動 Lambda 化的預覽,這將讓你更加清晰地看到程式邏輯:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
不過如果你對Java8還不是很瞭解的話呢這一段可以暫時忽略,但是你可以移步到這裡瞭解一下Java8:Java8部分新特性介紹

看完程式碼,是不是有種相見恨晚的衝動?別急,我們來慢慢了解RxJava!

前面已經提到他是基於Java觀察者設計模式的,這個模式上面有給大家連結,可以去看看,這裡不不坐過多的介紹,我們來介紹一下RxJava中的觀察者模式:
RxJava 的觀察者模式

一、說明
1)RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer。
2)與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext() (相當於 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。
3)onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。
4)onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
5)在一個正確執行的事件序列中, onCompleted() 和 onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。

二、實現
1) 建立 Observer
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer 介面的實現方式:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
除了 Observer 介面之外,RxJava 還內建了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 介面進行了一些擴充套件,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區別對於使用者來說主要有兩點:
onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未傳送之前被呼叫,可以用於做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實現為空。需要注意的是,如果對準備工作的執行緒有要求(例如彈出一個顯示進度的對話方塊,這必須在主執行緒執行), onStart() 就不適用了,因為它總是在 subscribe 所發生的執行緒被呼叫,而不能指定執行緒。要在指定的執行緒來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。
unsubscribe(): 這是 Subscriber 所實現的另一個介面 Subscription 的方法,用於取消訂閱。在這個方法被呼叫後,Subscriber 將不再接收事件。一般在這個方法呼叫前,可以使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有記憶體洩露的風險。所以最好保持一個原則:要在不再使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)呼叫 unsubscribe() 來解除引用關係,以避免記憶體洩露的發生。

2) 建立 Observable
Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,併為它定義事件觸發規則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});
可以看到,這裡傳入了一個 OnSubscribe 物件作為引數。OnSubscribe 會被儲存在返回的 Observable 物件中,它的作用相當於一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber 將會被呼叫三次 onNext() 和一次 onCompleted())。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式

create() 方法是 RxJava 最基本的創造事件序列的方法。基於這個方法, RxJava 還提供了一些方法用來快捷建立事件佇列,例如:
just(T...): 將傳入的引數依次傳送出來。

Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次呼叫:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
from(T[]) / from(Iterable<? extends T>) : 將傳入的陣列或 Iterable 拆分成具體物件後,依次傳送出來。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次呼叫:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等價的。

3) Subscribe (訂閱)
建立了 Observable 和 Observer 之後,再用 subscribe() 方法將它們聯結起來,整條鏈子就可以工作了。程式碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心程式碼):
// 注意:這不是 subscribe() 的原始碼,而是將原始碼中與效能、相容性、擴充套件性有關的程式碼剔除後的核心程式碼。
// 如果需要看原始碼,可以去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
可以看到,subscriber() 做了3件事:
1.呼叫 Subscriber.onStart() 。這個方法在前面已經介紹過,是一個可選的準備方法。
2.呼叫 Observable 中的 OnSubscribe.call(Subscriber) 。在這裡,事件傳送的邏輯開始執行。從這也可以看出,在 RxJava 中, Observable 並不是在建立的時候就立即開始傳送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。
3.將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支援不完整定義的回撥,RxJava 會自動根據定義創建出 Subscriber 。形式如下:
Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
簡單解釋一下這段程式碼中出現的 Action1 和 Action0。 Action0 是 RxJava 的一個介面,它只有一個方法 call(),這個方法是無參無返回值的;由於 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當成一個包裝物件,將 onCompleted() 的內容打包起來將自己作為一個引數傳入 subscribe() 以實現不完整定義的回撥。這樣其實也可以看做將 onCompleted() 方法作為引數傳進了 subscribe(),相當於其他某些語言中的『閉包』。 Action1 也是一個介面,它同樣只有一個方法 call(T param),這個方法也無返回值,但有一個引數;與 Action0 同理,由於 onNext(T obj) 和 onError(Throwable error) 也是單引數無返回值的,因此 Action1 可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現不完整定義的回撥。事實上,雖然 Action0 和 Action1 在 API 中使用最廣泛,但 RxJava 是提供了多個 ActionX 形式的介面 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法。

4) 場景示例

下面舉兩個例子:
a. 列印字串陣列
將字串陣列 names 中的所有字串依次打印出來:

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });

b. 由 id 取得圖片並顯示
由指定的一個 drawable 檔案 id drawableRes 取得圖片,並顯示在 ImageView 中,並在出現異常的時候列印 Toast 報錯:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});
正如上面兩個例子這樣,創建出 Observable 和 Subscriber ,再用 subscribe() 將它們串起來,一次 RxJava 的基本使用就完成了。非常簡單。

注意:在 RxJava 的預設規則中,事件的發出和消費都是在同一個執行緒的。也就是說,如果只用上面的方法,實現出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是『後臺處理,前臺回撥』的非同步機制,因此非同步對於 RxJava 是至關重要的。而要實現非同步,則需要用到 RxJava 的另一個概念: Scheduler 。

執行緒控制 —— Scheduler (一)
前言:

在不指定執行緒的情況下, RxJava 遵循的是執行緒不變的原則,即:在哪個執行緒呼叫 subscribe(),就在哪個執行緒生產事件;在哪個執行緒生產事件,就在哪個執行緒消費事件。如果需要切換執行緒,就需要用到 Scheduler (排程器)。

1) Scheduler 的 API (一)
在RxJava 中,Scheduler ——排程器,相當於執行緒控制器,RxJava 通過它來指定每一段程式碼應該執行在什麼樣的執行緒。RxJava 已經內建了幾個 Scheduler ,它們已經適合大多數的使用場景:
Schedulers.immediate(): 直接在當前執行緒執行,相當於不指定執行緒。這是預設的 Scheduler。
Schedulers.newThread(): 總是啟用新執行緒,並在新執行緒執行操作。
Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的執行緒。
Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主執行緒執行。
有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對執行緒進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的執行緒,即 Observable.OnSubscribe 被啟用時所處的執行緒。或者叫做事件產生的執行緒。 * observeOn(): 指定 Subscriber 所執行在的執行緒。或者叫做事件消費的執行緒。

程式碼來理解上面的文字敘述:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 執行緒
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主執行緒
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });
上面這段程式碼中,由於 subscribeOn(Schedulers.io()) 的指定,被建立的事件的內容 1、2、3、4 將會在 IO 執行緒發出;而由於 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數字的列印將發生在主執行緒 。事實上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用於多數的 『後臺執行緒取資料,主執行緒顯示』的程式策略。

而前面提到的由圖片 id 取得圖片並顯示的例子,如果也加上這兩句:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 執行緒
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主執行緒
.subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});
那麼,載入圖片將會發生在 IO 執行緒,而設定圖片則被設定在了主執行緒。這就意味著,即使載入圖片耗費了幾十甚至幾百毫秒的時間,也不會造成絲毫介面的卡頓。

2) Scheduler 的原理 (一)
RxJava 的 Scheduler API 很方便,也很神奇(加了一句話就把執行緒切換了,怎麼做到的?而且 subscribe() 不是最外層直接呼叫的方法嗎,它竟然也能被指定執行緒?)。然而 Scheduler 的原理需要放在後面講,因為它的原理是以下一節《變換》的原理作為基礎的。
好吧這一節其實我屁也沒說,只是為了讓你安心,讓你知道我不是忘了講原理,而是把它放在了更合適的地方。


變換
RxJava 提供了對事件序列進行變換的支援,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的物件或整個序列進行加工處理,轉換成不同的事件或事件序列。概念說著總是模糊難懂的,來看 API。
1) API
首先看一個 map() 的例子:
Observable.just("images/logo.png") // 輸入型別 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 引數型別 String
            return getBitmapFromPath(filePath); // 返回型別 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 引數型別 Bitmap
            showBitmap(bitmap);
        }
    });
這裡出現了一個叫做 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個介面,用於包裝含有一個引數的方法。 Func1 和 Action 的區別在於, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用於不同引數個數的方法。FuncX 和 ActionX 的區別在 FuncX 包裝的是有返回值的方法。
可以看到,map() 方法將引數中的 String 物件轉換成一個 Bitmap 物件後返回,而在經過 map() 方法後,事件的引數型別也由 String 轉為了 Bitmap。這種直接變換物件並返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件物件,還可以針對整個事件佇列,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:
map(): 事件物件的直接變換,具體功能上面已經介紹過。它是 RxJava 最常用的變換。 map() 的示意圖:
flatMap(): 這是一個很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它。 首先假設這麼一種需求:假設有一個數據結構『學生』,現在需要打印出一組學生的名字。實現方式很簡單:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);
很簡單。那麼再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區別在於,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現:
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);
依然很簡單。那麼如果我不想在 Subscriber 中使用 for 迴圈,而是希望 Subscriber 中直接傳入單個的 Course 物件呢(這對於程式碼複用很重要)?用 map() 顯然是不行的,因為 map() 是一對一的轉化,而我現在的要求是一對多的轉化。那怎麼才能把一個 Student 轉化成多個 Course 呢?
這個時候,就需要用 flatMap() 了:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);
從上面的程式碼可以看出, flatMap() 和 map() 有一個相同點:它也是把傳入的引數轉化之後返回另一個物件。但需要注意,和 map() 不同的是, flatMap() 中返回的是個 Observable 物件,並且這個 Observable 物件並不是被直接傳送到了 Subscriber 的回撥方法中。 flatMap() 的原理是這樣的:1. 使用傳入的事件物件建立一個 Observable 物件;2. 並不傳送這個 Observable, 而是將它啟用,於是它開始傳送事件;3. 每一個創建出來的 Observable 傳送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回撥方法。這三個步驟,把事件拆成了兩級,通過一組新建立的 Observable 將初始的物件『鋪平』之後通過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。

擴充套件:由於可以在巢狀的 Observable 中新增非同步程式碼, flatMap() 也常用於巢狀的非同步操作,例如巢狀的網路請求。示例程式碼(Retrofit + RxJava):
networkClient.token() // 返回 Observable<String>,在訂閱時請求 token,並在響應後傳送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 返回 Observable<Messages>,在訂閱時請求訊息列表,並在響應後傳送請求到的訊息列表
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 處理顯示訊息列表
            showMessages(messages);
        }
    });
傳統的巢狀請求需要使用巢狀的 Callback 來實現。而通過 flatMap() ,可以把巢狀的請求寫在一條鏈中,從而保持程式邏輯的清晰。
throttleFirst(): 在每次事件觸發後的一定時間間隔內丟棄新的事件。常用作去抖動過濾,例如按鈕的點選監聽器: RxView.clickEvents(button) // RxBinding 程式碼,後面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設定防抖間隔為 500ms .subscribe(subscriber); 媽媽再也不怕我的使用者手抖點開兩個重複的介面啦。
此外, RxJava 還提供很多便捷的方法來實現事件序列的變換,這裡就不一一舉例了。

2) 變換的原理:lift()
這些變換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基於同一個基礎的變換方法: lift(Operator)。首先看一下 lift() 的內部實現(僅核心程式碼):
// 注意:這不是 lift() 的原始碼,而是將原始碼中與效能、相容性、擴充套件性有關的程式碼剔除後的核心程式碼。
// 如果需要看原始碼,可以去 RxJava 的 GitHub 倉庫下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}
這段程式碼很有意思:它生成了一個新的 Observable 並返回,而且建立新 Observable 所用的引數 OnSubscribe 的回撥方法 call() 中的實現竟然看起來和前面講過的 Observable.subscribe() 一樣!然而它們並不一樣喲~不一樣的地方關鍵就在於第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的物件不同(高能預警:接下來的幾句話可能會導致身體的嚴重不適)——
subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 物件,這個沒有問題,但是 lift() 之後的情況就複雜了點。
當含有 lift() 時: 
1.lift() 建立了一個 Observable 後,加上之前的原始 Observable,已經有兩個 Observable 了; 
2.而同樣地,新 Observable 裡的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了兩個 OnSubscribe; 
3.當用戶呼叫經過 lift() 後的 Observable 的 subscribe() 的時候,使用的是 lift() 所返回的新的 Observable ,於是它所觸發的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那個 OnSubscribe; 
4.而這個新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在這個 call() 方法裡,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個新的 Subscriber(Operator 就是在這裡,通過自己的 call() 方法將新 Subscriber 和原始 Subscriber 進行關聯,並插入自己的『變換』程式碼以實現變換),然後利用這個新 Subscriber 向原始 Observable 進行訂閱。 
這樣就實現了 lift() 過程,有點像一種代理機制,通過事件攔截和處理實現事件序列的變換。
精簡掉細節的話,也可以這麼說:在 Observable 執行了 lift(Operator) 方法之後,會返回一個新的 Observable,這個新的 Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,並在處理後傳送給 Subscriber。
舉一個具體的 Operator 的實現。下面這是一個將事件中的 Integer 物件轉換成 String 的例子,僅供參考:
observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 將事件序列中的 Integer 物件轉換為 String 物件
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }


            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }


            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

3) compose: 對 Observable 整體的變換
除了 lift() 之外, Observable 還有一個變換方法叫做 compose(Transformer)。它和 lift() 的區別在於, lift() 是針對事件項和事件序列的,而 compose() 是針對 Observable 自身進行變換。舉個例子,假設在程式中有多個 Observable ,並且他們都需要應用一組相同的 lift() 變換。你可以這麼寫:
observable1
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
observable2
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);
observable3
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);
observable4
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
你覺得這樣太不軟體工程了,於是你改成了這樣:
private Observable liftAll(Observable observable) {
    return observable
        .lift1()
        .lift2()
        .lift3()
        .lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);
可讀性、可維護性都提高了。可是 Observable 被一個方法包起來,這種方式對於 Observale 的靈活性似乎還是增添了那麼點限制。怎麼辦?這個時候,就應該用 compose() 來解決了:
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
像上面這樣,使用 compose() 方法,Observable 可以利用傳入的 Transformer 物件的 call 方法直接對自身進行處理,也就不必被包在方法的裡面了。
compose() 的原理比較簡單,不附圖嘍。

執行緒控制:Scheduler (二)
除了靈活的變換,RxJava 另一個牛逼的地方,就是執行緒的自由控制。
1) Scheduler 的 API (二)
前面講到了,可以利用 subscribeOn() 結合 observeOn() 來實現執行緒控制,讓事件的產生和消費發生在不同的執行緒。可是在瞭解了 map() flatMap() 等變換方法後,有些好事的(其實就是當初剛接觸 RxJava 時的我)就問了:能不能多切換幾次執行緒?
答案是:能。因為 observeOn() 指定的是 Subscriber 的執行緒,而這個 Subscriber 並不是(嚴格說應該為『不一定是』,但這裡不妨理解為『不是』)subscribe() 引數中的 Subscriber ,而是 observeOn() 執行時的當前 Observable 所對應的 Subscriber ,即它的直接下級 Subscriber 。換句話說,observeOn() 指定的是它之後的操作所在的執行緒。因此如果有多次切換執行緒的需求,只要在每個想要切換執行緒的位置呼叫一次 observeOn() 即可。上程式碼:
Observable.just(1, 2, 3, 4) // IO 執行緒,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新執行緒,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 執行緒,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主執行緒,由 observeOn() 指定
如上,通過 observeOn() 的多次呼叫,程式實現了執行緒的多次切換。
不過,不同於 observeOn() , subscribeOn() 的位置放在哪裡都可以,但它是隻能呼叫一次的。
又有好事的(其實還是當初的我)問了:如果我非要呼叫多次 subscribeOn() 呢?會有什麼效果?
這個問題先放著,我們還是從 RxJava 執行緒控制的原理說起吧。
2) Scheduler 的原理(二)
其實, subscribeOn() 和 observeOn() 的內部實現,也是用的 lift()。具體看圖(不同顏色的箭頭表示不同的執行緒):
從圖中可以看出,subscribeOn() 和 observeOn() 都做了執行緒切換的工作(圖中的 "schedule..." 部位)。不同的是, subscribeOn() 的執行緒切換髮生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件還沒有開始傳送,因此 subscribeOn() 的執行緒控制可以從事件發出的開端就造成影響;而 observeOn() 的執行緒切換則發生在它內建的 Subscriber 中,即發生在它即將給下一級 Subscriber 傳送事件時,因此 observeOn() 控制的是它後面的執行緒。
3) 延伸:doOnSubscribe()
然而,雖然超過一個的 subscribeOn() 對事件處理的流程沒有影響,但在流程之前卻是可以利用的。
在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 可以用作流程開始前的初始化。然而 onStart() 由於在 subscribe() 發生時就被呼叫了,因此不能指定執行緒,而是隻能執行在 subscribe() 被呼叫時的執行緒。這就導致如果 onStart() 中含有對執行緒有要求的程式碼(例如在介面上顯示一個 ProgressBar,這必須在主執行緒執行),將會有執行緒非法的風險,因為有時你無法預測 subscribe() 將會在什麼執行緒執行。
而與 Subscriber.onStart() 相對應的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在 subscribe() 呼叫後而且在事件傳送前執行,但區別在於它可以指定執行緒。預設情況下, doOnSubscribe() 執行在 subscribe() 發生的執行緒;而如果在 doOnSubscribe() 之後有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的執行緒。
示例程式碼:
Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主執行緒執行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主執行緒
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
如上,在 doOnSubscribe()的後面跟一個 subscribeOn() ,就能指定準備工作的執行緒了。