1. 程式人生 > >RxJava2小白手冊(2)- 執行緒管理和流程淺析

RxJava2小白手冊(2)- 執行緒管理和流程淺析

介紹

承接上文,結合使用場景,討論一下如何告別AsyncTask,就是因為RxJava的強大執行緒管理功能。

舉個栗子

認識RxJava之前,我們處理非同步任務的方式主要有兩種:
1. AsyncTask
2. Thread + Runnable.
涉及的程式碼量相比較RxJava而言大太多,針對Handler處理不好,可能存在記憶體洩漏的風險。不贅述,看看如何使用RxJava處理非同步任務。

1. 非同步處理

1.1 程式碼示例

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception { Logger("Emit 111"); e.onNext(111); Logger("Emit 222"); e.onNext(222); Logger("Emit onComplete"); e.onComplete(); } }); Observer<Integer> observer = new Observer<Integer>() { @Override
public void onSubscribe(Disposable d) { Logger("onSubscribe"); } @Override public void onNext(Integer integer) { Logger("onNext integer = " + integer); } @Override public void onError(Throwable e) { Logger("onError e = " + e.getMessage()); } @Override
public void onComplete() { Logger("onComplete"); } }; observable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer);

1.2 執行結果

這裡寫圖片描述
可以清楚的看到,Observer下的操作是在主執行緒下完成的,而Observable下發射器的發射動作卻是在一個新的執行緒中完成的。通過這種操作,我們可以在subscribe方法中執行耗時操作,然後結果通過onNext()方法返回給主執行緒,實現非同步處理的目的。
常用的場景:訪問資料庫,網路請求資料,後臺計算操作等等。

1.3 Schedulers 和AndroidSchedulers

AndroidSchedulers是RxAndroid中的執行緒排程器,主要用途如上所示,AndroidSchedulers.mainThread,代表Android中的主執行緒(UI執行緒)。

方法 解釋
Schedulers.computation() 用於計算任務
Schedulers.from(Executor executor) 使用指定的Executor作為排程器
Schedulers.io() 用於IO密集型任務,如非同步阻塞IO操作,這個排程器的執行緒池會根據需要增長
Scheduler.newThread() 為每個任務建立一個新執行緒
Scheduler.shutdown() 停止排程器
Scheduler.single() 單獨執行緒
Scheduler.start() 啟動排程器
Scheduler.trampoline() 在當前執行緒中,但是會等到當前執行緒任務執行完畢之後再去執行
AndroidScheduler.mainThread() 主執行緒

看看原始碼

1. Observable.create()

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

先執行非空檢查,然後通過ObservableCreate來建立Observable。而ObservableCreate繼承Observable。看下程式碼

//ObservableCreate繼承Observable
public final class ObservableCreate<T> extends Observable<T> {
    //ObservableOnSubscribe介面只有一個subscribe方法
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //賦值,結合上面Observable的create方法,這個source應該是我們new出來的ObservableOnSubscribe
        this.source = source;
    }

    //這個方法在Observable執行subscribe(Observer)的時候使用到
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //建立CreateEmitter,傳入observer,內部使用
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //執行observer的onSubscribe方法,parent是CreateEmitter,實現了Disposable,和我們建立Observer時實現的onSubscribe方法一致,沒毛病
        observer.onSubscribe(parent);

        try {
            //執行subscribe方法,source為ObservableOnSubscribe物件,parent為CreateEmitter,而CreateEmitter實現ObservableEmitter介面,沒毛病
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            //CreateEmitter執行onError方法
            parent.onError(ex);
        }
    }

    //CreateEmitter類繼承AtomicReference<Disposable>實現ObservableEmitter和Disposable 介面
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        //建立過程中傳入的Observer
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        //OnNext方法
        @Override
        public void onNext(T t) {
            //非空檢查,onNext在2.0之後不允許傳入null值作為引數
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //這個對應上我們的上一篇部落格,一次性水管,如果isDisposed為true,則發射器發出的事件,將不會被觀察者執行
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        //onError方法,當tryOnErro返回false的時候,執行RxJavaPlugins.onError(t),何時tryOnError會返回false呢?看下面
        @Override
        public void onError(Throwable t) {
            //當isDisposed()為true後,會執行RxJavaPlugins.onError(t)操作,也就是說如果在isDisposed()為true的情況下,發射器還發出onError()事件,會導致程式崩潰。具體看下面的執行示例。
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            //也是不允許傳入null
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            //如果isDisposed為false,執行觀察者的onError方法,然後執行dispose()操作,也就是觀察者不處理後面發射器傳送的事件了。估計onComplete()方法中也會有類似的操作流程
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            //只有當isDisposed為true的時候回返回false,也就是上一個方法回執行RxJavaPlugins.onError(t);操作
            return false;
        }

        @Override
        public void onComplete() {
            //和上面onError()操作類似,不同的是沒有非空檢查,因為onComplete沒有引數。
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        ……
    }
    ……
    //這部分介紹的是SerializedEmitter,暫無涉及
}

