用RxJava實現Rxbus替換EventBus事件匯流排
首先,Rxjava不必多說,可以說和Retrofit是年度最火框架,在GitHub上都已經超過兩萬star,Eventbus也不必多說,目前大多數開發者大多數專案一定會用到EventBus或者Otto作為事件匯流排通訊庫,對於RxJava使用者來說,RxJava也可以輕鬆實現事件匯流排,因為它們都依據於觀察者模式。本文介紹Rxbus如何完美替換Eventbus,減少APP體積.
不多說,上程式碼
/**
* 作者:JackyZ fenglizizhu
* 說明:事件的訂閱
*/
public class RxBus {
private static volatile RxBus mDefaultInstance;
private final Subject<Object,Object> mBus;
private final Map<Class<?>,Object> mStickEventMap;
public RxBus() {
//將Subject(PublishSubject)轉換為SerializedSubject
mBus = new SerializedSubject<>(PublishSubject.create());
mStickEventMap=new ConcurrentHashMap<>();
}
/*單例*/
public static RxBus getDefault(){
if(mDefaultInstance==null){
synchronized (RxBus.class){
if(mDefaultInstance==null){
mDefaultInstance=new RxBus();
}
}
}
return mDefaultInstance;
}
/*傳送事件*/
public void post(Object event){
if(mBus == null) {
mDefaultInstance=new RxBus();
}
mBus.onNext(event);
}
/*訂閱事件*/
public <T> Observable<T> toObservable(Class<T> eventType){
return mBus.ofType(eventType);//ofType可以根據事件型別傳送指定資料
}
/*傳送一個新的Sticky事件*/
public void postSticky(Object event){
synchronized (mStickEventMap){//執行緒鎖
mStickEventMap.put(event.getClass(),event);//將事件型別儲存到Map集合
}
post(event);
}
/*訂閱Sticky事件*/
public <T> Observable<T> tObservableStick(final Class<T> eventType){
synchronized (mStickEventMap){
final Observable<T> observable=mBus.ofType(eventType);//獲取傳送事件的Observable
final Object event=mStickEventMap.get(eventType);//根據事件型別作為key查詢value對應的value
if(null!=event){
return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>(){//通過mergeWith合併兩個Observable
@Override
public void call(Subscriber<? super T> subscriber) {
//訂閱 eventType.cast(event)直接將eventType轉換為 T傳送
subscriber.onNext(eventType.cast(event));
}
}));
}else {
return observable;//如果沒有sticky 就返回observable
}
}
}
/*根據eventType獲取事件*/
public <T> T getStickEvent(Class<T> eventType){
synchronized (mStickEventMap){
return eventType.cast(mStickEventMap.get(eventType));
}
}
/*移除指定型別的eventType的Sticky事件*/
public <T> T removeStickyEvent(Class<T> eventType){
synchronized (mStickEventMap){
return eventType.cast(mStickEventMap.remove(eventType));
}
}
/*移除所有的Sticky事件*/
public void removeAllStickyEvents(){
synchronized (mStickEventMap){
mStickEventMap.clear();
}
}
開始分析
注:
1、Subject同時充當了Observer和Observable的角色,Subject是非執行緒安全的,要避免該問題,需要將 Subject轉換為一個 SerializedSubject ,上述RxBus類中把執行緒非安全的PublishSubject包裝成執行緒安全的Subject。
2、PublishSubject只會把在訂閱發生的時間點之後來自原始Observable的資料發射給觀察者。
3、ofType操作符只發射指定型別的資料,其內部就是filter+cast
public final <R> Observable<R> ofType(final Class<R> klass) {
return filter(new Func1<T, Boolean>() {
@Override
public final Boolean call(T t) {
return klass.isInstance(t);
}
}).cast(klass);
}
filter操作符可以使你提供一個指定的測試資料項,只有通過測試的資料才會被“發射”。
cast操作符可以將一個Observable轉換成指定型別的Observable。
分析:
RxBus工作流程圖
1、首先建立一個可同時充當Observer和Observable的Subject;
2、在需要接收事件的地方,訂閱該Subject(此時Subject是作為Observable),在這之後,一旦Subject接收到事件,立即發射給該訂閱者;
3、在我們需要傳送事件的地方,將事件post至Subject,此時Subject作為Observer接收到事件(onNext),然後會發射給所有訂閱該Subject的訂閱者。
對於RxBus的使用,就和普通的RxJava訂閱事件很相似了。
先看傳送事件的程式碼:
RxBus.getDefault().post(new UserEvent (1, "yoyo"));
userEvent是要傳送的事件,如果你用過EventBus, 很容易理解,UserEvent的程式碼:
public class UserEvent {
long id;
String name;
public UserEvent(long id,String name) {
this.id= id;
this.name= name;
}
public long getId() {
return id;
}
public String getName() {
return name;
}
}
再看接收事件的程式碼:
// rxSubscription是一個Subscription的全域性變數,這段程式碼可以在onCreate/onStart等生命週期內
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)
.subscribe(new Action1<UserEvent>() {
@Override
public void call(UserEvent userEvent) {
long id = userEvent.getId();
String name = userEvent.getName();
...
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// TODO: 處理異常
}
});
最後,一定要記得在生命週期結束的地方取消訂閱事件,防止RxJava可能會引起的記憶體洩漏問題。
@Override
protected void onDestroy() {
super.onDestroy();
if(!rxSubscription.isUnsubscribed()) {
rxSubscription.unsubscribe();
}
}
如果你的專案已經開始使用RxJava,建議可以把EventBus或Otto替換成RxBus,減小專案體積。
支援Sticky事件
在Android開發中,Sticky事件只指事件消費者在事件釋出之後才註冊的也能接收到該事件的特殊型別。Android中就有這樣的例項,也就是Sticky Broadcast,即粘性廣播。正常情況下如果傳送者傳送了某個廣播,而接收者在這個廣播發送後才註冊自己的Receiver,這時接收者便無法接收到剛才的廣播,為此Android引入了StickyBroadcast,在廣播發送結束後會儲存剛剛傳送的廣播(Intent),這樣當接收者註冊完Receiver後就可以接收到剛才已經發布的廣播。這就使得我們可以預先處理一些事件,讓有消費者時再把這些事件投遞給消費者。
Subject
我們在實現簡單的RxBus時使用了PublishSubject
,其實RxJava提供給開發者4種Subject:
PublishSubject,BehaviorSubject ,BehaviorSubject,AsyncSubject。
-
PublishSubject
只會給在訂閱者訂閱的時間點之後的資料傳送給觀察者。 -
BehaviorSubject
在訂閱者訂閱時,會發送其最近傳送的資料(如果此時還沒有收到任何資料,它會發送一個預設值)。 -
ReplaySubject
在訂閱者訂閱時,會發送所有的資料給訂閱者,無論它們是何時訂閱的。 -
AsyncSubject
只在原Observable事件序列完成後,傳送最後一個數據,後續如果還有訂閱者繼續訂閱該Subject, 則可以直接接收到最後一個值。
Subject
從上圖來看,似乎BehaviorSubject
和ReplaySubject
具備Sticky的特性。
BehaviorSubject方案
BehaviorSubject
似乎完全符合Sticky的定義,但是你發現了它只能儲存最近的那個事件。
有這樣的場景:如果訂閱者A訂閱了Event1,訂閱者B訂閱了Event2,而此時BehaviorSubject
事件佇列裡是[...,
Event2, Event1],當訂閱者訂閱時,因為儲存的是最近的事件:即Event1,所以訂閱者B是接收不到Event2的。
解決辦法就是:
每個Event型別都各自建立一個對應的BehaviorSubject
,這樣的話資源開銷比較大,並且該Sticky事件匯流排和普通的RxBus事件匯流排不能共享,即:普通事件和Sticky事件是獨立的,因為普通事件是基於PublishSubject
的,
暫時放棄該方案!
ReplaySubject方案
ReplaySubject
可以儲存傳送過的所有資料事件。
因為儲存了所有的資料事件,所以不管什麼型別的Event,我們只要過濾該型別,並讓其傳送最近的那個Event即可滿足Sticky事件了。但是獲取最近的對應事件是個難點,因為最符合需求的操作符takeLast()
僅在訂閱事件結束時(即:onCompleted()
)才會傳送最後的那個資料事件,而我們的RxBus正常情況下應該是儘量避免其訂閱事件結束的。(我沒能找到合適的操作符,如果你知道,請告知我)
所以BehaviorSubject也是比較難實現Sticky特性的。
並且,不管是BehaviorSubject
還是ReplaySubject
,它們還有一個棘手的問題:它們實現的EventBus和普通的RxBus(基於PublishSubject
)之間的資料是相互獨立的!
總結:BehaviorSubject
和BehaviorSubject
都不能天然適合Sticky事件......
使用Map實現Sticky
該方法思路是在原來PublishSubject
實現的RxBus基礎上,使用ConcurrentHashMap<事件型別,事件>儲存每個事件的最近事件,不僅能實現Sticky特性,最重要的是可以和普通RxBus的事件資料共享,不獨立。
因為我們的RxBus是基於
PublishSubject
的,而RxJava又有4種Subject,而且其中的BehaviorSubject
和ReplaySubject
看起來又符合Sticky特性,所以我們可能會鑽這個牛角尖,理所當然的認為實現Sticky需要通過其他型別的Subject.... (好吧,我鑽進去了...)
這個方案的思路我是根據EventBus的實現想到的,下面是大致流程:
-
Map的初始化:
private final Map<Class<?>, Object> mStickyEventMap; public RxBus() { mBus = new SerializedSubject<>(PublishSubject.create()); mStickyEventMap = new ConcurrentHashMap<>(); }
ConcurrentHashMap是一個執行緒安全的HashMap, 採用stripping lock(分離鎖),效率比HashTable高很多。
-
在我們
postSticky(Event)
時,存入Map中:public void postSticky(Object event) { synchronized (mStickyEventMap) { mStickyEventMap.put(event.getClass(), event); } post(event); }
-
訂閱時
toObservableSticky(Class<T> eventType)
,先從Map中尋找是否包含該型別的事件,如果沒有,則說明沒有Sticky事件要傳送,直接訂閱Subject(此時作為被觀察者Observable);如果有,則說明有Sticky事件需要傳送,訂閱merge(Subject 和 Sticky事件)
。public <T> Observable<T> toObservableSticky(final Class<T> eventType) { synchronized (mStickyEventMap) { Observable<T> observable = mBus.ofType(eventType); final Object event = mStickyEventMap.get(eventType); if (event != null) { return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(Subscriber<? super T> subscriber) { subscriber.onNext(eventType.cast(event)); } })); } else { return observable; } } }
merge操作符:可以將多個Observables合併,就好像它們是單個的Observable一樣。
這樣,Sticky的核心功能就完成了,使用上和普通RxBus一樣,通過postSticky()
傳送事件,toObservableSticky()
訂閱事件。
除此之外,我還提供了getStickyEvent(Class<T> eventType)
,removeStickyEvent(Class<T>
eventType)
,removeAllStickyEvents()
方法,供查詢、移除對應事件型別的事件、移除全部Sticky事件。
在使用Sticky特性時,在不需要某Sticky事件時, 通過removeStickyEvent(Class<T>
eventType)
移除它,最保險的做法是:在主Activity的onDestroy
裡removeAllStickyEvents()
。
因為我們的RxBus是個單例靜態物件,再正常退出app時,該物件依然會存在於JVM,除非程序被殺死,這樣的話導致StickyMap裡的資料依然存在,為了避免該問題,需要在app退出時,清理StickyMap。
// 主Activity(一般是棧底Activity)
@Override
protected void onDestroy() {
super.onDestroy();
// 移除所有Sticky事件
RxBus.getDefault().removeAllStickyEvents();
}
異常處理
在使用RxBus過程中,你會發現你訂閱了某個事件後,在後續接收到該事件時,如果處理的過程中發生了異常,你會發現後續的事件再也接收不到了,除非你重新訂閱!
原因在於RxJava的事件序列機制,一個訂閱事件是以onCompleted()
或者onError()
作為結束的,即:一旦訂閱者的onCompleted()
或onError()
被呼叫,訂閱者和被訂閱者的訂閱關係就解除了。
這裡說下RxJava的異常傳遞機制:onError()
在Observable序列傳遞過程中出現任何異常時被呼叫,然後終止Observable事件序列的傳遞,以此通知所有的訂閱者發生了一個不可恢復的錯誤,即:異常總會傳遞到訂閱者。
這本是RxJava的一個優點,反而在事件匯流排的場景下,成了讓人頭疼的問題!
所以我們的RxBus的訂閱者在處理訂閱事件時,一旦發生了異常,而又沒Catch,那麼最終都會呼叫到onError()
,而一旦走到onError()
,就意味著這個訂閱者和該Subject解除了訂閱關係,因此再也收不到後續發出的事件了
我目前想到2種方案
解決方案1:自動重新訂閱
即在onError(e)或onCompleted()
發生時,立即重新訂閱,保證訂閱事件在解決時可以立即恢復。
private void subscribeEvent(){
RxBus.getDefault().toObservable(Event.class)
// 使用操作符過程中,無需try,catch,直接使用
.subscribe(new Subscriber<Event>() {
@Override
public void onCompleted() {
subscribeEvent();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
subscribeEvent();
}
@Override
public void onNext(Event event) {
// 直接處理接收的事件
}
});
}
注意:這個方案僅可以用於普通RxBus,絕不能用於Sticky事件!
原因在於Sticky的粘性特性,引起error的事件如果重新訂閱的話,該事件很可能繼續導致error,從而引起死迴圈!
解決方案2:捕捉
不管是普通RxBus還是使用Sticky的RxBus,通用的解決方案就是捕捉異常:在任何操作符內的ActionX
,FuncX
方法以及onNext(T
t)
內使用try,catch處理。
RxBus.getDefault().toObservableSticky(EventSticky.class) // 建議在Sticky時,在操作符內主動try,catch
.map(new Func1<EventSticky, EventSticky>() {
@Override
public EventSticky call(EventSticky eventSticky) {
try {
// 變換操作
} catch (Exception e) {
e.printStackTrace();
}
return eventSticky;
}
})
.subscribe(new Action1<EventSticky>() {
@Override
public void call(EventSticky eventSticky) {
try {
// 處理接收的事件
} catch (Exception e) {
e.printStackTrace();
}
}
});