1. 程式人生 > >[RxJava學習]observeOn原始碼分析

[RxJava學習]observeOn原始碼分析

  上篇文章分析了subscribeOn的原始碼邏輯,它的實質就是把上游的Observable.onSubscribe.call(subscriber1)放到了指定的Scheduler執行緒中執行;本文要分析的observeOn,實質則是在中間Subscriber的onNext(T value)、onCompleted()、onError()中把工作排程到指定Scheduler的執行緒上,再在該執行緒裡將資料和通知發射給下游的Subscriber。

  還是老辦法,將程式碼層層替換:

observable1.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber1);
就等價於:
scheduler = AndroidSchedulers.mainThread();
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
observable2 = new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe1, operator));
observable2.subscribe(subscriber1);
簡單替換下:
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
observable2.subscribe(subscriber1);	
替換subscribe(),得到:
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);

subscriber1.onStart();
(observable2.onSubscribe).call(subscriber1);
再次簡單替換,得到:
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
subscriber1.onStart();

onSubscribe2.call(subscriber1);
替換最後一句,得到:
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
subscriber1.onStart();

Subscriber<? super T> st = hook.onLift(operator).call(subscriber1);
st.onStart();
onSubscribe1.call(st);
替換operator.call得到:
operator = new OperatorObserveOn<T>(scheduler, delayError, bufferSize);
onSubscribe2 = new OnSubscribeLift<T, R>(onSubscribe1, operator);
subscriber1.onStart();

Subscriber<? super T> st = 
	{
		ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, subscriber1, delayError, bufferSize);
		parent.init();
		return parent;
	}

st.onStart();
onSubscribe1.call(st);	
至此,可以看到,當上遊的資料來源傳送資料時,會先發給中間偵聽者ObserveOnSubscriber<T> st;檢視ObserveOnSubscriber的程式碼可以看到,st的onNext(T value)、onCompleted()、onError()方法最終都呼叫的是schedule(),在這個方法裡,把自身作為一個Action0交給Worker;Worker把它包裝成一個Runnable物件(即ScheduledAction),再通過Handler的sendMessageDelayed傳送出去。當系統執行到這個訊息時,就會呼叫ScheduledAction的run方法,run方法裡的"action.call();"中的"action"就是物件ObserveOnSubscriber<T> st;檢視ObserveOnSubscriber的call方法,可以看到最終呼叫的是"localChild.onNext(localOn.getValue(v));",這裡的"localChild"即subscriber1物件。

相關程式碼附錄如下:

// Observable
    /**
     * Convenience overload: delegates to {@code observeOn(scheduler, bufferSize)}
     * using {@code RxRingBuffer.SIZE} as the buffer size.
     *
     * @param scheduler the scheduler passed through to the two-argument overload
     * @return the Observable returned by the two-argument overload
     */
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, RxRingBuffer.SIZE);
    }
	其中RxRingBuffer.SIZE數值可設定(系統屬性rx.ring-buffer.size),Android平臺預設是16,其他平臺預設是128。
	
	    /**
	     * Convenience overload: delegates to the three-argument {@code observeOn}
	     * with {@code delayError} fixed to {@code false}.
	     *
	     * @param scheduler  the scheduler passed through to the three-argument overload
	     * @param bufferSize the buffer size passed through to the three-argument overload
	     * @return the Observable returned by the three-argument overload
	     */
	    public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
        return observeOn(scheduler, false, bufferSize);
    }
	
	    /**
	     * Builds the observeOn stage for this Observable.
	     *
	     * <p>When this instance is a {@code ScalarSynchronousObservable} the call is
	     * short-circuited to {@code scalarScheduleOn(scheduler)}; in that branch
	     * {@code delayError} and {@code bufferSize} are not used. Otherwise a new
	     * {@code OperatorObserveOn} is created from the three arguments and applied
	     * through {@code lift}.
	     *
	     * @param scheduler  the scheduler handed to the operator (or to scalarScheduleOn)
	     * @param delayError forwarded to {@code OperatorObserveOn}
	     * @param bufferSize forwarded to {@code OperatorObserveOn}
	     * @return the resulting Observable
	     */
	    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }
	
	    /**
	     * Wraps this Observable's {@code onSubscribe} and the given operator in an
	     * {@code OnSubscribeLift} and returns a new Observable built on top of it.
	     *
	     * @param operator the operator to pair with this Observable's onSubscribe
	     * @param <R>      the element type of the returned Observable
	     * @return a new Observable backed by the OnSubscribeLift
	     */
	    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

