1. 程式人生 > >Rxjava學習筆記(一)

Rxjava學習筆記(一)

RxJava 2.0

要在Android中使用RxJava2, 先新增Gradle配置:

compile ‘io.reactivex.rxjava2:rxjava:2.0.1’
compile ‘io.reactivex.rxjava2:rxandroid:2.0.1’

rxjava

簡單使用姿勢:

        //1.建立被觀察者
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onNext(11); e.onNext(8); e.onComplete(); } }); //2.建立觀察者
Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(Integer value) { Log.i(TAG, "onNext: "
+ value); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } }; //3.觀察者 進入 被觀察者的訂閱 observable.subscribe(observer);

鏈式呼叫:

        //鏈式呼叫
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(11);
                e.onNext(8);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
                         @Override
                         public void onSubscribe(Disposable d) {
                             Log.i(TAG, "onSubscribe: ");
                         }

                         @Override
                         public void onNext(Integer value) {
                             Log.i(TAG, "onNext: " + value);
                         }

                         @Override
                         public void onError(Throwable e) {
                             Log.i(TAG, "onError: ");
                         }

                         @Override
                         public void onComplete() {
                             Log.i(TAG, "onComplete: ");
                         }
                     }
        );

執行結果:

12-29 22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onSubscribe:

22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onNext: 33 12-29

22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onNext: 48 12-29

22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onNext: 265 12-29

22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onComplete:

執行順序onSubscribe->onNext->onComplete/onError;

  • ObservableEmitter: Emitter是發射器的意思,那就很好猜了,這個就是用來發出事件的,它可以發出三種類型的事件,通過呼叫emitter的onNext(T value)onComplete()onError(Throwable error)就可以分別發出next事件、complete事件和error事件。

注意點:

  • 上游可以傳送無限個onNext, 下游也可以接收無限個onNext.
    onNext
  • 當上遊傳送了一個onComplete後, 上游onComplete之後的事件將會繼續傳送, 而下游收到onComplete事件之後將不再繼續接收事件.
    傳送onComplete()
  • 當上遊傳送了一個onError後, 上游onError之後的事件將繼續傳送, 而下游收到onError事件之後將不再繼續接收事件.
    傳送onError
  • 上游可以不傳送onComplete或onError.
  • 最關鍵的是onComplete和onError必須唯一併且互斥(onComplete或onError之後下游不會繼續接收事件
  • Disposable相當於RxJava1.x中的Subscription,用於解除訂閱(可理解為切斷水管,水依舊還在流,但是流不到下游)。

    如果在請求的過程中Activity已經退出了, 這個時候如果回到主執行緒去更新UI, 那麼APP肯定就崩潰了。我可以呼叫Disposable的dispose()方法切斷水管, 使得下游收不到事件, 既然收不到事件, 那麼也就不會再去更新UI了. 因此我們可以在Activity中將這個Disposable 儲存起來, 當Activity退出時, 切斷它即可.
    
    那如果有多個Disposable 該怎麼辦呢? RxJava中已經內建了一個容器CompositeDisposable, 每當我們得到一個Disposable時就呼叫CompositeDisposable.add()將它新增到容器中, 在退出的時候, 呼叫CompositeDisposable.clear() 即可切斷所有的水管.
    

常用subscribe方法

執行緒控制

基本使用姿勢

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.i(TAG, "subscribe:  currentThread is :" + Thread.currentThread().getName());
                e.onNext(1);

                e.onComplete();
            }
        })      // 指定上游執行緒
                .subscribeOn(Schedulers.newThread())//新執行緒
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept 1: "  + " currentThread is :" + Thread.currentThread().getName());
                    }
                })
                .subscribeOn(Schedulers.io())//io執行緒
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept 2: " + " currentThread is :" + Thread.currentThread().getName());
                    }
                })
                //指定下游執行緒
                .observeOn(AndroidSchedulers.mainThread())//主執行緒
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept 3: " + " currentThread is :" + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept 4: " +  " currentThread is :" + Thread.currentThread().getName());
                    }
                })
                .subscribe(new Consumer<Integer>() {
                               @Override
                               public void accept(Integer integer) throws Exception {
                                   Log.i(TAG, "Consumer accept: " + " currentThread is :" + Thread.currentThread().getName());
                               }
                           }
                );
    }

控制檯輸出:

subscribe: currentThread is :RxNewThreadScheduler-1

accept 1: currentThread is :RxNewThreadScheduler-1

accept 2: currentThread is :RxNewThreadScheduler-1

accept 3: currentThread is :main accept 4: currentThread is
:RxCachedThreadScheduler-2

Consumer accept: currentThread is :RxCachedThreadScheduler-2

  • subscribeOn(Scheduler scheduler)只能指定一次
  • observeOn(Scheduler scheduler)可指定多次,每呼叫一次observeOn() 執行緒便會切換一次

    內建執行緒:

    • Schedulers.io() 代表io操作的執行緒, 通常用於網路,讀寫檔案等io密集型的操作
    • Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
    • Schedulers.newThread() 代表一個常規的新執行緒
    • AndroidSchedulers.mainThread() 代表Android的主執行緒
Todo: DoOnNext()方法裡做資料比對和快取處理?