[轉] 給 Android 開發者的 RxJava 詳解
newThread(){@Overridepublicvoid run(){super.run();for(File folder : folders){File[] files = folder.listFiles();for(File file : files){if(file.getName().endsWith(".png")){
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(newRunnable(){ @Overridepublicvoid run(){
imageCollectorView.addImage(bitmap);}});}}}}}.start();
而如果使用 RxJava ,實現方式是這樣的:
Observable.from(folders).flatMap(newFunc1<File,Observable<File>>(){@OverridepublicObservable<File> call(File file){returnObservable.from(file.listFiles ());}}).filter(newFunc1<File,Boolean>(){@OverridepublicBoolean call(File file){return file.getName().endsWith(".png");}}).map(newFunc1<File,Bitmap>(){@OverridepublicBitmap call(File file){return getBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread ()).subscribe(newAction1<Bitmap>(){@Overridepublicvoid call(Bitmap bitmap){
imageCollectorView.addImage(bitmap);}});
那位說話了:『你這程式碼明明變多了啊!簡潔個毛啊!』大兄弟你消消氣,我說的是邏輯的簡潔,不是單純的程式碼量少(邏輯簡潔才是提升讀寫程式碼速度的必殺技對不?)。觀察一下你會發現, RxJava 的這個實現,是一條從上到下的鏈式呼叫,沒有任何巢狀,這在邏輯的簡潔性上是具有優勢的。當需求變得複雜時,這種優勢將更加明顯(試想如果還要求只選取前 10 張圖片,常規方式要怎麼辦?如果有更多這樣那樣的要求呢?再試想,在這一大堆需求實現完兩個月之後需要改功能,當你翻回這裡看到自己當初寫下的那一片迷之縮排,你能保證自己將迅速看懂,而不是對著程式碼重新捋一遍思路?)。
另外,如果你的 IDE 是 Android Studio ,其實每次開啟某個 Java 檔案的時候,你會看到被自動 Lambda 化的預覽,這將讓你更加清晰地看到程式邏輯:
Observable.from(folders).flatMap((Func1)(folder)->{Observable.from(file.listFiles())}).filter((Func1)(file)->{ file.getName().endsWith(".png")}).map((Func1)(file)->{ getBitmapFromFile(file)}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe((Action1)(bitmap)->{ imageCollectorView.addImage(bitmap)});
如果你習慣使用 Retrolambda ,你也可以直接把程式碼寫成上面這種簡潔的形式。而如果你看到這裡還不知道什麼是 Retrolambda ,我不建議你現在就去學習它。原因有兩點:1. Lambda 是把雙刃劍,它讓你的程式碼簡潔的同時,降低了程式碼的可讀性,因此同時學習 RxJava 和 Retrolambda 可能會讓你忽略 RxJava 的一些技術細節;2. Retrolambda 是 Java 6/7 對 Lambda 表示式的非官方相容方案,它的向後相容性和穩定性是無法保障的,因此對於企業專案,使用 Retrolambda 是有風險的。所以,與很多 RxJava 的推廣者不同,我並不推薦在學習 RxJava 的同時一起學習 Retrolambda。事實上,我個人雖然很欣賞 Retrolambda,但我從來不用它。
在Flipboard 的 Android 程式碼中,有一段邏輯非常複雜,包含了多次記憶體操作、本地檔案操作和網路操作,物件分分合合,執行緒間相互配合相互等待,一會兒排成人字,一會兒排成一字。如果使用常規的方法來實現,肯定是要寫得欲仙欲死,然而在使用 RxJava 的情況下,依然只是一條鏈式呼叫就完成了。它很長,但很清晰。
所以, RxJava 好在哪?就好在簡潔,好在那把什麼複雜邏輯都能穿成一條線的簡潔。
API 介紹和原理簡析
這個我就做不到一個詞說明了……因為這一節的主要內容就是一步步地說明 RxJava 到底怎樣做到了非同步,怎樣做到了簡潔。
1. 概念:擴充套件的觀察者模式
RxJava 的非同步實現,是通過一種擴充套件的觀察者模式來實現的。
觀察者模式
先簡述一下觀察者模式,已經熟悉的可以跳過這一段。
觀察者模式面向的需求是:A 物件(觀察者)對 B 物件(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。舉個例子,新聞裡喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時候實施抓捕。在這個例子裡,警察是觀察者,小偷是被觀察者,警察需要時刻盯著小偷的一舉一動,才能保證不會漏過任何瞬間。程式的觀察者模式和這種真正的『觀察』略有不同,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態),而是採用註冊(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我。 Android 開發中一個比較典型的例子是點選監聽器 OnClickListener
。對設定 OnClickListener
來說, View
是被觀察者, OnClickListener
是觀察者,二者通過 setOnClickListener()
方法達成訂閱關係。訂閱之後使用者點選按鈕的瞬間,Android Framework 就會將點選事件傳送給已經註冊的 OnClickListener
。採取這樣被動的觀察方式,既省去了反覆檢索狀態的資源消耗,也能夠得到最高的反饋速度。當然,這也得益於我們可以隨意定製自己程式中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時候務必通知我』。
OnClickListener 的模式大致如下圖:
如圖所示,通過 setOnClickListener()
方法,Button
持有 OnClickListener
的引用(這一過程沒有在圖上畫出);當用戶點選時,Button
自動呼叫 OnClickListener
的 onClick()
方法。另外,如果把這張圖中的概念抽象出來(Button
-> 被觀察者、OnClickListener
-> 觀察者、setOnClickListener()
-> 訂閱,onClick()
-> 事件),就由專用的觀察者模式(例如只用於監聽控制元件點選)轉變成了通用的觀察者模式。如下圖:
而 RxJava 作為一個工具庫,使用的就是通用形式的觀察者模式。
RxJava 的觀察者模式
RxJava 有四個基本概念:Observable
(可觀察者,即被觀察者)、 Observer
(觀察者)、 subscribe
(訂閱)、事件。Observable
和 Observer
通過 subscribe()
方法實現訂閱關係,從而 Observable
可以在需要的時候發出事件來通知 Observer
。
與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext()
(相當於 onClick()
/ onEvent()
)之外,還定義了兩個特殊的事件:onCompleted()
和 onError()
。
onCompleted()
: 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的onNext()
發出時,需要觸發onCompleted()
方法作為標誌。onError()
: 事件佇列異常。在事件處理過程中出異常時,onError()
會被觸發,同時佇列自動終止,不允許再有事件發出。- 在一個正確執行的事件序列中,
onCompleted()
和onError()
有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted()
和onError()
二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。
RxJava 的觀察者模式大致如下圖:
2. 基本實現
基於以上的概念, RxJava 的基本實現主要有三點:
1) 建立 Observer
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer
介面的實現方式:
Observer<String> observer =newObserver<String>(){@Overridepublicvoid onNext(String s){Log.d(tag,"Item: "+ s);}@Overridepublicvoid onCompleted(){Log.d(tag,"Completed!");}@Overridepublicvoid onError(Throwable e){Log.d(tag,"Error!");}};
除了 Observer
介面之外,RxJava 還內建了一個實現了 Observer
的抽象類:Subscriber
。 Subscriber
對 Observer
介面進行了一些擴充套件,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber =newSubscriber<String>(){@Overridepublicvoid onNext(String s){Log.d(tag,"Item: "+ s);}@Overridepublicvoid onCompleted(){Log.d(tag,"Completed!");}@Overridepublicvoid onError(Throwable e){Log.d(tag,"Error!");}};
不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer
也總是會先被轉換成一個 Subscriber
再使用。所以如果你只想使用基本功能,選擇 Observer
和 Subscriber
是完全一樣的。它們的區別對於使用者來說主要有兩點:
onStart()
: 這是Subscriber
增加的方法。它會在 subscribe 剛開始,而事件還未傳送之前被呼叫,可以用於做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實現為空。需要注意的是,如果對準備工作的執行緒有要求(例如彈出一個顯示進度的對話方塊,這必須在主執行緒執行),onStart()
就不適用了,因為它總是在 subscribe 所發生的執行緒被呼叫,而不能指定執行緒。要在指定的執行緒來做準備工作,可以使用doOnSubscribe()
方法,具體可以在後面的文中看到。unsubscribe()
: 這是Subscriber
所實現的另一個介面Subscription
的方法,用於取消訂閱。在這個方法被呼叫後,Subscriber
將不再接收事件。一般在這個方法呼叫前,可以使用isUnsubscribed()
先判斷一下狀態。unsubscribe()
這個方法很重要,因為在subscribe()
之後,Observable
會持有Subscriber
的引用,這個引用如果不能及時被釋放,將有記憶體洩露的風險。所以最好保持一個原則:要在不再使用的時候儘快在合適的地方(例如onPause()
onStop()
等方法中)呼叫unsubscribe()
來解除引用關係,以避免記憶體洩露的發生。
2) 建立 Observable
Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create()
方法來建立一個 Observable ,併為它定義事件觸發規則:
Observable observable =Observable.create(newObservable.OnSubscribe<String>(){@Overridepublicvoid call(Subscriber<?superString> subscriber){
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();}});
可以看到,這裡傳入了一個 OnSubscribe
物件作為引數。OnSubscribe
會被儲存在返回的 Observable
物件中,它的作用相當於一個計劃表,當 Observable
被訂閱的時候,OnSubscribe
的 call()
方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber
將會被呼叫三次 onNext()
和一次 onCompleted()
)。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
這個例子很簡單:事件的內容是字串,而不是一些複雜的物件;事件的內容是已經定好了的,而不像有的觀察者模式一樣是待確定的(例如網路請求的結果在請求返回之前是未知的);所有事件在一瞬間被全部發送出去,而不是夾雜一些確定或不確定的