// OnSubscribeLift	
	    /**
	     * Stores the upstream OnSubscribe and the operator for later use by {@code call}.
	     *
	     * @param parent   the upstream OnSubscribe, saved in {@code this.parent}
	     * @param operator the operator, saved in {@code this.operator}
	     */
	    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    /**
     * Subscribes the downstream subscriber {@code o} through the operator.
     *
     * <p>The operator (after passing through {@code hook.onLift}) converts {@code o}
     * into the subscriber {@code st}; {@code st.onStart()} is invoked and then the
     * upstream {@code parent} is called with {@code st}. Non-fatal errors raised
     * while starting/subscribing are delivered to {@code st.onError}; non-fatal
     * errors raised while creating {@code st} are delivered to {@code o.onError}.
     *
     * @param o the downstream subscriber
     */
    @Override
    public void call(Subscriber<? super R> o) {
        try {
            // Ask the operator for the subscriber that will sit in front of 'o'.
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }

// OperatorObserveOn
    /**
     * Produces the subscriber that is placed in front of {@code child}.
     *
     * <p>For an {@code ImmediateScheduler} or a {@code TrampolineScheduler} the
     * child itself is returned unchanged. For any other scheduler a new
     * {@code ObserveOnSubscriber} is created around the child, initialised via
     * {@code init()} and returned.
     *
     * @param child the downstream subscriber
     * @return {@code child} itself, or an initialised ObserveOnSubscriber wrapping it
     */
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        // avoid overhead, execute directly
        final boolean passThrough = scheduler instanceof ImmediateScheduler
                || scheduler instanceof TrampolineScheduler;
        if (passThrough) {
            return child;
        }

        final ObserveOnSubscriber<T> observeOnSubscriber =
                new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
        observeOnSubscriber.init();
        return observeOnSubscriber;
    }

// ObserveOnSubscriber
// static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0
	/**
	 * Stores the downstream subscriber and creates a worker from the scheduler.
	 *
	 * <p>NOTE(review): this looks like an abridged excerpt — {@code delayError} and
	 * {@code bufferSize} are accepted but not used in the lines shown here, and
	 * fields used by other methods (e.g. {@code queue}, {@code on}) are not
	 * initialised in this fragment. Confirm against the full source.
	 *
	 * @param scheduler  the scheduler whose {@code createWorker()} result is kept in
	 *                   {@code recursiveScheduler}
	 * @param child      the downstream subscriber, kept in {@code this.child}
	 * @param delayError not used in this excerpt
	 * @param bufferSize not used in this excerpt
	 */
	public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
		this.child = child;
		this.recursiveScheduler = scheduler.createWorker();
	}
		
	/**
	 * Wires this subscriber to the downstream child: installs a Producer on the
	 * child and registers both the worker and this subscriber with the child via
	 * {@code add(...)}.
	 */
	void init() {
		// don't want this code in the constructor because `this` can escape through the 
		// setProducer call
		Subscriber<? super T> localChild = child;
		
		localChild.setProducer(new Producer() {

			@Override
			public void request(long n) {
				// Only positive requests are recorded; each one adds n to 'requested'
				// and then calls schedule().
				if (n > 0L) {
					BackpressureUtils.getAndAddRequest(requested, n);
					schedule();
				}
			}

		});
		// Register the worker and this subscriber with the child
		// (presumably so they are unsubscribed together with it — confirm).
		localChild.add(recursiveScheduler);
		localChild.add(this);
	}

	/**
	 * Increments {@code counter} and, only when its previous value was zero,
	 * submits this subscriber to {@code recursiveScheduler}.
	 */
	protected void schedule() {
		final boolean alreadyPending = counter.getAndIncrement() != 0;
		if (alreadyPending) {
			return;
		}
		recursiveScheduler.schedule(this);
	}
		
	/**
	 * Queues the value and calls {@code schedule()}.
	 *
	 * <p>Nothing happens when this subscriber is unsubscribed or already finished.
	 * If the queue rejects the value, a {@code MissingBackpressureException} is
	 * passed to {@code onError} instead.
	 *
	 * @param t the value received from upstream
	 */
	@Override
	public void onNext(final T t) {
		if (isUnsubscribed() || finished) {
			return;
		}
		final boolean accepted = queue.offer(on.next(t));
		if (accepted) {
			schedule();
		} else {
			onError(new MissingBackpressureException());
		}
	}
	
	/**
	 * Marks this subscriber as finished and calls {@code schedule()}, unless it is
	 * unsubscribed or already finished, in which case the call is ignored.
	 */
	@Override
	public void onCompleted() {
		if (!isUnsubscribed() && !finished) {
			finished = true;
			schedule();
		}
	}

	/**
	 * Records the error, marks this subscriber as finished and calls
	 * {@code schedule()}.
	 *
	 * <p>If this subscriber is unsubscribed or already finished, the error is only
	 * handed to the {@code RxJavaPlugins} error handler and nothing else happens.
	 *
	 * @param e the error received from upstream
	 */
	@Override
	public void onError(final Throwable e) {
		if (isUnsubscribed() || finished) {
			// Too late to deliver downstream: report to the plugin error handler only.
			RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
			return;
		}
		// 'error' is written before 'finished' is set.
		error = e;
		finished = true;
		schedule();
	}

	/**
	 * Action invoked by the scheduler; forwards values to the child subscriber via
	 * {@code localChild.onNext(localOn.getValue(v))}.
	 *
	 * <p>NOTE(review): this is an abridged excerpt, not compilable as shown —
	 * {@code localChild}, {@code localOn}, {@code v} and {@code currentEmission}
	 * are not declared in this fragment, and neither loop has an exit condition
	 * in the lines shown. The omitted code presumably polls the queue, advances
	 * the emission count and handles termination; consult the full source.
	 */
	public void call() {
		for (;;) {
			// Snapshot of the downstream demand for this pass.
			long requestAmount = requested.get();
			
			while (requestAmount != currentEmission) {
				
				// Deliver the value 'v' to the downstream subscriber.
				localChild.onNext(localOn.getValue(v));
			}
		}
	}

