What subscribeOn and observeOn actually do in RxJava 1.x
Let's start with an example:
Observable
    .create<String> {
        Timber.i("create: ${Thread.currentThread().name}")
        it.onNext("create")
    }
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        object : Action1<Any?> {
            override fun call(t: Any?) {
                Timber.i(" subscriber ${Thread.currentThread().name} onNext ${t?.toString() ?: "null"}")
            }
        },
        object : Action1<Throwable> {
            override fun call(t: Throwable?) {
                Timber.i(" subscriber ${Thread.currentThread().name} onError ${t?.message ?: " null"}")
            }
        }
    )
The output is:
create: RxNewThreadScheduler-2
subscriber main onNext create
Clearly the body passed to create ran on a worker thread, while the subscriber received the item on the main thread. Let's trace how that happens.
Observable.create:
public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(f);
}

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}
First, create simply constructs a new Observable (call it Observable1) and stores the OnSubscribe f (call it OnSubscribe1) in Observable1's onSubscribe field. Next, let's look at subscribeOn:
public final Observable<T> subscribeOn(Scheduler scheduler) {
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}
subscribeOn calls create again, so it constructs another Observable (call it Observable2) and stores an OperatorSubscribeOn (call it OnSubscribe2) in Observable2's onSubscribe field. Note that Observable1 and the Scheduler are passed to the OperatorSubscribeOn constructor. Calling Observable.subscribe ultimately invokes OnSubscribe.call, so subscribing to Observable2 runs the call method of its OnSubscribe, which is this OperatorSubscribeOn instance. Here is OperatorSubscribeOn.call:
public void call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();
    subscriber.add(inner);
    inner.schedule(new Action0() {
        @Override
        public void call() {
            final Thread t = Thread.currentThread();
            Subscriber<T> s = new Subscriber<T>(subscriber) {
                @Override
                public void onNext(T t) {
                    subscriber.onNext(t);
                }
                @Override
                public void onError(Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        inner.unsubscribe();
                    }
                }
                @Override
                public void onCompleted() {
                    try {
                        subscriber.onCompleted();
                    } finally {
                        inner.unsubscribe();
                    }
                }
                @Override
                public void setProducer(final Producer p) {
                    subscriber.setProducer(new Producer() {
                        @Override
                        public void request(final long n) {
                            if (t == Thread.currentThread()) {
                                p.request(n);
                            } else {
                                inner.schedule(new Action0() {
                                    @Override
                                    public void call() {
                                        p.request(n);
                                    }
                                });
                            }
                        }
                    });
                }
            };
            source.unsafeSubscribe(s);
        }
    });
}
In our example the scheduler is Schedulers.newThread(); following the source, it turns out to be an instance of NewThreadScheduler. So consider the line final Worker inner = scheduler.createWorker();
NewThreadScheduler:
public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}
So inner is a NewThreadWorker instance. Next comes the call inner.schedule(Action0):
NewThreadWorker:
public NewThreadWorker(ThreadFactory threadFactory) {
    ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
    // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
    boolean cancelSupported = tryEnableCancelPolicy(exec);
    if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
        registerExecutor((ScheduledThreadPoolExecutor)exec);
    }
    executor = exec;
}

public Subscription schedule(final Action0 action) {
    return schedule(action, 0, null);
}

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit(run);
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);
    return run;
}
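The NewThreadWorker constructor creates a scheduled thread pool with a single core thread and assigns it to the executor field. schedule(action) forwards to the three-argument schedule overload (not shown here), which ends up in scheduleActual; there the action is wrapped in a ScheduledAction and submitted to executor. ScheduledAction implements Runnable: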
public void run() {
    try {
        lazySet(Thread.currentThread());
        action.call();
    } catch (OnErrorNotImplementedException e) {
        signalError(new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e));
    } catch (Throwable e) {
        signalError(new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e));
    } finally {
        unsubscribe();
    }
}
In other words, run invokes the Action0's call method, so source.unsafeSubscribe(s) executes on that pool's thread. This is where the thread switch happens. Here source is Observable1:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        // new Subscriber so onStart it
        subscriber.onStart();
        // allow the hook to intercept and/or decorate
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(RxJavaHooks.onObservableError(e));
        } catch (Throwable e2) {
            Exceptions.throwIfFatal(e2);
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            RxJavaHooks.onObservableError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r; // NOPMD
        }
        return Subscriptions.unsubscribed();
    }
}
unsafeSubscribe simply invokes call on the Observable's own onSubscribe, which here is OnSubscribe1.call. In other words, subscribeOn changes the thread on which the upstream Observable's OnSubscribe.call runs.
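The wrapping described above can be modelled without RxJava at all. The following is a minimal sketch in plain Java, not the library's implementation: MiniObservable, MiniOnSubscribe, MiniSubscriber and the thread name "sketch-worker" are hypothetical names made up for illustration, and a single-thread ExecutorService stands in for the Worker. It leaves out unsubscription, backpressure and error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified stand-ins for the RxJava 1.x types discussed above; not the real classes.
class SubscribeOnSketch {
    interface MiniSubscriber<T> { void onNext(T value); }
    interface MiniOnSubscribe<T> { void call(MiniSubscriber<T> subscriber); }

    static final class MiniObservable<T> {
        final MiniOnSubscribe<T> onSubscribe;

        MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // Like subscribeOn: returns a NEW observable whose OnSubscribe wraps this one.
        MiniObservable<T> subscribeOn(ExecutorService worker) {
            MiniObservable<T> source = this;
            return new MiniObservable<T>(subscriber ->
                    // Only the upstream call() is moved onto the worker thread.
                    worker.execute(() -> source.onSubscribe.call(subscriber)));
        }

        void subscribe(MiniSubscriber<T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    // Returns [thread that ran the source's call(), thread that received onNext].
    static List<String> run() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        List<String> threads = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        MiniObservable<String> observable1 = new MiniObservable<>(s -> {
            threads.add(Thread.currentThread().getName());
            s.onNext("create");
        });
        observable1.subscribeOn(worker).subscribe(value -> {
            threads.add(Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return threads;
    }
}
```

Both recorded names are "sketch-worker": in this model, moving the upstream call also moves everything it emits, because nothing switches threads again on the way down.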
Next, let's look at observeOn. It ends up calling:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
Once again create is called, giving us Observable3 with OnSubscribe3 (an OnSubscribeLift). The OnSubscribeLift constructor receives Observable2's OnSubscribe2 and the OperatorObserveOn. Subscribing to Observable3 therefore runs OnSubscribe3's (OnSubscribeLift's) call method:
public void call(Subscriber<? super R> o) {
    Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
    st.onStart();
    parent.call(st);
}
It first invokes the operator's (OperatorObserveOn's) call method to obtain a new Subscriber, then passes that Subscriber to OnSubscribe2.call (parent here is OnSubscribe2).
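The order of those two steps is easier to see in a stripped-down model. This is a hedged sketch in plain Java rather than RxJava code: MiniOnSubscribe, MiniOperator, MiniSubscriber and the doubling operator are hypothetical, and the hooks and onStart call are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for OnSubscribe, Operator and OnSubscribeLift; not the real RxJava classes.
class LiftSketch {
    interface MiniSubscriber<T> { void onNext(T value); }
    interface MiniOnSubscribe<T> { void call(MiniSubscriber<T> subscriber); }
    // An operator maps the downstream subscriber to the subscriber handed upstream.
    interface MiniOperator<R, T> { MiniSubscriber<T> call(MiniSubscriber<R> downstream); }

    // Same shape as the lifted call(): wrap the subscriber first, then subscribe the parent.
    static <T, R> MiniOnSubscribe<R> lift(MiniOnSubscribe<T> parent, MiniOperator<R, T> operator) {
        return downstream -> {
            MiniSubscriber<T> st = operator.call(downstream);
            parent.call(st);
        };
    }

    // Records the order in which the pieces run.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniOnSubscribe<Integer> source = s -> {
            log.add("source.call");
            s.onNext(21);
        };
        MiniOperator<String, Integer> doubler = downstream -> {
            log.add("operator.call");
            return value -> downstream.onNext("value=" + (value * 2));
        };
        lift(source, doubler).call(value -> log.add("downstream got " + value));
        return log;
    }
}
```

run() returns [operator.call, source.call, downstream got value=42]: the operator builds its wrapping subscriber before the source is ever invoked, and every item then passes through that wrapper.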
OperatorObserveOn:
public Subscriber<? super T> call(Subscriber<? super T> child) {
    ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
    parent.init();
    return parent;
}
ObserveOnSubscriber has one notable trait: onNext, onCompleted and onError all end by calling schedule:
protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this);
    }
}
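The counter check means a burst of signals schedules only one task, and later signals just bump the counter. The sketch below shows that gating pattern in plain Java. It is an assumption-laden illustration, not the ObserveOnSubscriber source: the queue, the drain loop and the scheduledTasks list (standing in for recursiveScheduler) are hypothetical, and the demo runs on a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of "schedule a drain only when the counter goes from 0 to 1".
class DrainGateSketch {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong counter = new AtomicLong();
    final List<String> delivered = new ArrayList<>();
    // Stands in for recursiveScheduler: tasks are collected here instead of being run on a thread.
    final List<Runnable> scheduledTasks = new ArrayList<>();

    void onNext(String value) {
        queue.offer(value);
        schedule();
    }

    void schedule() {
        // Only the caller that moves the counter from 0 to 1 schedules a drain task.
        if (counter.getAndIncrement() == 0) {
            scheduledTasks.add(this::drain);
        }
    }

    void drain() {
        long missed = 1;
        for (;;) {
            String value;
            while ((value = queue.poll()) != null) {
                delivered.add(value);
            }
            // Subtract the signals handled so far; a non-zero result means more arrived meanwhile.
            missed = counter.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    // Returns "<tasks after burst> <delivered items> <tasks after a later item>".
    static String run() {
        DrainGateSketch s = new DrainGateSketch();
        s.onNext("a");
        s.onNext("b");
        s.onNext("c");
        int tasksAfterBurst = s.scheduledTasks.size();
        s.scheduledTasks.get(0).run();
        s.onNext("d");
        return tasksAfterBurst + " " + s.delivered + " " + s.scheduledTasks.size();
    }
}
```

run() returns "1 [a, b, c] 2": three items produce one scheduled task, that task drains all three, and only once the counter is back at zero does a new item schedule a second task.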
recursiveScheduler is assigned in the ObserveOnSubscriber constructor from scheduler.createWorker(). The scheduler comes from AndroidSchedulers.mainThread(); for our purposes it is enough to know that it is a LooperScheduler built from Looper.getMainLooper():
public Worker createWorker() {
    return new HandlerWorker(handler);
}

public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (unsubscribed) {
        return Subscriptions.unsubscribed();
    }
    action = hook.onSchedule(action);
    ScheduledAction scheduledAction = new ScheduledAction(action, handler);
    Message message = Message.obtain(handler, scheduledAction);
    message.obj = this; // Used as token for unsubscription operation.
    handler.sendMessageDelayed(message, unit.toMillis(delayTime));
    if (unsubscribed) {
        handler.removeCallbacks(scheduledAction);
        return Subscriptions.unsubscribed();
    }
    return scheduledAction;
}
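ScheduledAction implements Runnable; the Handler is what performs the switch to the main thread.
To summarize what observeOn does: it wraps the downstream Subscriber in an ObserveOnSubscriber; each of ObserveOnSubscriber's callbacks ends up scheduling a ScheduledAction, which is posted through the Handler to the main thread; and the upstream OnSubscribe2 (from Observable2) is handed the ObserveOnSubscriber as its subscriber.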
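That summary can be sketched in plain Java. This is a rough model under stated assumptions, not RxAndroid code: MiniSubscriber is hypothetical, a single-thread executor named "fake-main" stands in for the main-thread Handler, and "sketch-worker" plays the subscribeOn thread. The queue and counter used by the real ObserveOnSubscriber are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: the downstream subscriber is wrapped so each signal hops to another thread.
class ObserveOnSketch {
    interface MiniSubscriber<T> { void onNext(T value); }

    // In the spirit of ObserveOnSubscriber: re-dispatch every onNext on the target executor.
    static <T> MiniSubscriber<T> observeOn(MiniSubscriber<T> child, Executor target) {
        return value -> target.execute(() -> child.onNext(value));
    }

    // Returns the thread that emitted, then the thread that received.
    static List<String> run() {
        ExecutorService fakeMain =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        MiniSubscriber<String> child = value -> {
            log.add("onNext on " + Thread.currentThread().getName());
            done.countDown();
        };
        MiniSubscriber<String> wrapped = observeOn(child, fakeMain);

        // The upstream emits on the worker thread and only ever sees the wrapper.
        worker.execute(() -> {
            log.add("emit on " + Thread.currentThread().getName());
            wrapped.onNext("create");
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            fakeMain.shutdown();
        }
        return new ArrayList<>(log);
    }
}
```

run() returns [emit on sketch-worker, onNext on fake-main], which mirrors the log output of the opening example: emission happens on the subscribeOn thread and delivery happens on the observeOn thread.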