RxJava2:observeOn和subscribeOn的使用
阿新 • • 發佈:2019-01-02
RxJava的好處大家都知道,能使程式碼邏輯結構看起來更清晰,當需要進行前後臺處理的時候,一般會進行observeOn和subscribeOn的呼叫,然而這2個方法的呼叫沒有那麼簡單:
observeOn:設定Observer觀察者在什麼執行緒執行;
subscribeOn:設定Observable被觀察者在什麼執行緒執行;
以上是最基本的使用,但是在使用的時候,呼叫的順序和次數都會有影響:
subscribeOn: subscribeOn 作用於該操作符之前的 Observable 的建立操符作以及 doOnSubscribe 操作符
observeOn:
上例項:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log .d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
//該類只接收next發出的事件
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(10000);
Log.d(TAG, "a thread is: " + Thread.currentThread().getName());
Log.d(TAG, "doOnNext a: " + integer);
}
})
//
.flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {
Thread.sleep(10000);
Log.d(TAG, "c thread is: " + Thread.currentThread().getName());
Log.d(TAG, "doOnNext c: " + integer);
return Observable.just(integer);
}
})
.subscribeOn(Schedulers.newThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(10000);
Log.d(TAG, "b thread is : " + Thread.currentThread().getName());
Log.d(TAG, "doOnNext b: " + integer);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
如果把observeOn的方法放到前面,可以看看各個方法所在的執行緒
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
//該類只接收next發出的事件
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(10000);
Log.d(TAG, "a thread is: " + Thread.currentThread().getName());
Log.d(TAG, "doOnNext a: " + integer);
}
})
//
.flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {
Thread.sleep(10000);
Log.d(TAG, "c thread is: " + Thread.currentThread().getName());
Log.d(TAG, "doOnNext c: " + integer);
return Observable.just(integer);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(10000);
Log.d(TAG, "b thread is : " + Thread.currentThread().getName());
Log.d(TAG, "doOnNext b: " + integer);
}
})
.subscribe(consumer);