// AndroidSchedulers
    /**
     * Returns the {@code mainThreadScheduler} held by the instance obtained from
     * {@code getInstance()}.
     *
     * @return the main-thread Scheduler
     */
    public static Scheduler mainThread() {
        return getInstance().mainThreadScheduler;
    }
    // Excerpt: mainThreadScheduler is a LooperScheduler created from Looper.getMainLooper().
    mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());

// LooperScheduler
    /**
     * Creates a new {@code HandlerWorker} that uses this scheduler's {@code handler}.
     *
     * @return a new HandlerWorker
     */
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

// HandlerWorker
	/**
	 * Posts the action to the handler as a delayed message.
	 *
	 * <p>The action is passed through {@code hook.onSchedule}, wrapped in a
	 * {@code ScheduledAction}, attached to a {@code Message} and sent with
	 * {@code handler.sendMessageDelayed} using the delay converted to milliseconds.
	 *
	 * @param action    the action to run
	 * @param delayTime the delay before the message is delivered
	 * @param unit      the unit of {@code delayTime}
	 * @return the ScheduledAction, or {@code Subscriptions.unsubscribed()} when
	 *         this worker is found unsubscribed before or right after posting
	 */
	public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
		// Worker already unsubscribed: do not post anything.
		if (unsubscribed) {
			return Subscriptions.unsubscribed();
		}

		action = hook.onSchedule(action);

		ScheduledAction scheduledAction = new ScheduledAction(action, handler);

		// The ScheduledAction is the message's callback Runnable.
		Message message = Message.obtain(handler, scheduledAction);
		message.obj = this; // Used as token for unsubscription operation.

		handler.sendMessageDelayed(message, unit.toMillis(delayTime));

		// Re-check after posting: if the worker was unsubscribed in the meantime,
		// remove the just-posted callback and report an unsubscribed Subscription.
		if (unsubscribed) {
			handler.removeCallbacks(scheduledAction);
			return Subscriptions.unsubscribed();
		}

		return scheduledAction;
	}	
	
	/**
	 * Schedules the action with a delay of 0 milliseconds by delegating to
	 * {@code schedule(action, 0, TimeUnit.MILLISECONDS)}.
	 *
	 * @param action the action to run
	 * @return the Subscription returned by the delayed overload
	 */
	@Override
	public Subscription schedule(final Action0 action) {
		return schedule(action, 0, TimeUnit.MILLISECONDS);
	}

// ScheduledAction
//final class ScheduledAction implements Runnable, Subscription

	/**
	 * Runs the wrapped action.
	 *
	 * <p>Any Throwable thrown by the action is wrapped in an
	 * {@code IllegalStateException} (with a message that depends on whether the
	 * cause is an {@code OnErrorNotImplementedException}), reported to the
	 * {@code RxJavaPlugins} error handler and then passed to the current thread's
	 * uncaught-exception handler.
	 */
	@Override public void run() {
		try {
			action.call();
		} catch (Throwable e) {
			// nothing to do but report: this is fatal and there is nowhere else to throw it
			final String message = (e instanceof OnErrorNotImplementedException)
					? "Exception thrown on Scheduler.Worker thread. Add `onError` handling."
					: "Fatal Exception thrown on Scheduler.Worker thread.";
			final IllegalStateException wrapped = new IllegalStateException(message, e);

			RxJavaPlugins.getInstance().getErrorHandler().handleError(wrapped);

			final Thread current = Thread.currentThread();
			current.getUncaughtExceptionHandler().uncaughtException(current, wrapped);
		}
	}