Android 開發者的 RxJava入門教程
前言
我從去年開始使用 RxJava ,到現在一年多了。今年加入了 Flipboard 後,看到 Flipboard 的 Android 專案也在使用 RxJava ,並且使用的場景越來越多 。而最近這幾個月,我也發現國內越來越多的人開始提及 RxJava 。有人說『RxJava 真是太好用了』,有人說『RxJava 真是太難用了』,另外更多的人表示:我真的百度了也谷歌了,但我還是想問: RxJava 到底是什麼?
鑑於 RxJava 目前這種既火爆又神祕的現狀,而我又在一年的使用過程中對 RxJava 有了一些理解,我決定寫下這篇文章來對 RxJava 做一個相對詳細的、針對 Android 開發者的介紹。
這篇文章的目的有兩個:1. 給對 RxJava 感興趣的人一些入門的指引2. 給正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析
在正文開始之前的最後,放上 GitHub
連結和引入依賴的 gradle
程式碼: Github:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依賴:
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
(版本號是文章釋出時的最新穩定版)
另外,感謝 RxJava 核心成員流火楓林的技術支援和內測讀者程式碼家、鮑永章、drakeet、馬琳、有時放縱、程式亦非猿、大頭鬼、XZoomEye、席德雨、TCahead、Tiiime、Ailurus、宅學長、妖孽、大大大大大臣哥、NicodeLee的幫助,以及周伯通招聘的贊助。
RxJava 到底是什麼
一個詞:非同步。
RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫)。這就是 RxJava ,概括得非常精準。
然而,對於初學者來說,這太難看懂了。因為它是一個『總結』,而初學者更需要一個『引言』。
其實, RxJava 的本質可以壓縮為非同步這一個詞。說到根上,它就是一個實現非同步操作的庫,而別的定語都是基於這之上的。
RxJava 好在哪
換句話說,『同樣是做非同步,為什麼人們用它,而不用現成的 AsyncTask / Handler / XXX / ... ?』
一個詞:簡潔。
非同步操作很關鍵的一點是程式的簡潔性,因為在排程過程比較複雜的情況下,非同步程式碼經常會既難寫也難被讀懂。 Android 創造的 AsyncTask
和Handler
,其實都是為了讓非同步程式碼更加簡潔。RxJava 的優勢也是簡潔,但它的簡潔的與眾不同之處在於,隨著程式邏輯變得越來越複雜,它依然能夠保持簡潔。
假設有這樣一個需求:介面上有一個自定義的檢視 imageCollectorView
,它的作用是顯示多張圖片,並能使用
addImage(Bitmap)
方法來任意增加顯示的圖片。現在需要程式將一個給出的目錄陣列 File[] folders
中每個目錄下的 png 圖片都加載出來並顯示在
imageCollectorView
中。需要注意的是,由於讀取圖片的這一過程較為耗時,需要放在後臺執行,而圖片的顯示則必須在 UI 執行緒執行。常用的實現方式有多種,我這裡貼出其中一種:
newThread(){@Overridepublicvoid run(){super.run();for(File folder : folders){File[] files = folder.listFiles();for(File file : files){if(file.getName().endsWith(".png")){
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(newRunnable(){@Overridepublicvoid run(){
imageCollectorView.addImage(bitmap);}});}}}}}.start();
而如果使用 RxJava ,實現方式是這樣的:
Observable.from(folders).flatMap(newFunc1<File,Observable<File>>(){@OverridepublicObservable<File> call(File file){returnObservable.from(file.listFiles());}}).filter(newFunc1<File,Boolean>(){@OverridepublicBoolean call(File file){return file.getName().endsWith(".png");}}).map(newFunc1<File,Bitmap>(){@OverridepublicBitmap call(File file){return getBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(newAction1<Bitmap>(){@Overridepublicvoid call(Bitmap bitmap){
imageCollectorView.addImage(bitmap);}});
那位說話了:『你這程式碼明明變多了啊!簡潔個毛啊!』大兄弟你消消氣,我說的是邏輯的簡潔,不是單純的程式碼量少(邏輯簡潔才是提升讀寫程式碼速度的必殺技對不?)。觀察一下你會發現, RxJava 的這個實現,是一條從上到下的鏈式呼叫,沒有任何巢狀,這在邏輯的簡潔性上是具有優勢的。當需求變得複雜時,這種優勢將更加明顯(試想如果還要求只選取前 10 張圖片,常規方式要怎麼辦?如果有更多這樣那樣的要求呢?再試想,在這一大堆需求實現完兩個月之後需要改功能,當你翻回這裡看到自己當初寫下的那一片迷之縮排,你能保證自己將迅速看懂,而不是對著程式碼重新捋一遍思路?)。
另外,如果你的 IDE 是 Android Studio ,其實每次開啟某個 Java 檔案的時候,你會看到被自動 Lambda 化的預覽,這將讓你更加清晰地看到程式邏輯:
Observable.from(folders).flatMap((Func1)(folder)->{Observable.from(file.listFiles())}).filter((Func1)(file)->{ file.getName().endsWith(".png")}).map((Func1)(file)->{ getBitmapFromFile(file)}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe((Action1)(bitmap)->{ imageCollectorView.addImage(bitmap)});
如果你習慣使用 Retrolambda ,你也可以直接把程式碼寫成上面這種簡潔的形式。而如果你看到這裡還不知道什麼是 Retrolambda ,我不建議你現在就去學習它。原因有兩點:1. Lambda 是把雙刃劍,它讓你的程式碼簡潔的同時,降低了程式碼的可讀性,因此同時學習 RxJava 和 Retrolambda 可能會讓你忽略 RxJava 的一些技術細節;2. Retrolambda 是 Java 6/7 對 Lambda 表示式的非官方相容方案,它的向後相容性和穩定性是無法保障的,因此對於企業專案,使用 Retrolambda 是有風險的。所以,與很多 RxJava 的推廣者不同,我並不推薦在學習 RxJava 的同時一起學習 Retrolambda。事實上,我個人雖然很欣賞 Retrolambda,但我從來不用它。
在Flipboard 的 Android 程式碼中,有一段邏輯非常複雜,包含了多次記憶體操作、本地檔案操作和網路操作,物件分分合合,執行緒間相互配合相互等待,一會兒排成人字,一會兒排成一字。如果使用常規的方法來實現,肯定是要寫得欲仙欲死,然而在使用 RxJava 的情況下,依然只是一條鏈式呼叫就完成了。它很長,但很清晰。
所以, RxJava 好在哪?就好在簡潔,好在那把什麼複雜邏輯都能穿成一條線的簡潔。
API 介紹和原理簡析
這個我就做不到一個詞說明了……因為這一節的主要內容就是一步步地說明 RxJava 到底怎樣做到了非同步,怎樣做到了簡潔。
1. 概念:擴充套件的觀察者模式
RxJava 的非同步實現,是通過一種擴充套件的觀察者模式來實現的。
觀察者模式
先簡述一下觀察者模式,已經熟悉的可以跳過這一段。
觀察者模式面向的需求是:A 物件(觀察者)對 B 物件(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。舉個例子,新聞裡喜聞樂見的警察抓小偷,警察需要在小偷伸手作案的時候實施抓捕。在這個例子裡,警察是觀察者,小偷是被觀察者,警察需要時刻盯著小偷的一舉一動,才能保證不會漏過任何瞬間。程式的觀察者模式和這種真正的『觀察』略有不同,觀察者不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態),而是採用註冊(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我。
Android 開發中一個比較典型的例子是點選監聽器 OnClickListener
。對設定 OnClickListener
來說,
View
是被觀察者, OnClickListener
是觀察者,二者通過 setOnClickListener()
方法達成訂閱關係。訂閱之後使用者點選按鈕的瞬間,Android Framework 就會將點選事件傳送給已經註冊的
OnClickListener
。採取這樣被動的觀察方式,既省去了反覆檢索狀態的資源消耗,也能夠得到最高的反饋速度。當然,這也得益於我們可以隨意定製自己程式中的觀察者和被觀察者,而警察叔叔明顯無法要求小偷『你在作案的時候務必通知我』。
OnClickListener 的模式大致如下圖:
如圖所示,通過 setOnClickListener()
方法,Button
持有 OnClickListener
的引用(這一過程沒有在圖上畫出);當用戶點選時,Button
自動呼叫
OnClickListener
的 onClick()
方法。另外,如果把這張圖中的概念抽象出來(Button
-> 被觀察者、OnClickListener
-> 觀察者、setOnClickListener()
-> 訂閱,onClick()
-> 事件),就由專用的觀察者模式(例如只用於監聽控制元件點選)轉變成了通用的觀察者模式。如下圖:
而 RxJava 作為一個工具庫,使用的就是通用形式的觀察者模式。
RxJava 的觀察者模式
RxJava 有四個基本概念:Observable
(可觀察者,即被觀察者)、 Observer
(觀察者)、
subscribe
(訂閱)、事件。Observable
和 Observer
通過
subscribe()
方法實現訂閱關係,從而 Observable
可以在需要的時候發出事件來通知
Observer
。
與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext()
(相當於 onClick()
/
onEvent()
)之外,還定義了兩個特殊的事件:onCompleted()
和 onError()
。
onCompleted()
: 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的onNext()
發出時,需要觸發onCompleted()
方法作為標誌。onError()
: 事件佇列異常。在事件處理過程中出異常時,onError()
會被觸發,同時佇列自動終止,不允許再有事件發出。- 在一個正確執行的事件序列中,
onCompleted()
和onError()
有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted()
和onError()
二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。
RxJava 的觀察者模式大致如下圖:
2. 基本實現
基於以上的概念, RxJava 的基本實現主要有三點:
1) 建立 Observer
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer
介面的實現方式:
Observer<String> observer =newObserver<String>(){@Overridepublicvoid onNext(String s){Log.d(tag,"Item: "+ s);}@Overridepublicvoid onCompleted(){Log.d(tag,"Completed!");}@Overridepublicvoid onError(Throwable e){Log.d(tag,"Error!");}};
除了 Observer
介面之外,RxJava 還內建了一個實現了 Observer
的抽象類:Subscriber
。
Subscriber
對 Observer
介面進行了一些擴充套件,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber =newSubscriber<String>(){@Overridepublicvoid onNext(String s){Log.d(tag,"Item: "+ s);}@Overridepublicvoid onCompleted(){Log.d(tag,"Completed!");}@Overridepublicvoid onError(Throwable e){Log.d(tag,"Error!");}};
不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer
也總是會先被轉換成一個
Subscriber
再使用。所以如果你只想使用基本功能,選擇 Observer
和 Subscriber
是完全一樣的。它們的區別對於使用者來說主要有兩點:
onStart()
: 這是Subscriber
增加的方法。它會在 subscribe 剛開始,而事件還未傳送之前被呼叫,可以用於做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實現為空。需要注意的是,如果對準備工作的執行緒有要求(例如彈出一個顯示進度的對話方塊,這必須在主執行緒執行),onStart()
就不適用了,因為它總是在 subscribe 所發生的執行緒被呼叫,而不能指定執行緒。要在指定的執行緒來做準備工作,可以使用doOnSubscribe()
方法,具體可以在後面的文中看到。unsubscribe()
: 這是Subscriber
所實現的另一個介面Subscription
的方法,用於取消訂閱。在這個方法被呼叫後,Subscriber
將不再接收事件。一般在這個方法呼叫前,可以使用isUnsubscribed()
先判斷一下狀態。unsubscribe()
這個方法很重要,因為在subscribe()
之後,Observable
會持有Subscriber
的引用,這個引用如果不能及時被釋放,將有記憶體洩露的風險。所以最好保持一個原則:要在不再使用的時候儘快在合適的地方(例如onPause()
onStop()
等方法中)呼叫unsubscribe()
來解除引用關係,以避免記憶體洩露的發生。
2) 建立 Observable
Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create()
方法來建立一個 Observable ,併為它定義事件觸發規則:
Observable observable =Observable.create(newObservable.OnSubscribe<String>(){@Overridepublicvoid call(Subscriber<?superString> subscriber){
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();}});
可以看到,這裡傳入了一個 OnSubscribe
物件作為引數。OnSubscribe
會被儲存在返回的
Observable
物件中,它的作用相當於一個計劃表,當 Observable
被訂閱的時候,OnSubscribe
的
call()
方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber
將會被呼叫三次
onNext()
和一次 onCompleted()
)。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
這個例子很簡單:事件的內容是字串,而不是一些複雜的物件;事件的內容是已經定好了的,而不像有的觀察者模式一樣是待確定的(例如網路請求的結果在請求返回之前是未知的);所有事件在一瞬間被全部發送出去,而不是夾雜一些確定或不確定的時間間隔或者經過某種觸發器來觸發的。總之,這個例子看起來毫無實用價值。但這是為了便於說明,實質上只要你想,各種各樣的事件傳送規則你都可以自己來寫。至於具體怎麼做,後面都會講到,但現在不行。只有把基礎原理先說明白了,上層的運用才能更容易說清楚。
create()
方法是 RxJava 最基本的創造事件序列的方法。基於這個方法, RxJava 還提供了一些方法用來快捷建立事件佇列,例如:
just(T...)
: 將傳入的引數依次傳送出來。
Observable observable =Observable.just("Hello","Hi","Aloha");// 將會依次呼叫:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();
from(T[])
/from(Iterable<? extends T>)
: 將傳入的陣列或Iterable
拆分成具體物件後,依次傳送出來。
String[] words ={"Hello","Hi","Aloha"};Observable observable =Observable.from(words);// 將會依次呼叫:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();
上面 just(T...)
的例子和 from(T[])
的例子,都和之前的 create(OnSubscribe)
的例子是等價的。
3) Subscribe (訂閱)
建立了 Observable
和 Observer
之後,再用 subscribe()
方法將它們聯結起來,整條鏈子就可以工作了。程式碼形式很簡單:
observable.subscribe(observer);// 或者:
observable.subscribe(subscriber);
有人可能會注意到,
subscribe()
這個方法有點怪:它看起來是『observalbe
訂閱了observer
/subscriber
』而不是『observer
/subscriber
訂閱了observalbe
』,這看起來就像『雜誌訂閱了讀者』一樣顛倒了物件關係。這讓人讀起來有點彆扭,不過如果把 API 設計成observer.subscribe(observable)
/subscriber.subscribe(observable)
,雖然更加符合思維邏輯,但對流式 API 的設計就造成影響了,比較起來明顯是得不償失的。
Observable.subscribe(Subscriber)
的內部實現是這樣的(僅核心程式碼):
// 注意:這不是 subscribe() 的原始碼,而是將原始碼中與效能、相容性、擴充套件性有關的程式碼剔除後的核心程式碼。// 如果需要看原始碼,可以去 RxJava 的 GitHub 倉庫下載。publicSubscription subscribe(Subscriber subscriber){
subscriber.onStart();
onSubscribe.call(subscriber);return subscriber;}
可以看到,subscriber()
做了3件事:
- 呼叫
Subscriber.onStart()
。這個方法在前面已經介紹過,是一個可選的準備方法。 - 呼叫
Observable
中的OnSubscribe.call(Subscriber)
。在這裡,事件傳送的邏輯開始執行。從這也可以看出,在 RxJava 中,Observable
並不是在建立的時候就立即開始傳送事件,而是在它被訂閱的時候,即當subscribe()
方法執行的時候。 - 將傳入的
Subscriber
作為Subscription
返回。這是為了方便unsubscribe()
.
整個過程中物件間的關係如下圖:
或者可以看動圖:
除了 subscribe(Observer)
和 subscribe(Subscriber)
,subscribe()
還支援不完整定義的回撥,RxJava 會自動根據定義創建出
Subscriber
。形式如下:
Action1<String> onNextAction =newAction1<String>(){// onNext()@Overridepublicvoid call(String s){Log.d(tag, s);}};Action1<Throwable> onErrorAction =newAction1<Throwable>(){// onError()@Overridepublicvoid call(Throwable throwable){// Error handling}};Action0 onCompletedAction =newAction0(){// onCompleted()@Overridepublicvoid call(){Log.d(tag,"completed");}};// 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);// 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);// 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
簡單解釋一下這段程式碼中出現的 Action1
和 Action0
。 Action0
是 RxJava 的一個介面,它只有一個方法
call()
,這個方法是無參無返回值的;由於 onCompleted()
方法也是無參無返回值的,因此
Action0
可以被當成一個包裝物件,將 onCompleted()
的內容打包起來將自己作為一個引數傳入
subscribe()
以實現不完整定義的回撥。這樣其實也可以看做將 onCompleted()
方法作為引數傳進了
subscribe()
,相當於其他某些語言中的『閉包』。 Action1
也是一個介面,它同樣只有一個方法 call(T param)
,這個方法也無返回值,但有一個引數;與
Action0
同理,由於 onNext(T obj)
和 onError(Throwable error)
也是單引數無返回值的,因此
Action1
可以將 onNext(obj)
和 onError(error)
打包起來傳入
subscribe()
以實現不完整定義的回撥。事實上,雖然 Action0
和 Action1
在 API 中使用最廣泛,但 RxJava 是提供了多個
ActionX
形式的介面 (例如 Action2
, Action3
) 的,它們可以被用以包裝不同的無返回值的方法。
注:正如前面所提到的,
Observer
和Subscriber
具有相同的角色,而且Observer
在subscribe()
過程中最終會被轉換成Subscriber
物件,因此,從這裡開始,後面的描述我將用Subscriber
來代替Observer
,這樣更加嚴謹。
4) 場景示例
下面舉兩個例子:
為了把原理用更清晰的方式表述出來,本文中挑選的都是功能儘可能簡單的例子,以至於有些示例程式碼看起來會有『畫蛇添足』『明明不用 RxJava 可以更簡便地解決問題』的感覺。當你看到這種情況,不要覺得是因為 RxJava 太囉嗦,而是因為在過早的時候舉出真實場景的例子並不利於原理的解析,因此我刻意挑選了簡單的情景。
a. 列印字串陣列
將字串陣列 names
中的所有字串依次打印出來:
String[] names =...;Observable.from(names).subscribe(newAction1<String>(){@Overridepublicvoid call(String name){Log.d(tag, name);}});
b. 由 id 取得圖片並顯示
由指定的一個 drawable 檔案 id drawableRes
取得圖片,並顯示在 ImageView
中,並在出現異常的時候列印 Toast 報錯:
int drawableRes =...;ImageView imageView =...;Observable.create(newOnSubscribe<Drawable>(){@Overridepublicvoid call(Subscriber<?superDrawable> subscriber){Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();}}).subscribe(newObserver<Drawable>(){@Overridepublicvoid onNext(Drawable drawable){
imageView.setImageDrawable(drawable);}@Overridepublicvoid onCompleted(){}@Overridepublicvoid onError(Throwable e){Toast.makeText(activity,"Error!",Toast.LENGTH_SHORT).show();}});
正如上面兩個例子這樣,創建出 Observable
和 Subscriber
,再用 subscribe()
將它們串起來,一次 RxJava 的基本使用就完成了。非常簡單。
然而,
在 RxJava 的預設規則中,事件的發出和消費都是在同一個執行緒的。也就是說,如果只用上面的方法,實現出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是『後臺處理,前臺回撥』的非同步機制,因此非同步對於 RxJava 是至關重要的。而要實現非同步,則需要用到 RxJava 的另一個概念:
Scheduler
。
3. 執行緒控制 —— Scheduler (一)
在不指定執行緒的情況下, RxJava 遵循的是執行緒不變的原則,即:在哪個執行緒呼叫 subscribe()
,就在哪個執行緒生產事件;在哪個執行緒生產事件,就在哪個執行緒消費事件。如果需要切換執行緒,就需要用到
Scheduler
(排程器)。
1) Scheduler 的 API (一)
在RxJava 中,Scheduler
——排程器,相當於執行緒控制器,RxJava 通過它來指定每一段程式碼應該執行在什麼樣的執行緒。RxJava 已經內建了幾個
Scheduler
,它們已經適合大多數的使用場景:
Schedulers.immediate()
: 直接在當前執行緒執行,相當於不指定執行緒。這是預設的Scheduler
。Schedulers.newThread()
: 總是啟用新執行緒,並在新執行緒執行操作。Schedulers.io()
: I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的Scheduler
。行為模式和newThread()
差不多,區別在於io()
的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下io()
比newThread()
更有效率。不要把計算工作放在io()
中,可以避免建立不必要的執行緒。Schedulers.computation()
: 計算所使用的Scheduler
。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個Scheduler
使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在computation()
中,否則 I/O 操作的等待時間會浪費 CPU。- 另外, Android 還有一個專用的
AndroidSchedulers.mainThread()
,它指定的操作將在 Android 主執行緒執行。
有了這幾個 Scheduler
,就可以使用 subscribeOn()
和 observeOn()
兩個方法來對執行緒進行控制了。*
subscribeOn()
: 指定 subscribe()
所發生的執行緒,即 Observable.OnSubscribe
被啟用時所處的執行緒。或者叫做事件產生的執行緒。*
observeOn()
: 指定 Subscriber
所執行在的執行緒。或者叫做事件消費的執行緒。
文字敘述總歸難理解,上程式碼:
Observable.just(1,2,3,4).subscribeOn(Schedulers.io())// 指定 subscribe() 發生在 IO 執行緒.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回調發生在主執行緒.subscribe(newAction1<Integer>(){@Overridepublicvoid call(Integer number){Log.d(tag,"number:"+ number);}});
上面這段程式碼中,由於 subscribeOn(Schedulers.io())
的指定,被建立的事件的內容 1
、2
、3
、4
將會在 IO 執行緒發出;而由於
observeOn(AndroidScheculers.mainThread()
) 的指定,因此 subscriber
數字的列印將發生在主執行緒 。事實上,這種在
subscribe()
之前寫上兩句 subscribeOn(Scheduler.io())
和
observeOn(AndroidSchedulers.mainThread())
的使用方式非常常見,它適用於多數的 『後臺執行緒取資料,主執行緒顯示』的程式策略。
而前面提到的由圖片 id 取得圖片並顯示的例子,如果也加上這兩句:
int drawableRes =...;ImageView imageView =...;Observable.create(newOnSubscribe<Drawable>(){@Overridepublicvoid call(Subscriber<?superDrawable> subscriber){Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();}}).subscribeOn(Schedulers.io())// 指定 subscribe() 發生在 IO 執行緒.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回調發生在主執行緒.subscribe(newObserver<Drawable>(){@Overridepublicvoid onNext(Drawable drawable){
imageView.setImageDrawable(drawable);}@Overridepublicvoid onCompleted(){}@Overridepublicvoid onError(Throwable e){Toast.makeText(activity,"Error!",Toast.LENGTH_SHORT).show();}});
那麼,載入圖片將會發生在 IO 執行緒,而設定圖片則被設定在了主執行緒。這就意味著,即使載入圖片耗費了幾十甚至幾百毫秒的時間,也不會造成絲毫介面的卡頓。
2) Scheduler 的原理 (一)
RxJava 的 Scheduler API 很方便,也很神奇(加了一句話就把執行緒切換了,怎麼做到的?而且 subscribe()
不是最外層直接呼叫的方法嗎,它竟然也能被指定執行緒?)。然而 Scheduler 的原理需要放在後面講,因為它的原理是以下一節《變換》的原理作為基礎的。
好吧這一節其實我屁也沒說,只是為了讓你安心,讓你知道我不是忘了講原理,而是把它放在了更合適的地方。
4. 變換
終於要到牛逼的地方了,不管你激動不激動,反正我是激動了。
RxJava 提供了對事件序列進行變換的支援,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的物件或整個序列進行加工處理,轉換成不同的事件或事件序列。概念說著總是模糊難懂的,來看 API。
1) API
首先看一個 map()
的例子:
Observable.just("images/logo.png")// 輸入型別 String.map(newFunc1<String,Bitmap>(){@OverridepublicBitmap call(String filePath){// 引數型別 Stringreturn getBitmapFromPath(filePath);// 返回型別 Bitmap}}).subscribe(newAction1<Bitmap>(){@Overridepublicvoid call(Bitmap bitmap){// 引數型別 Bitmap
showBitmap(bitmap);}});
這裡出現了一個叫做 Func1
的類。它和 Action1
非常相似,也是 RxJava 的一個介面,用於包裝含有一個引數的方法。
Func1
和 Action
的區別在於, Func1
包裝的是有返回值的方法。另外,和
ActionX
一樣, FuncX
也有多個,用於不同引數個數的方法。FuncX
和
ActionX
的區別在 FuncX
包裝的是有返回值的方法。
可以看到,map()
方法將引數中的 String
物件轉換成一個 Bitmap
物件後返回,而在經過
map()
方法後,事件的引數型別也由 String
轉為了 Bitmap
。這種直接變換物件並返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件物件,還可以針對整個事件佇列,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:
-
map()
: 事件物件的直接變換,具體功能上面已經介紹過。它是 RxJava 最常用的變換。map()
的示意圖: -
flatMap()
: 這是一個很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它。首先假設這麼一種需求:假設有一個數據結構『學生』,現在需要打印出一組學生的名字。實現方式很簡單:
Student[] students =...;Subscriber<String> subscriber =newSubscriber<String>(){@Overridepublicvoid onNext(String name){Log.d(tag, name);}...};Observable.from(students).map(newFunc1<Student,String>(){@OverridepublicString call(Student student){return student.getName