1. 程式人生 > >RxJava2原始碼解析

RxJava2原始碼解析

基礎解析

我們看下RxJava最簡單的寫法

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError"+e.getLocalizedMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        })

很簡單的3個步驟:

  1. 建立 Observable :被觀察者
  2. 建立 Observer :觀察者
  3. 通過 subscribe() 方法建立訂閱關係

一個個來看

被觀察者的建立

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    //建立了一個ObservableCreate類,裡面包裝了我們傳入的source引數
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

觀察者的建立

這裡很簡單,只是通過new方法生成了一個簡單的Observer物件。

訂閱

訂閱是通過subscribe方法來執行的,我們來跟蹤一下,這個方法是屬於Observable類的

public final void subscribe(Observer<? super T> observer) {
    //校驗觀察者不為空
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        //呼叫subscribeActual方法,然後入參是observer(被觀察者)。這個方法是抽象方法,具體的實現是交給子類的
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
    /**
     * Operator implementations (both source and intermediate) should implement this method that
     * performs the necessary business logic.
     * <p>There is no need to call any of the plugin hooks on the current Observable instance or
     * the Subscriber.
     * @param observer the incoming Observer, never null
     */
    protected abstract void subscribeActual(Observer<? super T> observer);

最終通過 subscribeActual(observer) 來實現功能,而這個方法是有具體的子類去實現的。從第一步中我們通過Observable.create()來生成的被觀察者。裡面最終的生成的是 ObservableCreate 這個類。也就是說,這個subscribeActual(observer) 方法是由 ObservableCreate 這個類去實現的,我們去裡面找一下。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    //這裡將我們傳入的被觀察者進行了一層封裝,裡面實現了ObservableEmitter<T>, Disposable等介面->裝飾者模式
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //呼叫被觀察者的onSubscribe方法(這裡很神奇,調起者是observer,而不是被訂閱者,是為了相容Rxajva1麼?)
    observer.onSubscribe(parent);
    try {
        //這裡的source就是我們自己寫的那個ObservableOnSubscribe了,呼叫了裡面的subscriber方法,然後引數是封裝後的觀察者。
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

Observable.create(new ObservableOnSubscribe<String>() {
            //看到了哈,實際是執行的這個方法,這裡面的emitter是我們封裝之後的CreateEmitter,那麼這裡面的onNext(),onComplete()又是誰呢?
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onComplete();
            }
        })

我們現在回到我們封裝生成的 CreateEmitter 這個類

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
    private static final long serialVersionUID = -3434801548987643227L;
    final Observer<? super T> observer;
    //定義的觀察者
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
    
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        //呼叫的是觀察者的onNext()方法
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                //呼叫的是觀察者的onComplete()方法
                observer.onComplete();
            } finally {
                //執行完onComplete()方法後要取消訂閱
                dispose();
            }
        }
    }
    .....
}

到這裡為知,最簡單的一個流程基本已經走通了。。

高階用法

執行緒切換

下層切換

RxJava中我們使用的最多的應該就是進行執行緒切換了吧?通過 observeOn() 方法來進行執行緒的隨意切換,舒舒服服,再也不用進行噁心的執行緒處理了。

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onComplete();
            }
        }).observeOn(Schedulers.io())

observeOn() 方法是屬於Observable這個類的。我們跟蹤進去這個方法去看看。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //進行空校驗
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

這裡建立了一個 ObservableObserveOn 物件,所以和之前基礎裡面將的一樣,當呼叫 subscribe() 方法的時候,會先呼叫觀察者的 onSubscribe() 方法,然後通過subscribe的層層處理,呼叫這個被觀察者裡面的 subscribeActual() 方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {//如果傳入的scheduler是TrampolineScheduler,那麼執行緒不需要切換,直接呼叫subscribe方法即可
        source.subscribe(observer);
    } else {
        //根據傳入的scheduler,建立Worker
        Scheduler.Worker w = scheduler.createWorker();
        //將傳入的observer進行包裝,包裝為ObserveOnObserver類。
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

這裡可以依據基礎篇的進行整理一下,這裡將觀察者進行了一層包裝,也就是我們的觀察者由原來的observaer變為了ObserveOnObserver物件。而被觀察者還是之前的ObservableCreate(注意,這裡只是依據基礎中.create()建立的類,所以是ObservableCreate,如果是其他方式建立的被觀察者,那麼這裡可能就是另一個具體的實現類了),並未改變。之前我們講過,當呼叫subscribe方法的onNext(),onComplete()方法,其實是呼叫的觀察者的方法。我們現在看一下ObserveOnObserver的onNext和onComplete方法又是做了什麼神奇的操作。

@Override
public void onNext(T t) {
    if (done) {//如果已經完成,直接返回
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        //將onNext的資料放入佇列queue
        queue.offer(t);
    }
    //進行執行緒切換
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        //呼叫了worker的方法,這裡通過呼叫執行緒池,呼叫了自身的run方法
        worker.schedule(this);
    }
}