舉例說明,isDisposed()為true時,發射器繼續傳送onError事件會導致程式崩潰。

 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Logger("Emit 111");
                e.onNext(111);
                Logger("Emit 222");
                e.onNext(222);
                Logger("Emit 333");
                e.onNext(333);
                Logger("Emit onError");
                e.onError(new Throwable("Test Disposable onError"));

            }
        });

        Observer<Integer> observer = new Observer<Integer>() {
            Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Logger("onSubscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer integer) {
                Logger("onNext integer = " + integer);
                if(mDisposable!=null && !mDisposable.isDisposed() && integer == 222) {
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(Throwable e) {
                Logger("onError e = " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Logger("onComplete");
            }
        };
        observable.subscribe(observer);

執行結果:
這裡寫圖片描述

2. Observer

建立Observer,無甚特殊,注意onSubscribe,onNext,onError傳入的引數不能為空以及Disposable 的使用。

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();
}

3. observable.subscribe(observer);

分析一下subscribe方法

    public final void subscribe(Observer<? super T> observer) {
        //observer非空檢查
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //關聯observable和observer
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //這個方法在Observable中是個抽象方法,但是結合上面Observerable的create過程,可以知道這裡實際上呼叫的是ObservableCreate的subscribeActual方法,也就是上面我們分析的過程,沒毛病
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

一路看下來,我們就可以很快的和我們做的測試對應上,先呼叫onSubscribe方法,然後執行subscribe方法中的發射器操作,根據發射器的操作Observer作出對應的處理。

4. subscribeOn

subscribeOn用來指定Observable在哪個執行緒執行自己的subscribe方法。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //scheduler非空檢查
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

建立了一個ObservableSubscribeOn,這個類千萬別和上面建立Observable過程中使用的ObservableOnSubscribe介面弄混淆,結合當前操作為subscribeOn來記住這個類名。

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

Observerable繼承自ObservableSource,所以建立ObservableSubscribeOn的時候Observable和scheduler傳遞過來。
ObservableSubscribeOn繼承
AbstractObservableWithUpstream,而後者又繼承Observable。所以實際上經過subscribeOn操作之後,後續操作的物件從Observerable變成了ObservableSubscribeOn,所以,當後面執行subscribe時執行的subscribeActual方法為ObservableSubscribeOn重的對應方法

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //封裝Observer,實際Observer由其內部actual維護
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //Observer執行onSubscribe方法,SubscribeOnObserver實現Disposable介面,所以上面的例子中onSubscribe傳遞的是Disposable型別
        s.onSubscribe(parent);
//這裡有3個操作:
//1. 建立SubscribeTask,傳入封裝後的Observer
//2. 排程器執行scheduleDirect操作
//3. 封裝後的Observer執行setDisposable操作
 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

關注scheduler.scheduleDirect(new SubscribeTask(parent)),先看下

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

實現Runnable,終於看到一個有點熟悉的東西。傳入的parent為封裝後的Observer。而source則是建立ObservableSubscribeOn過程中傳入的Observable。
在看scheduleDirect方法之前,我們得先弄清楚這個scheduler是個什麼東西?在Schedulers類中,不同的Scheduler已經初始化完成。

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

就看下NEW_THREAD 這個,首先new NewThreadTask()

    static final class NewThreadTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }
    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }
public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

看到了ThreadFactory,RxThreadFactory,而RxThreadFactory實現了ThreadFactory介面,所以最後還是執行緒的使用,只是RxJava對這些基礎的東西做了深度的封裝和流程上的優化,讓我們更方便的使用。
回溯到上面的scheduleDirect方法,

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //建立工作執行緒,以NewThreadScheduler為例,是建立NewThreadWorker
        final Worker w = createWorker();
        //
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        //封裝出一個帶Dispose的Task,方便控制
        DisposeTask task = new DisposeTask(decoratedRun, w);
        //以NewThreadScheduler為例,這裡執行的是NewThreadWorker中的schedule方法
        w.schedule(task, delay, unit);

        return task;
    }
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

忽略disposed的影響,最後執行到scheduleActual

   public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            //直接提交或者進入佇列
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

可以看到熟悉的ScheduledExecutorService和Future。
所以,綜上所述,其內部也是新建立一個執行緒,結合Runnable。

5. observeOn

原理我覺得和subscribeOn沒有太大的差別。不做贅述。

最後

RxJava使用起來方便,其中包含著很多技巧,也不甚瞭解,只能是慢慢用,慢慢理。

【碼道長】公眾號,最近開始運營,有興趣的朋友歡迎來訪。
這裡寫圖片描述