1. 程式人生 > >RxJava2原始碼解析——基本流程、執行緒排程

RxJava2原始碼解析——基本流程、執行緒排程

本篇文章的目的: ①瞭解RxJava的基本流程 ②瞭解RxJava中執行緒排程的實現 ③瞭解了上面那些,其他的操作符對你來說就不是問題了

RxJava基本流程

我們從基本的使用作為入口:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hey");
                e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe() : d = " + d );
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext() : value = " + value );
            } 

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError() : e = " + e );
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete()");
            }
        });

我們以create方法作為入口,它接受的引數是一個ObservableOnSubscribe物件,ObservableOnSubscribe是一個介面,裡面只有一個subscribe方法我們需要實現:

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

create方法接受一個ObservableOnSubscribe物件:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

這裡呼叫RxJavaPlugins.onAssembly方法,裡面呼叫相關的hook方法,這裡不詳細講,我們只要知道它返回了原物件。

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

也就是說create方法返回了一個ObservableCreate物件,它繼承自Observable:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}

該類的建構函式只是做了source的儲存,該source是我們呼叫create方法時傳入的ObservableOnSubscribe,也就是說,用ObservableCreate對ObservableOnSubscribe進行包裝(即裝飾者模式)。

到這裡,create方法最終返回一個ObservableCreate物件,它繼承自Observable,接下來就是subscribe(Observer)方法了,

.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe() : d = " + d );
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext() : value = " + value );
            } 
           ...
        });

我們看一下observable的subscribe方法:

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            ...
        }
    }

RxJavaPlugins.onSubscribe同樣是hook方法,最後呼叫了自己的subscribeActual(Observer),也就是說:subscribeActual方法是在我們完成訂閱 即呼叫subscribe(Observer)的時候 被呼叫的,引數就是從下游傳遞上來的Observer物件, 那麼我們直接看ObservableCreate的subscribeActual方法:

    protected void subscribeActual(Observer<? super T> observer) {
        //用observer構造一個CreateEmitter物件
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //呼叫observer的onSubscribe方法
        observer.onSubscribe(parent);

        try {
            //呼叫被觀察者source的subscribe方法,傳入CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

可以看到裡面用一個CreateEmitter包裝下游的Observer,然後呼叫Observer的onSubscribe方法,接著呼叫上游物件也就是source(被包裝物件)的subscribe方法。也就是在subscribeActual方法中呼叫了source(被包裝物件)的subscribe方法,這樣自下而上一層層呼叫他們的subscribe方法,其他操作符也和create操作符一樣,包裝,然後再subscribeActual中呼叫上游的subscribe。 這裡的CreateEmitter很熟悉,

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
}

CreateEmitter實現ObservableEmitter介面,從建構函式中得知它對Observer進行包裝, 在被觀察者的subscribe方法中我們呼叫引數e的onNext和onComplete等來push事件,根據subscribeActual程式碼中source.subscribe(parent)可以知道,這裡的e就是CreateEmitter,

public void subscribe(ObservableEmitter<String> e) throws Exception {
    e.onNext("hey");
    e.onComplete();
}

看一下CreateEmitter類的onNext方法,最終會呼叫observer的onNext方法

public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

下面直接呼叫了subscribeActual,並傳入observer,subscribeActual就是前面講的,自下而上一層層呼叫subscribeActual也就是解包裝(瞎說的- -)

這裡對基本流程做一個總結:

首先順序執行我們的程式,自上而下每個操作符都對上游返回的結果進行包裝,subscribe(Observer)方法呼叫後,自下而上一層層解包裝,最終在subscribe(ObservableEmitter e)呼叫e.onNext等方法,這裡的e就是從下游傳遞上來的observer。

RxJava執行緒排程

知道了RxJava採用裝飾者模式後,理解其他操作符就不難了,subscribeOn操作符是用來決定被觀察者執行的執行緒,

我們直接看subscribeOn方法,返回一個ObservableSubscribeOn物件:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //呼叫observer的onSubscribe,說明onSubscribe是在訂閱的執行緒執行的
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

可以看到又是一次包裝,在subscribeActual中直接呼叫了Observer的onSubscribe方法,說明onSubscribe方法執行的執行緒和訂閱的執行緒是一致的。 接下來這條常常的程式碼,我們先看SubscribeTask:

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

它是ObservableSubscribeOn的內部類,實現Runnable類,說明它可以被執行,run方法中正是呼叫了source.subscribe()方法,這裡注意一點,這個source會一層層呼叫上游的程式碼,也就是說在subscribeOn操作符之上的操作都會被影響

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

接著是scheduler.scheduleDirect方法,假設scheduler是Schedulers.IO,也就是在子執行緒執行source.subscribe, Schedulers.IO最終得到IoScheduler:

public final class IoScheduler extends Scheduler {

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }

    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }
}

內部配置了執行緒池等,由它來執行run方法。

至此,執行緒排程就解析到這裡。

歡迎糾正,喜歡點個贊。