這裡我們使用的是IO執行緒,那麼在 scheduler.createWorker() 中的生成worker時

@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

那麼跟到這個類裡面的 schedule 方法

@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    //這裡呼叫了執行緒worker的scheduleActual方法,並把Runable物件傳進去
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //留下鉤子
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        ....
        Future<?> f;
        try {
            if (delayTime <= 0) {
                //線上程池中呼叫封裝之後的Runnable
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }

可以看到,其實最終是通過執行緒池呼叫了 ObserveOnObserver 本身,這個類實現了 Runnable 介面,我們看一下run方法裡面做了什麼。

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
//具體的操作
void drainNormal() {
     int missed = 1;
     //被觀察者onNext傳送的資料佇列
     final SimpleQueue<T> q = queue;
     //實際的觀察者
     final Observer<? super T> a = downstream;
     for (;;) {
         //檢測是否有異常資訊
         if (checkTerminated(done, q.isEmpty(), a)) {
             return;
         }
         //遍歷
         for (;;) {
             boolean d = done;
             T v;
             //取出佇列中的資料
             try {
                 v = q.poll();
             } catch (Throwable ex) {
                 //發生異常,則直接呼叫dispose()和onError()方法
                 Exceptions.throwIfFatal(ex);
                 disposed = true;
                 upstream.dispose();
                 q.clear();
                 a.onError(ex);
                 worker.dispose();
                 return;
             }
             ....
             //呼叫實際的觀察者的onNext()方法
             a.onNext(v);
         }
         ...
     }
 }

因為這個操作最終是在scheduler.createWorker()建立的地方進行了處理,才實現了對於之後程式碼處理都在io執行緒中進行了呼叫。從而實現執行緒的切換功能。這裡我們對之前的測試程式碼流程做一個總結。

先看一下對於觀察者的onSubscribe()方法的呼叫流程:

這裡面我們自己定義的觀察者通過subscribe()方法層層往上呼叫,最後呼叫了我們定義的被觀察者裡面的onSubscribe方法,再一層層的往下呼叫,最後到我們自己定義的onSubscribe()方法,裡面很少有執行緒的切換處理,所以這段程式碼在哪兒執行,那麼這段程式碼在那裡執行,這個onSubscribe()方法就是在哪個執行緒執行。

繼續,我們看一下onNext()方法

上層切換

除了 observeOn 方法來處理我們操作流的下層執行緒處理之外,我們也可以通過 subscribeOn 方法來進行對上層流的執行緒處理。

測試用程式碼:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onComplete();
            }
}).subscribeOn(Schedulers.io())

現在我們跟蹤進 subscribeOn 方法

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    //
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

這裡看到,跟我們基礎篇裡面的 create() 方法有異曲同工之妙,這裡面生成了一個ObservableSubscribeOn類,這個類也是繼承Observable類的,我們跟蹤進去看一下。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        //呼叫訂閱者的onSubscribe方法,這裡的執行緒還未進行切換
        observer.onSubscribe(parent);
        //進行執行緒的切換處理
        //1.創造一個SubscribeTask的Runable方法
        //2.通過scheduler的scheduleDirect進行執行緒的切換
        //3.通過parent.setDisposable來進行Disposable的切換
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

看起來是不是很像?在基礎篇我們知道了,這個 subscribeActual 方法裡面的引數就是我們的觀察者。

我們看一下里面和之前分析所不同的地方,也就是執行緒的切換

final class SubscribeTask implements Runnable {
    ...
    @Override
    public void run() {
        //source是我們上一層的被觀察者,parent是包裝之後的觀察者.
        //所以會在相關的worker裡面呼叫source的subscribe方法,
        //即上層的資料呼叫已經在woker裡面了(如果是IoScheduler,那麼這裡就是在RxCachedThreadScheduler執行緒池呼叫了這個方法 )
        source.subscribe(parent);
    }
}

然後看一下這裡面最重要的 scheduler.scheduleDirect 這個方法

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //建立一個Worker,這個是有具體的實現類來實現的,比如我們的IOScheduler,ImmediateThinScheduler等,具體要看我們切換傳參
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }

這裡我們對上層切換的流程做一個總結:當呼叫 subscribeOn 方法的時候,會在建立的排程器中來執行被觀察者的執行程式碼,從而實現了對上層的執行緒切換功能。

先看一下測試程式碼中的onNext()方法的呼叫流程:

彙總

其實對於執行緒的切換,主要是根據裡面傳遞的執行緒切換函式,將上游或者下游的程式碼在指定的執行緒裡面去執行來實現。

本文由 開了肯 釋出!