Rxjava學習筆記(一)
RxJava 2.0
要在Android中使用RxJava2, 先新增Gradle配置:
compile ‘io.reactivex.rxjava2:rxjava:2.0.1’
compile ‘io.reactivex.rxjava2:rxandroid:2.0.1’
簡單使用姿勢:
//1.建立被觀察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(11);
e.onNext(8);
e.onComplete();
}
});
//2.建立觀察者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.i(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
//3.觀察者 進入 被觀察者的訂閱
observable.subscribe(observer);
鏈式呼叫:
//鏈式呼叫
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(11);
e.onNext(8);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.i(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
}
);
執行結果:
12-29 22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onSubscribe:
22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onNext: 33 12-29
22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onNext: 48 12-29
22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onNext: 265 12-29
22:05:25.654 8416-8416/tyl.rxjavap I/rxjavaTest: onComplete:
執行順序onSubscribe->onNext->onComplete/onError;
- ObservableEmitter: Emitter是發射器的意思,那就很好猜了,這個就是用來發出事件的,它可以發出三種類型的事件,通過呼叫emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發出next事件、complete事件和error事件。
注意點:
- 上游可以傳送無限個onNext, 下游也可以接收無限個onNext.
- 當上遊傳送了一個onComplete後, 上游onComplete之後的事件將會繼續傳送, 而下游收到onComplete事件之後將不再繼續接收事件.
- 當上遊傳送了一個onError後, 上游onError之後的事件將繼續傳送, 而下游收到onError事件之後將不再繼續接收事件.
- 上游可以不傳送onComplete或onError.
- 最關鍵的是onComplete和onError必須唯一併且互斥(onComplete或onError之後下游不會繼續接收事件)
Disposable相當於RxJava1.x中的Subscription,用於解除訂閱(可理解為切斷水管,水依舊還在流,但是流不到下游)。
如果在請求的過程中Activity已經退出了, 這個時候如果回到主執行緒去更新UI, 那麼APP肯定就崩潰了。我可以呼叫Disposable的dispose()方法切斷水管, 使得下游收不到事件, 既然收不到事件, 那麼也就不會再去更新UI了. 因此我們可以在Activity中將這個Disposable 儲存起來, 當Activity退出時, 切斷它即可. 那如果有多個Disposable 該怎麼辦呢? RxJava中已經內建了一個容器CompositeDisposable, 每當我們得到一個Disposable時就呼叫CompositeDisposable.add()將它新增到容器中, 在退出的時候, 呼叫CompositeDisposable.clear() 即可切斷所有的水管.
執行緒控制
基本使用姿勢
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.i(TAG, "subscribe: currentThread is :" + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
}) // 指定上游執行緒
.subscribeOn(Schedulers.newThread())//新執行緒
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept 1: " + " currentThread is :" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())//io執行緒
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept 2: " + " currentThread is :" + Thread.currentThread().getName());
}
})
//指定下游執行緒
.observeOn(AndroidSchedulers.mainThread())//主執行緒
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept 3: " + " currentThread is :" + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept 4: " + " currentThread is :" + Thread.currentThread().getName());
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "Consumer accept: " + " currentThread is :" + Thread.currentThread().getName());
}
}
);
}
控制檯輸出:
subscribe: currentThread is :RxNewThreadScheduler-1
accept 1: currentThread is :RxNewThreadScheduler-1
accept 2: currentThread is :RxNewThreadScheduler-1
accept 3: currentThread is :main accept 4: currentThread is
:RxCachedThreadScheduler-2Consumer accept: currentThread is :RxCachedThreadScheduler-2
- subscribeOn(Scheduler scheduler)只能指定一次
observeOn(Scheduler scheduler)可指定多次,每呼叫一次observeOn() 執行緒便會切換一次
內建執行緒:
- Schedulers.io() 代表io操作的執行緒, 通常用於網路,讀寫檔案等io密集型的操作
- Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
- Schedulers.newThread() 代表一個常規的新執行緒
- AndroidSchedulers.mainThread() 代表Android的主執行緒
Todo: DoOnNext()方法裡做資料比對和快取處理?