[RxJava學習]observeOn原始碼分析
阿新 • • 發佈:2019-01-11
上篇文章分析了subscribeOn的原始碼邏輯,它的實質就是把上游的Observable.onSubscribe.call(subscriber1)放到了指定的Scheduler執行緒中執行;本文要分析的observeOn,實質則是在Subscriber的onNext(T value)、onComplete()、onError()中另起了一個執行緒,在新執行緒裡將資料和通知發射給下游的Subscriber。
還是老辦法,將程式碼層層替換:
就等價於:observable1.observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber1);
scheduler = AndroidSchedulers.mainThread();
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
observable2 = new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe1, operator));
observable2.subscribe(subscriber1);
簡單替換下:
替換subscribe(),得到:operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize); onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator); observable2.subscribe(subscriber1);
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
subscriber1.onStart();
(observable2.onSubscribe).call(subscriber1);
再次簡單替換,得到:
替換最後一句,得到:operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize); onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator); subscriber1.onStart(); onSubscribe2.call(subscriber1);
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
subscriber1.onStart();
Subscriber<? super T> st = hook.onLift(operator).call(subscriber1);
st.onStart();
onSubscribe1.call(st);
替換operator.call得到:
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
subscriber1.onStart();
Subscriber<? super T> st =
{
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, subscriber1, delayError, bufferSize);
parent.init();
return parent;
}
st.onStart();
onSubscribe1.call(st);
至此,可以看到,當上遊的資料來源傳送資料時,會先發給中間偵聽者ObserveOnSubscriber<T> st;檢視ObserveOnSubscriber的程式碼可以看到,st的onNext(T value)、onComplete()、onError()方法最終都呼叫的是schedule(),在這個方法裡,把自身作為一個Action,通過Handler的方式去postdelay一個Runable物件,這個Runable物件就是ScheduledAction。當系統執行到這個訊息時,就會呼叫ScheduledAction的run方法,run方法裡的“action.call();”這裡的"action"就是物件ObserveOnSubscriber<T>
st;檢視ObserveOnSubscriber的call方法,可以看到最終呼叫的是"localChild.onNext(localOn.getValue(v));"這裡的“localChild”即subscriber1物件。
相關程式碼附錄如下:
// Observable
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
其中RxRingBuffer.SIZE數值可設定,android平臺預設是16.
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
// OnSubscribeLift
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
// OperatorObserveOn
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
// ObserveOnSubscriber
// static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
}
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
return;
}
error = e;
finished = true;
schedule();
}
public void call() {
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
localChild.onNext(localOn.getValue(v));
}
}
}
// AndroidSchedulers
public static Scheduler mainThread() {
return getInstance().mainThreadScheduler;
}
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
// LooperScheduler
public Worker createWorker() {
return new HandlerWorker(handler);
}
// HandlerWorker
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
action = hook.onSchedule(action);
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.
handler.sendMessageDelayed(message, unit.toMillis(delayTime));
if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}
return scheduledAction;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
// ScheduledAction
//final class ScheduledAction implements Runnable, Subscription
@Override public void run() {
try {
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
IllegalStateException ie;
if (e instanceof OnErrorNotImplementedException) {
ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}