Rxjava學習筆記
基礎
RxJava最核心的東西就是Observable和Observer。Observable會發出資料,而與之相對的Observer則會通過訂閱Observable來進行觀察。
Observer可以在Observable發出資料、報錯或者宣告沒有資料可以傳送時進行相應的操作。這三個操作被封裝在Observer介面中,相應的方法為onNext(),onError()和onCompleted()。
Observable<List<String>> listObservable = Observable.just(getColorList());
listObservable.subscribe(new Observer<List<String>>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
@Override
public void onNext(List<String> colors) {
mSimpleStringAdapter.setStrings(colors);
}
});
注意:只有當subscribe()訂閱了Observable
多使用Observable.fromCallable
使用Observable.fromCallable的好處
- 獲取傳送資料的程式碼只會在有Observer訂閱之後執行
- 獲取資料的程式碼可以在子執行緒中執行.
每當Observer訂閱Observable時就會生成一個Subscription物件。一個Subscription代表了一個Observer與Observable之間的連線。使用它可以在onDestory方法中解除訂閱.
使用Single
Single是Observable的一個精簡版,它的回撥方法是onSuccess/onError,它使用的是SingleSubscriber來訂閱.
Single<List<String>> tvShowSingle = Single.fromCallable(new Callable<List<String>>() {
@Override
public List<String> call() throws Exception {
mRestClient.getFavoriteTvShows();
}
});
然後訂閱一下:
mTvShowSubscription = tvShowSingle
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleSubscriber<List<String>>() {
@Override
public void onSuccess(List<String> tvShows) {
displayTvShows(tvShows);
}
@Override
public void onError(Throwable error) {
displayErrorMessage();
}
});
Subjects
Subject這個物件既是Observable又是Observer,我會把Subject想象成一個管道:從一端把資料注入,結果就會從另一端輸出。
mCounterEmitter = PublishSubject.create();
mCounterEmitter.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
@Override
public void onNext(Integer integer) {
mCounterDisplay.setText(String.valueOf(integer));
}
});
mIncrementButton.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
mCounter++;
mCounterEmitter.onNext(mCounter);
}
});
如上,它既可以subcribe()從管道輸出 又可以 onNext()向管道輸入
注意:如果一個Observer訂閱了(subscribe)Observable,那麼這個Observable就不能再被其他的Observer訂閱. 但是PublishSubject就不存在這種情況.
deboundce與throttleFirst的區別
- debounce(400, TimeUnit.MILLISECONDS) 當沒有資料傳入達到400ms之後,才去傳送資料
- throttleFirst(400, TimeUnit.MILLISECONDS) 在每一個400ms內,如果有資料傳入就傳送.且每個400ms內只發送一次或零次資料.
take(1) 與 first
這兩個都代表只發送資料列表的第一個資料. 但是當列表是空的時候如: Observable.just(“”).isEmpty(),這時候take(1)不會crash,而first不會.
subscribeOn 與 observeOn
- observeOn 指定的是訂閱者所在的執行緒
- subscribeOn 指定的是被訂閱者所在的執行緒, 但是它可以切換多次.
如下所示:
Observable.just("abs").subscribeOn(Schedulers.io())
//執行在io執行緒
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + "aaaa";
}
}).subscribeOn(Schedulers.newThread())
//執行在newThread
.flatMap(new Func1<String, Observable<MovieEntity>>() {
@Override
public Observable<MovieEntity> call(String o) {
MovieEntity movieEntity = new MovieEntity();
movieEntity.name = o;
return Observable.just(movieEntity);
}
}).observeOn(AndroidSchedulers.mainThread())
//執行在activity mainThread
.subscribe(new Observer<MovieEntity>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(MovieEntity movieEntity) {
}
});