Rxjava和EventBus的使用比較
阿新 • • 發佈:2019-02-05
EventBus訂閱釋出模式
- 概念:EventBus是一個Android端優化的publish/subscribe訊息匯流排,簡化了應用程式內各元件間、元件與後臺執行緒間的通訊。比如請求網路,等網路返回時通過Handler或Broadcast通知UI,兩個Fragment之間需要通過Listener通訊,這些需求都可以通過EventBus實現。
- EventBus比較適合僅僅當做元件間的通訊工具使用,主要用來傳遞訊息。
- 使用EventBus可以避免搞出一大推的interface
使用方法
新增依賴
compile 'org.greenrobot:eventbus:3.0.0'
註冊
- 在onStart()方法中註冊
//註冊eventBus
@Override
protected void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
//取消註冊
@Override
protected void onStop() {
super.onStop();
EventBus.getDefault().unregister(this);
}
訂閱者
- @Subscribe註解來描述一個public無返回值的非靜態方法,註解後面可以跟threadMode,來給定訂閱者處理事件所在的執行緒。
@Subscribe(threadMode = ThreadMode.MAIN)
public void onEventMainThread(IdEvent event) {
if (event != null) {
System.out.println("onEventMainThread:"+event.getId()+" "+Thread.currentThread().getName());
textView.setText(""+event.getId());
}else {
System.out.println("event:" +event);
}
}
- EventBus包含4個ThreadMode
- ThreadMode.POSTING
- ThreadMode.MAIN
- ThreadMode.BACKGROUND
- ThreadMode.ASYNC
/**
* 在後臺執行緒中執行,如果當前執行緒是子執行緒,則會在當前執行緒執行,如果當前執行緒是主執行緒,則會建立一個新的子執行緒來執行
* @param event
*/
@Subscribe(threadMode = ThreadMode.BACKGROUND)
public void onEventBackgroundThread(MessageEvent event){
System.out.println("onEventBackgroundThread::"+" "+Thread.currentThread().getName());
}
/**
* 建立一個非同步執行緒來執行
* @param event
*/
@Subscribe(threadMode = ThreadMode.ASYNC)
public void onEventAsync(MessageEvent event){
System.out.println("onEventAsync::"+" "+Thread.currentThread().getName());
}
/**
* 在主執行緒中執行
* @param event
*/
@Subscribe(threadMode = ThreadMode.MAIN)
public void onEventMain(MessageEvent event){
System.out.println("onEventMain::"+" "+Thread.currentThread().getName());
}
/**
*預設的執行緒模式,在當前執行緒下執行。如果當前執行緒是子執行緒則在子執行緒中,當前執行緒是主執行緒,則在主執行緒中執行。
* @param event
*/
@Subscribe(threadMode = ThreadMode.POSTING)
public void onEventPosting(MessageEvent event){
System.out.println("onEventPosting::"+" "+Thread.currentThread().getName());
}
釋出者
- 不管釋出者是在主執行緒還是子執行緒,釋出的訊息在所有定義好的實體型別訂閱者中都可以接收到訊息。也就是,可以實現一個訂閱者訂閱多個事件,和一個事件對應多個訂閱者。
EventBus.getDefault().post(new MessageEvent("你好!"));
以下是測試的結果圖:
在主執行緒中測試的結果
發生的是另外的實體型別
在子執行緒中釋出訊息
RxJava
- 概念:”a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫),簡單的用中文概括也就是兩個字:非同步。
- RxJava 的優勢即是簡潔,但它的簡潔的與眾不同之處在於,隨著程式邏輯變得越來越複雜,它依然能夠保持簡潔。
使用方法
新增依賴
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
//RxAndroid的依賴包
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
被觀察者與觀察者
- Observeable(被觀察者)/Observer(觀察者)
- Flowable(被觀察者)/Subscriber(觀察者)
- 2.x中提供了以上兩種觀察者與被觀察者的關係。
- Observeable用於訂閱Observer,是不支援背壓的,而Flowable用於訂閱Subscriber,是支援背壓(Backpressure)的。
- 背壓的概念是:指在非同步場景中,被觀察者傳送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低傳送速度的策略。
//被觀察者在主執行緒中,每1ms傳送一個事件
Observable.interval(1, TimeUnit.MILLISECONDS)
//.subscribeOn(Schedulers.newThread())
//將觀察者的工作放在新執行緒環境中
.observeOn(Schedulers.newThread())
//觀察者處理每1000ms才處理一個事件
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("TAG","---->"+aLong);
}
});
在上面的程式碼中,被觀察者傳送事件的速度是觀察者處理速度的1000倍
這段程式碼執行之後:
...
Caused by: rx.exceptions.MissingBackpressureException
...
...
- 所以在2.x之後使用Flowable支援被壓的被觀察者,一般而言,上游的被觀察者會響應下游觀察者的資料請求,下游呼叫request(n)來告訴上游傳送多少個數據。這樣避免了大量資料堆積在呼叫鏈上,使記憶體一直處於較低水平。
Flowable<String> t = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("hello Rx2.0.");
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
- 下游被觀察者subscriber 告知上游請求的資料
- 我們需要呼叫request去請求資源,引數就是要請求的數量,一般如果不限制請求數量,可以寫成Long.MAX_VALUE。如果你不呼叫request,Subscriber的onNext和onComplete方法將不會被呼叫。
Subscriber subscriber = new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe()");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println(""+o.toString());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
除了上面這兩種觀察者,還有一類觀察者
* Single/SingleObserver ——— 返回泛型資料的結果給觀察者
* Completable/CompletableObserver ——– 返回完成的結果
* Maybe/MaybeObserver ——– 前兩者的複合體
* Single為例:
Single.create(new SingleOnSubscribe<Integer>() {
@Override
public void subscribe(SingleEmitter<Integer> e) throws Exception {
e.onSuccess(10);
}
}).subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe()");
}
@Override
public void onSuccess(Integer value) {
System.out.println("onSuccess:"+value);
}
@Override
public void onError(Throwable e) {
System.out.println("onError()");
}
});
- 顯示結果:
12-06 10:20:35.278 21057-21057/com.flw.rx20_demo I/System.out: onSubscribe()
12-06 10:20:35.278 21057-21057/com.flw.rx20_demo I/System.out: onSuccess:10
- Completable為例:
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
System.out.println("subscribe");
e.onComplete();
}
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
});
顯示結果:
12-06 10:02:31.964 4341-4341/? I/System.out: onSubscribe
12-06 10:02:31.964 4341-4341/? I/System.out: subscribe
12-06 10:02:31.964 4341-4341/? I/System.out: onComplete
- Maybe為例:
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> e) throws Exception {
System.out.println("subscribe()");
e.onSuccess("hello Maybe.");
e.onComplete();
}
}).subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe()");
}
@Override
public void onSuccess(String value) {
System.out.println("onSuccess():"+value);
}
@Override
public void onError(Throwable e) {
System.out.println("onError():"+e);
}
@Override
public void onComplete() {
System.out.println("onComplete()");
}
});
- 列印結果如下:
12-06 10:37:35.605 3344-3344/com.flw.rx20_demo I/System.out: onSubscribe()
12-06 10:37:35.605 3344-3344/com.flw.rx20_demo I/System.out: subscribe()
12-06 10:37:35.605 3344-3344/com.flw.rx20_demo I/System.out: onSuccess():hello Maybe.
Rxjava內建了幾個 Scheduler:
- Schedulers.immediate() 預設的,直接在當前執行緒執行。
- Schedulers.newThread() 啟用新執行緒,在新執行緒工作。
- Schedulers.io() I/O操作(讀寫檔案,讀寫資料庫,網路資訊互動等)、和newThread()最大的區別是:io()內部實現是一個無數量上限的執行緒池,可以重用空閒的執行緒。
- Schedulers.computation():計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
- AndroidSchedulers.mainThread() Android專用的,指定的操作在Android的主執行緒執行。
subscribeOn()和observerOn()
subscribeOn() 指定subscribe() 所發生的執行緒,即 Observable.OnSubcribe 被啟用時所處的執行緒,或者叫事件產生的執行緒。
observerOn() 指定Subscriber 執行所在的執行緒,或者叫做事件消費的執行緒。
Rxjava常用的操作符
- create
create操作符是所有建立型操作符的“根”,也就是說其他建立型操作符最後都是通過create操作符來建立Observable的. - from
from操作符是把其他型別的物件和資料型別轉化成Observable,接收一個集合作為輸入。 - just
just操作符也是把其他型別的物件和資料型別轉化成Observable,它和from操作符很像,只是方法的引數有所差別。接收一個可變引數作為輸入。 - defer
defer操作符是直到有訂閱者訂閱時,才通過Observable的工廠方法建立Observable並執行,defer操作符能夠保證Observable的狀態是最新的 - timer
隔一段時間產生一個數字,然後就結束,也就是延遲時間接受到資料。timer操作符預設情況下是執行在一個新執行緒上的,當然你可以通過傳入引數來修改其執行的執行緒。 - interval
interval操作符是每隔一段時間就產生一個數字,這些數字從0開始,一次遞增1直至無窮大。 - range
range操作符是建立一組在從n開始,個數為m的連續數字,比如range(3,10),就是建立3、4、5…12的一組數字 - map
map函式只有一個引數,引數一般是Func1,Func1的
必要解釋下一些操作符的使用:
- timer:
// o 指的是Observable物件,在全域性定義了
//相當於延遲兩秒接收到被觀察者發生的資料
o.timer(2,TimeUnit.SECONDS,Schedulers.newThread())
.subscribe(observer);
- interval:
//從0開始遞增,每隔2s遞增1,一直遞增至無窮大
o = Observable.interval(2,TimeUnit.SECONDS);
o.subscribe(observer);
綜合來看rxjava,也就是以下四點:
- 作用 - 非同步
- 模式 - 觀察者模式
- 結構 - 響應式程式設計
- 優勢 - 邏輯簡潔
Rxjava和eventbus使用對比:
- 如果一個訂閱者需要註冊多個事件的時候,Rxjava需要一個個單獨的註冊,而EventBus則可以實現一個訂閱者訂閱多個事件,和一個事件對應多個訂閱者。所以在Rxjava的基礎上有了Rxbus來作為事件匯流排的庫。這裡不做Rxbus的解釋。