1. 程式人生 > >Rxjava學習筆記

Rxjava學習筆記

基礎

RxJava最核心的東西就是Observable和Observer。Observable會發出資料,而與之相對的Observer則會通過訂閱Observable來進行觀察。

Observer可以在Observable發出資料、報錯或者宣告沒有資料可以傳送時進行相應的操作。這三個操作被封裝在Observer介面中,相應的方法為onNext(),onError()和onCompleted()。

Observable<List<String>> listObservable = Observable.just(getColorList());
listObservable.subscribe(new
Observer<List<String>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(List<String> colors) { mSimpleStringAdapter.setStrings(colors); } });

注意:只有當subscribe()訂閱了Observable

之後才會執行onNext()方法;如果不再有資料可以傳送(我們在Observable.just()中只讓Observable傳送一個數據),onComplete()方法會被呼叫。

多使用Observable.fromCallable

使用Observable.fromCallable的好處
- 獲取傳送資料的程式碼只會在有Observer訂閱之後執行
- 獲取資料的程式碼可以在子執行緒中執行.

每當Observer訂閱Observable時就會生成一個Subscription物件。一個Subscription代表了一個Observer與Observable之間的連線。使用它可以在onDestory方法中解除訂閱.

使用Single

Single是Observable的一個精簡版,它的回撥方法是onSuccess/onError,它使用的是SingleSubscriber來訂閱.

Single<List<String>> tvShowSingle = Single.fromCallable(new Callable<List<String>>() { 
    @Override
    public List<String> call() throws Exception {
        mRestClient.getFavoriteTvShows(); 
    }
});

然後訂閱一下:

mTvShowSubscription = tvShowSingle
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new SingleSubscriber<List<String>>() {

        @Override 
        public void onSuccess(List<String> tvShows) {
            displayTvShows(tvShows); 
        }

        @Override 
        public void onError(Throwable error) {
            displayErrorMessage(); 
        } 
    });

Subjects

Subject這個物件既是Observable又是Observer,我會把Subject想象成一個管道:從一端把資料注入,結果就會從另一端輸出。

mCounterEmitter = PublishSubject.create(); 
mCounterEmitter.subscribe(new Observer<Integer>() {

    @Override
    public void onCompleted() { } 

    @Override
    public void onError(Throwable e) { } 

    @Override
    public void onNext(Integer integer) { 
        mCounterDisplay.setText(String.valueOf(integer));
    } 
});

mIncrementButton.setOnClickListener(new View.OnClickListener() {

    @Override 
    public void onClick(View v) { 
        mCounter++;
        mCounterEmitter.onNext(mCounter);
    }
});

如上,它既可以subcribe()從管道輸出 又可以 onNext()向管道輸入
注意:如果一個Observer訂閱了(subscribe)Observable,那麼這個Observable就不能再被其他的Observer訂閱. 但是PublishSubject就不存在這種情況.

deboundce與throttleFirst的區別

  • debounce(400, TimeUnit.MILLISECONDS) 當沒有資料傳入達到400ms之後,才去傳送資料
  • throttleFirst(400, TimeUnit.MILLISECONDS) 在每一個400ms內,如果有資料傳入就傳送.且每個400ms內只發送一次或零次資料.

take(1) 與 first

這兩個都代表只發送資料列表的第一個資料. 但是當列表是空的時候如: Observable.just(“”).isEmpty(),這時候take(1)不會crash,而first不會.

subscribeOn 與 observeOn

  • observeOn 指定的是訂閱者所在的執行緒
  • subscribeOn 指定的是被訂閱者所在的執行緒, 但是它可以切換多次.
    如下所示:
 Observable.just("abs").subscribeOn(Schedulers.io())
                //執行在io執行緒
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {

                        return s + "aaaa";
                    }
                }).subscribeOn(Schedulers.newThread())
                //執行在newThread
                .flatMap(new Func1<String, Observable<MovieEntity>>() {
                    @Override
                    public Observable<MovieEntity> call(String o) {
                        MovieEntity movieEntity = new MovieEntity();
                        movieEntity.name = o;
                        return Observable.just(movieEntity);
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                //執行在activity mainThread
                .subscribe(new Observer<MovieEntity>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(MovieEntity movieEntity) {

                    }
                });