1. 程式人生 > >Rxjava和EventBus對比

Rxjava和EventBus對比

總的來說,EventBus是一款針對Android優化的釋出/訂閱事件匯流排,主要功能是替代Intent,Handler,BroadCast在Fragment,Activity,Service,執行緒之間傳遞訊息。而Rxjava則是一種基於非同步資料流的處理方案。如果一個訂閱者需要註冊多個事件的時候,Rxjava需要一個個單獨的註冊,而EventBus則可以實現一個訂閱者訂閱多個事件,和一個事件對應多個訂閱者。

EventBus

EventBus是一個Android端優化的publish/subscribe訊息匯流排,簡化了應用程式內各元件間、元件與後臺執行緒間的通訊。比如請求網路,等網路返回時通過Handler或Broadcast通知UI,兩個Fragment之間需要通過Listener通訊,這些需求都可以通過EventBus實現。EventBus僅僅適合當做元件間的通訊工具使用,主要用來傳遞訊息,避免搞出一大堆的interface。

使用

新增依賴
使用EventBus之前需要新增相關的依賴:

compile 'org.greenrobot:eventbus:3.0.0'

然後在onStart()方法中註冊它,在onStop()方法中消耗。

    //註冊eventBus
    @Override
    protected void onStart() {
        super.onStart();
        EventBus.getDefault().register(this);
    }

    //取消註冊
    @Override
    protected void onStop
() { super.onStop(); EventBus.getDefault().unregister(this); }

在需要接受的地方新增訂閱者(使用@Subscribe註解),@Subscribe註解來描述一個public無返回值的非靜態方法,註解後面可以跟threadMode,來給定訂閱者處理事件所在的執行緒。

@Subscribe(threadMode = ThreadMode.MAIN)
    public void onEventMainThread(IdEvent event) {
        if (event != null
) { } }

EventBus包含4個ThreadMode:

  • ThreadMode.POSTING
    事件的處理在和事件的傳送在相同的程序,所以事件處理時間不應太長,不然影響事件的傳送執行緒,而這個執行緒可能是UI執行緒;
  • ThreadMode.MAIN
    事件的處理會在UI執行緒中執行,事件處理不應太長時間;
  • ThreadMode.BACKGROUND
    事件的處理會在一個後臺執行緒中執行,儘管是在後臺執行緒中執行,事件處理時間不應太長;
  • ThreadMode.ASYNC
    事件處理會在單獨的執行緒中執行,主要用於在後臺執行緒中執行耗時操作,每個事件會開啟一個執行緒(有執行緒池),但最好限制執行緒的數目。

例如:

/**
     * 在後臺執行緒中執行,如果當前執行緒是子執行緒,則會在當前執行緒執行,如果當前執行緒是主執行緒,則會建立一個新的子執行緒來執行
     * @param event
     */
    @Subscribe(threadMode = ThreadMode.BACKGROUND)
    public void  onEventBackgroundThread(MessageEvent event){
        System.out.println("onEventBackgroundThread::"+" "+Thread.currentThread().getName());
    }

    /**
     * 建立一個非同步執行緒來執行
     * @param event
     */
    @Subscribe(threadMode = ThreadMode.ASYNC)
    public void onEventAsync(MessageEvent event){
        System.out.println("onEventAsync::"+" "+Thread.currentThread().getName());
    }

    /**
     * 在主執行緒中執行
     * @param event
     */
    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onEventMain(MessageEvent event){
        System.out.println("onEventMain::"+" "+Thread.currentThread().getName());
    }

    /**
     *預設的執行緒模式,在當前執行緒下執行。如果當前執行緒是子執行緒則在子執行緒中,當前執行緒是主執行緒,則在主執行緒中執行。
     * @param event
     */
    @Subscribe(threadMode = ThreadMode.POSTING)
    public void onEventPosting(MessageEvent event){
        System.out.println("onEventPosting::"+" "+Thread.currentThread().getName());
    }

釋出者是在主執行緒還是子執行緒,釋出的訊息在所有定義好的實體型別訂閱者中都可以接收到訊息。也就是,可以實現一個訂閱者訂閱多個事件,和一個事件對應多個訂閱者。

EventBus.getDefault().post(new MessageEvent("hello"));

RxJava

RxJava 在 GitHub 主頁上的自我介紹是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”,翻譯成中文是,一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫,也就是一個非同步事件庫。
RxJava 的優勢即是簡潔,但它的簡潔的與眾不同之處在於,隨著程式邏輯變得越來越複雜,它依然能夠保持簡潔。

使用

使用RxJava之前需要先新增相關的依賴:

compile 'io.reactivex.rxjava2:rxjava:2.1.8'
compile 'io.reactivex.rxjava2:rxandroid:2.1.8'

使用RxJava之前,有以下幾個概念需要注意:

  • Observeable(被觀察者)/Observer(觀察者)
  • Flowable(被觀察者)/Subscriber(觀察者)
//被觀察者在主執行緒中,每1ms傳送一個事件
Observable.interval(1, TimeUnit.MILLISECONDS)
                //.subscribeOn(Schedulers.newThread())
                //將觀察者的工作放在新執行緒環境中
                .observeOn(Schedulers.newThread())
                //觀察者處理每1000ms才處理一個事件
                .subscribe(new Action1() {
                      @Override
                      public void call(Long aLong) {
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  });

2.x中提供了以上兩種觀察者與被觀察者的關係。
Observeable用於訂閱Observer,是不支援背壓的,而Flowable用於訂閱Subscriber,是支援背壓(Backpressure)的。
背壓的概念是:指在非同步場景中,被觀察者傳送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低傳送速度的策略。

Flowable<String> t = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                e.onNext("hello Rx2.0.");
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);

一般而言,上游的被觀察者會響應下游觀察者的資料請求,下游呼叫request(n)來告訴上游傳送多少個數據。這樣避免了大量資料堆積在呼叫鏈上,使記憶體一直處於較低水平。

我們需要呼叫request去請求資源,引數就是要請求的數量,一般如果不限制請求數量,可以寫成Long.MAX_VALUE。如果你不呼叫request,Subscriber的onNext和onComplete方法將不會被呼叫。

Subscriber subscriber = new Subscriber() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("onSubscribe()");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Object o) {
                System.out.println(""+o.toString());
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        };

除了上面這兩種觀察者,還有一類觀察者:

  • Single/SingleObserver: 返回泛型資料的結果給觀察者
  • Completable/CompletableObserver:返回完成的結果
  • Maybe/MaybeObserver : 前兩者的複合體

Rxjava內建 Scheduler

  • Schedulers.immediate() :預設的,直接在當前執行緒執行;
  • Schedulers.newThread() :啟用新執行緒,在新執行緒工作;
  • Schedulers.io():I/O操作(讀寫檔案,讀寫資料庫,網路資訊互動等)、和newThread()最大的區別是:io()內部實現是一個無數量上限的執行緒池,可以重用空閒的執行緒;
  • Schedulers.computation():計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在
    computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
  • AndroidSchedulers.mainThread(): Android專用的,指定的操作在Android的主執行緒執行。

subscribeOn()和observerOn()

  • subscribeOn() 指定subscribe() 所發生的執行緒,即 Observable.OnSubcribe被啟用時所處的執行緒,或者叫事件產生的執行緒。
  • observerOn() 指定Subscriber 執行所在的執行緒,或者叫做事件消費的執行緒。