1. 程式人生 > >RxJava1.x中的subscribeOn,observeOn到底做了些什麼

RxJava1.x中的subscribeOn,observeOn到底做了些什麼

我們先來舉個例子吧:

Observable
.create<String> {
                Timber.i("create:  ${Thread.currentThread().name}")
                it.onNext("create")
            }
.subscribeOn(
    Schedulers.newThread()
    )
.observeOn(AndroidSchedulers.mainThread()
    )
.subscribe(
    object : Action1<Any?> {
        override fun call
(t: Any?) { Timber.i(" subscriber ${Thread.currentThread().name} onNext ${t?.toString()?: "null"}") } }, object : Action1<Throwable> { override fun call(t: Throwable?) { Timber.i(" subscriber ${Thread.currentThread().name} onError ${t?.message?: "
null"}") } } )

結果如下:

create:  RxNewThreadScheduler-2
subscriber  main  onNext create

顯然create方法執行在子執行緒中,而subscriber接受到訊息是主執行緒執行的,那就來看看這流程到底是怎樣的呢

Observable.create:

 public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(f);
    }
protected
Observable(OnSubscribe<T> f) { this.onSubscribe = f; }

首先這個create方法,很明顯其實就是先new一個Observable物件(把它當作Observable1),然後把OnSubscribe f(把他當作OnSubscribe1)賦值給Observable1的OnSubscribe屬性,接著。我們來看subscribeOn方法

 public final Observable<T> subscribeOn(Scheduler scheduler) {

        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

這裡可以看到這裡又執行了Create方法了,也就是這裡又new 一個Observable物件(把它當作Observable2),然後吧OperatorSubscribeOn(把他當作OnSubscribe2)賦值給Observable2的OnSubscribe屬性,需要注意的是這裡的OperatorSubscribeOn把Observable1和Scheduler做為OperatorSubscribeOn建構函式的引數,最後執行的Observable.subscribe方法其實就是執行OnSubscribe.call方法,該例子的結果其實就是Observable2的OnSubscribe的call的結果,我們知道Observable2的OnSubscribe也就是OperatorSubscribeOn這個例項,那我們來看看OperatorSubscribeOn的call方法:

public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }

我們的例子裡的scheduler是Schedulers.newThread(),根據原始碼慢慢分析,我們可以知道scheduler是NewThreadScheduler的例項 final Worker inner = scheduler.createWorker();

NewThreadScheduler:
public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }

這裡的inner就是NewThreadWorker例項,接著inner.schedule(Action0:Action0)

NewThreadWorker:
public NewThreadWorker(ThreadFactory threadFactory) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
        // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
        boolean cancelSupported = tryEnableCancelPolicy(exec);
        if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
            registerExecutor((ScheduledThreadPoolExecutor)exec);
        }
        executor = exec;
    }

public Subscription schedule(final Action0 action) {
        return schedule(action, 0, null);
    }


public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

在NewThreadWorker建構函式中就建立了一個執行緒池,並賦值給了executor屬性,schedule方法也就是是這個執行緒池executor執行ScheduledAction,ScheduledAction是繼承Runnable

 public void run() {
        try {
            lazySet(Thread.currentThread());
            action.call();
        } catch (OnErrorNotImplementedException e) {
            signalError(new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e));
        } catch (Throwable e) {
            signalError(new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e));
        } finally {
            unsubscribe();
        }
    }

也就是執行了Action0的call方法, source.unsafeSubscribe(s)也就是在這個執行緒池中執行的,這也就是執行緒切換的過程,souerce也就是Observable1

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                RxJavaHooks.onObservableError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r; // NOPMD
            }
            return Subscriptions.unsubscribed();
        }
    }

其實unsafeSubscribe這個方法也就是用Observable的onSubscribe去執行它的call方法,也就是onSubscribe1的call方法,也就是說subscribeOn方法改變的是Observable的onSubscribe的call方法所在的執行緒

接著。我們來看observeOn方法,最終執行的是:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
   return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

最後還是又執行了create方法了,Observable3和它的OnSubscribe3(OnSubscribeLift),OnSubscribeLift建構函式傳入的是Observable2的OnSubscribe2和OperatorObserveOn,它的subscribe方法執行的結果就是OnSubscribe3(OnSubscribeLift)的call方法

public void call(Subscriber<? super R> o) {
    Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
        st.onStart();
        parent.call(st);
    }

先執行operator(OperatorObserveOn)的call方法,得到新的Subscriber,然後再執行OnSubscribe2.call方法

OperatorObserveOn:
 public Subscriber<? super T> call(Subscriber<? super T> child) {
    ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
    parent.init();
    return parent;
    }

ObserveOnSubscriber這個類有個特點,就是onNext,onCompleted,onError中最後都會執行schedule這個方法

 protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this);
    }
}

recursiveScheduler是在ObserveOnSubscriber建構函式中賦值的,也就是scheduler.createWorker();scheduler是從AndroidSchedulers.mainThread(),我們知道是LooperScheduler(Looper.getMainLooper())就行了

public Worker createWorker() {
    return new HandlerWorker(handler);
}

 public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (unsubscribed) {
                return Subscriptions.unsubscribed();
            }

            action = hook.onSchedule(action);

            ScheduledAction scheduledAction = new ScheduledAction(action, handler);

            Message message = Message.obtain(handler, scheduledAction);
            message.obj = this; // Used as token for unsubscription operation.

            handler.sendMessageDelayed(message, unit.toMillis(delayTime));

            if (unsubscribed) {
                handler.removeCallbacks(scheduledAction);
                return Subscriptions.unsubscribed();
            }

            return scheduledAction;
        }

ScheduledAction繼承Runnable,這裡用了Handler,用來切換執行緒

總結一下observeOn方法到底做了些什麼 把subscribe封裝到ObserveOnSubscriber,ObserveOnSubscriber的各個方法都觸發了ScheduledAction,這裡又會用到handler切換到主執行緒,observeOn上一個Observable2的onSubscribe2會處理ObserveOnSubscriber