
RxJava Study Notes: observeOn

The simplest possible example

 longOpeSubscription = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                printLog("onStart in OnSubscribe");
                subscriber.onStart();
                int N = data.length;
                for (int i = 0; i < N; i++) {
                    dosomethingBlockThread();
                    printLog("onNext" + data[i] + " in OnSubscribe");
                    subscriber.onNext(data[i]);
                }
                printLog("OnCompleted in OnSubscribe");
                subscriber.onCompleted();
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                printLog("OnCompleted in Subscriber");
                // mProgressOperationRunning.setVisibility(View.INVISIBLE);
            }

            @Override
            public void onError(Throwable e) {
                printLog("onError in Subscriber");
                // mProgressOperationRunning.setVisibility(View.INVISIBLE);
            }

            @Override
            public void onNext(String s) {
                printLog("onNext " + s + " in Subscriber");
            }
        });

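The analysis follows the same path as the previous two posts: observeOn is ultimately implemented through a lift transformation. The concrete object here is an OnSubscribeLift, which stores the original onSubscribe and the operator; in this case the operator is an OperatorObserveOn instance. subscribe invokes the call method of the onSubscribe held by the Observable that observeOn created, which is OnSubscribeLift's call method: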

 @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
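The wrapping order in the call method above can be sketched without RxJava at all. The following is a minimal illustration, not the library's code: Receiver, Source, Operator and LiftSketch are hypothetical stand-ins for Subscriber, OnSubscribe, Operator and OnSubscribeLift, reduced to onNext only, with no onStart, error handling or hooks.

```java
import java.util.ArrayList;
import java.util.List;

class LiftSketch {
    /** Hypothetical stand-in for rx.Subscriber, reduced to onNext. */
    interface Receiver<T> { void onNext(T value); }

    /** Hypothetical stand-in for Observable.OnSubscribe. */
    interface Source<T> { void call(Receiver<T> receiver); }

    /** Hypothetical stand-in for Observable.Operator: wraps the downstream receiver. */
    interface Operator<R, T> { Receiver<T> call(Receiver<R> downstream); }

    /** Same shape as OnSubscribeLift: remember the parent source and the operator. */
    static <T, R> Source<R> lift(Source<T> parent, Operator<R, T> operator) {
        return downstream -> {
            // Step 1: let the operator wrap the final receiver.
            Receiver<T> st = operator.call(downstream);
            // Step 2: run the original source against the wrapper, not the final receiver.
            parent.call(st);
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Source<String> source = r -> { r.onNext("a"); r.onNext("b"); };
        // An operator that records each value before forwarding it downstream.
        Operator<String, String> tagging = downstream -> value -> {
            log.add("operator saw " + value);
            downstream.onNext(value);
        };
        lift(source, tagging).call(value -> log.add("subscriber got " + value));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Every value passes through the operator's wrapper before it reaches the final receiver, which is the position observeOn uses to move delivery onto another thread.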

This first runs the operator's call method. The o here (named child in the code below) is the final subscriber. Stepping into call in OperatorObserveOn:

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

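This ultimately returns the parent object, an ObserveOnSubscriber, whose scheduler is the AndroidSchedulers.mainThread() we wrote. With that settled, return to the line parent.call(st) in OnSubscribeLift's call method. There, parent is the onSubscribe passed to create (not the ObserveOnSubscriber of the same name above), and st is the ObserveOnSubscriber just returned. Running parent calls st's onNext, so we jump to onNext in ObserveOnSubscriber, the inner class of OperatorObserveOn: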

 @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }
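The enqueue-then-schedule pattern in the onNext above can be sketched in plain Java. This is a simplified model under stated assumptions, not the library's code: QueueingReceiver is a hypothetical name, the Executor stands in for recursiveScheduler, and the counter guard in schedule() is how I understand RxJava 1.x to avoid scheduling more than one drain at a time. The queue here is unbounded, so the MissingBackpressureException branch has no counterpart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

class QueueDrainSketch {

    /** Hypothetical stand-in for ObserveOnSubscriber: enqueue, then schedule a drain. */
    static final class QueueingReceiver<T> implements Runnable {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicLong counter = new AtomicLong();
        private final Executor worker;   // stands in for recursiveScheduler
        private final Consumer<T> child; // stands in for the final Subscriber

        QueueingReceiver(Executor worker, Consumer<T> child) {
            this.worker = worker;
            this.child = child;
        }

        void onNext(T value) {
            queue.offer(value); // unbounded queue: offer always succeeds here
            schedule();
        }

        private void schedule() {
            // Only the call that moves the counter off zero schedules a drain.
            if (counter.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            long missed = 1;
            for (;;) {
                T value;
                while ((value = queue.poll()) != null) {
                    child.accept(value); // delivery happens on the worker's thread
                }
                // Subtract the schedule() calls already accounted for; stop when none remain.
                missed = counter.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    /** Pushes three values, then runs whatever was scheduled. */
    static List<String> demo() {
        List<Runnable> pending = new ArrayList<>(); // manual executor: just collects tasks
        List<String> received = new ArrayList<>();
        QueueingReceiver<String> receiver = new QueueingReceiver<>(pending::add, received::add);
        receiver.onNext("a");
        receiver.onNext("b");
        receiver.onNext("c");
        int scheduled = pending.size();
        for (Runnable task : new ArrayList<>(pending)) {
            task.run();
        }
        received.add("tasks=" + scheduled);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Three onNext calls produce a single scheduled task, and that one task delivers all three values in order.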

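Note the line if (!queue.offer(on.next(t))): it puts the value into queue. The schedule() call that follows eventually reaches recursiveScheduler.schedule(this). So what is recursiveScheduler? The source shows it is produced by AndroidSchedulers.mainThread().createWorker(). Tracing further, recursiveScheduler turns out to be a HandlerWorker object, so let's look at HandlerWorker's schedule method: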

@Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (unsubscribed) {
                return Subscriptions.unsubscribed();
            }

            action = hook.onSchedule(action);

            ScheduledAction scheduledAction = new ScheduledAction(action, handler);

            Message message = Message.obtain(handler, scheduledAction);
            message.obj = this; // Used as token for unsubscription operation.

            handler.sendMessageDelayed(message, unit.toMillis(delayTime));

            if (unsubscribed) {
                handler.removeCallbacks(scheduledAction);
                return Subscriptions.unsubscribed();
            }

            return scheduledAction;
        }

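Look familiar? This handler is, of course, one bound to the main thread's Looper. scheduledAction's run eventually calls the action's call, a no-argument method. That resolves to the call method of the ObserveOnSubscriber inside OperatorObserveOn, which in turn calls the final subscriber's onNext. That completes the analysis of observeOn.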
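The thread hop itself can be illustrated without Android. In the sketch below, a single-thread executor named fake-main stands in for the main-thread Handler and Looper. That substitution is an assumption made purely for illustration, and ThreadHopSketch is a hypothetical name. Submitting a task to the executor plays the role of handler.sendMessageDelayed(...).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHopSketch {

    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        // One named thread standing in for the Android main looper (illustrative assumption).
        ExecutorService fakeMain =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));

        // The producer thread plays the role of the code inside OnSubscribe.call.
        Thread producer = new Thread(() -> {
            for (String s : new String[] {"a", "b"}) {
                // Hand delivery over to the other thread instead of calling the consumer directly.
                fakeMain.execute(() -> log.add(s + " on " + Thread.currentThread().getName()));
            }
        }, "producer");

        try {
            producer.start();
            producer.join();
            fakeMain.shutdown();
            fakeMain.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The values are produced on the producer thread but recorded on fake-main, in submission order, just as observeOn(AndroidSchedulers.mainThread()) delivers onNext on the main thread.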