RxJava的觀察者模式(二)
上一篇中我們瞭解了什麼是RxJava,用一個詞來總結就是非同步。
這裡我們來講講RxJava的非同步實現。它是通過一種擴充套件的觀察者模式來實現。
一、觀察者模式
先簡書一下觀察者模式。
觀察者模式面向的需求是:觀察者對被觀察者的某種變化作出反應。比如警察抓小偷,警察需要在小偷作案時實施抓捕。在這裡面小偷是被觀察者,警察是觀察者。而程式的觀察者模式跟真正的觀察略有不同,觀察者不需要時時刻刻頂著被觀察者,而是採用註冊(Register)或者被稱為訂閱(Subscribe)方式告訴觀察者:我需要你的某種狀態,你要在你變化的時候通知我。
Android開發中典型的觀察者模式就是監聽器事件Listener。對設定OnClickListener來說,View是被觀察者,OnClickListener是觀察者,兩者通過setOnClickListener()方法達成註冊(訂閱)關係。訂閱之後使用者點選按鈕的瞬間,Android Framework 就會將點選事件傳送給已經註冊的 OnClickListener。
OnClickListener的模式圖
如圖所示,通過 setOnClickListener()方法,Button持有 OnClickListener的引用(這一過程沒有在圖上畫出);當用戶點選時,Button自動呼叫 OnClickListener的 onClick()方法。另外,如果把這張圖中的概念抽象出來(Button -> 被觀察者、OnClickListener-> 觀察者、setOnClickListener()-> 訂閱,onClick() -> 事件),就由專用的觀察者模式(例如只用於監聽控制元件點選)轉變成了通用的觀察者模式。如下圖:
而 RxJava 作為一個工具庫,使用的就是通用形式的觀察者模式。
二、RxJava的觀察者模式
RxJava的四個基本概念
Observable(被觀察者)
Observer(觀察者)
subscribe(訂閱)
事件
Oservable和Observer通過 subscribe()方法實現訂閱關係,從而 Observable可以在需要的時候發出事件來通知 Observer。
與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext()(相當於 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted()和 onError()。
onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的onNext()發出時,需要觸發 onCompleted()方法作為標誌。
onError(): 事件佇列異常。在事件處理過程中出異常時,onError()會被觸發,同時佇列自動終止,不允許再有事件發出。
在一個正確執行的事件序列中, onCompleted()和 onError()有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted()
和 onError()二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。
RxJava 的觀察者模式大致如下圖:
三、基本實現
1、建立Observer
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer介面的實現方式:
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了 Observer 介面之外,RxJava 還內建了一個實現了 Observer的抽象類:Subscriber。 Subscriber對 Observer介面進行了一些擴充套件,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer也總是會先被轉換成一個 Subscriber再使用。所以如果你只想使用基本功能,選擇 Observer和 Subscriber是完全一樣的。它們的區別對於使用者來說主要有兩點:
onStart(): 這是 Subscriber增加的方法。它會在 subscribe 剛開始,而事件還未傳送之前被呼叫,可以用於做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實現為空。需要注意的是,如果對準備工作的執行緒有要求(例如彈出一個顯示進度的對話方塊,這必須在主執行緒執行), onStart()就不適用了,因為它總是在 subscribe 所發生的執行緒被呼叫,而不能指定執行緒。要在指定的執行緒來做準備工作,可以使用 doOnSubscribe()方法,具體可以在後面的文中看到。
unsubscribe(): 這是 Subscriber所實現的另一個介面 Subscription的方法,用於取消訂閱。在這個方法被呼叫後,Subscriber將不再接收事件。一般在這個方法呼叫前,可以使用 isUnsubscribed()先判斷一下狀態。 unsubscribe()這個方法很重要,因為在 subscribe()之後, Observable會持有 Subscriber的引用,這個引用如果不能及時被釋放,將有記憶體洩露的風險。所以最好保持一個原則:要在不再使用的時候儘快在合適的地方(例如 onPause()onStop()等方法中)呼叫unsubscribe()來解除引用關係,以避免記憶體洩露的發生。
2、 建立 Observable
Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create()方法來建立一個 Observable ,併為它定義事件觸發規則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
可以看到,這裡傳入了一個 OnSubscribe物件作為引數。OnSubscribe會被儲存在返回的 Observable物件中,它的作用相當於一個計劃表,當 Observable被訂閱的時候,OnSubscribe的 call()方法會自動被呼叫,事件序列就會依照設定依次觸發(對於上面的程式碼,就是觀察者Subscriber將會被呼叫三次 onNext()和一次 onCompleted())。這樣,由被觀察者呼叫了觀察者的回撥方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
create()方法是 RxJava 最基本的創造事件序列的方法。基於這個方法, RxJava 還提供了一些方法用來快捷建立事件佇列,例如:
just(T…): 將傳入的引數依次傳送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次呼叫:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
from(T[])/ from(Iterable<? extends T>): 將傳入的陣列或 Iterable拆分成具體物件後,依次傳送出來。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次呼叫:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面 just(T…)的例子和 from(T[])的例子,都和之前的 create(OnSubscribe)的例子是等價的。
3、Subscribe (訂閱)
建立了 Observable和 Observer之後,再用 subscribe()方法將它們聯結起來,整條鏈子就可以工作了。程式碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
除了 subscribe(Observer)和 subscribe(Subscriber),subscribe()還支援不完整定義的回撥,RxJava 會自動根據定義創建出Subscriber 。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
簡單解釋一下這段程式碼中出現的 Action1和 Action0。 Action0是 RxJava 的一個介面,它只有一個方法 call(),這個方法是無參無返回值的;由於 onCompleted()方法也是無參無返回值的,因此 Action0可以被當成一個包裝物件,將 onCompleted()的內容打包起來將自己作為一個引數傳入 subscribe()以實現不完整定義的回撥。這樣其實也可以看做將 onCompleted() 方法作為引數傳進了subscribe(),相當於其他某些語言中的『閉包』。
Action1也是一個介面,它同樣只有一個方法 call(T param),這個方法也無返回值,但有一個引數;與 Action0同理,由於 onNext(T obj)和 onError(Throwable error)也是單引數無返回值的,因此 Action1可以將 onNext(obj)和 onError(error)打包起來傳入 subscribe()以實現不完整定義的回撥。事實上,雖然 Action0和 Action1在 API 中使用最廣泛,但 RxJava 是提供了多個 ActionX形式的介面 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法。
三、場景示例
1. 列印字串陣列
將字串陣列 names中的所有字串依次打印出來:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
在 RxJava 的預設規則中,事件的發出和消費都是在同一個執行緒的。也就是說,如果只用上面的方法,實現出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是『後臺處理,前臺回撥』的非同步機制,因此非同步對於 RxJava 是至關重要的。而要實現非同步,則需要用到 RxJava 的另一個概念: Scheduler。今天就先講到這裡,下篇繼續