1. 程式人生 > >RxJava的訊息訂閱和執行緒切換原理

RxJava的訊息訂閱和執行緒切換原理

本文由玉剛說寫作平臺提供寫作贊助,版權歸玉剛說微信公眾號所有
原作者:四月葡萄
版權宣告:未經玉剛說許可,不得以任何形式轉載

1.前言

本文主要是對RxJava的訊息訂閱和執行緒切換進行原始碼分析,相關的使用方式等不作詳細介紹。

本文原始碼基於rxjava:2.1.14

2. RxJava簡介

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

上面這段話來自於RxJava在github上面的官方介紹。翻譯成中文的大概意思就是:

RxJava是一個在Java虛擬機器上的響應式擴充套件,通過使用可觀察的序列將非同步和基於事件的程式組合起來的一個庫。

它擴充套件了觀察者模式來支援資料/事件序列,並且添加了操作符,這些操作符允許你宣告性地組合序列,同時抽象出要關注的問題:比如低階執行緒、同步、執行緒安全和併發資料結構等。

簡單點來說, RxJava就是一個使用了觀察者模式,能夠非同步的庫。

3. 觀察者模式

上面說到,RxJava擴充套件了觀察者模式,那麼什麼是觀察模式呢?我們先來了解一下。

舉個例子,以微信公眾號為例,一個微信公眾號會不斷產生新的內容,如果我們讀者對這個微信公眾號的內容感興趣,就會訂閱這個公眾號,當公眾號有新內容時,就會推送給我們。我們收到新內容時,如果是我們感興趣的,就會點進去看下;如果是廣告的話,就可能直接忽略掉。這就是我們生活中遇到的典型的觀察者模式。

在上面的例子中,微信公眾號就是一個被觀察者(Observable),不斷的產生內容(事件),而我們讀者就是一個觀察者(Observer) ,通過訂閱(subscribe)就能夠接受到微信公眾號(被觀察者)推送的內容(事件),根據不同的內容(事件)做出不同的操作。

3.1 Rxjava角色說明

RxJava的擴充套件觀察者模式中就是存在這麼4種角色:

角色 角色功能
被觀察者(Observable 產生事件
觀察者(Observer 響應事件並做出處理
事件(Event 被觀察者和觀察者的訊息載體
訂閱(Subscribe 連線被觀察者和觀察者

3.2 RxJava事件型別

RxJava中的事件分為三種類型:Next事件、Complete事件和Error事件。具體如下:

事件型別 含義 說明
Next 常規事件 被觀察者可以傳送無數個Next事件,觀察者也可以接受無數個Next事件
Complete 結束事件 被觀察者傳送Complete事件後可以繼續傳送事件,觀察者收到Complete事件後將不會接受其他任何事件
Error 異常事件 被觀察者傳送Error事件後,其他事件將被終止傳送,觀察者收到Error事件後將不會接受其他任何事件

4.RxJava的訊息訂閱

在分析RxJava訊息訂閱原理前,我們還是先來看下它的簡單使用步驟。這裡為了方便講解,就不用鏈式程式碼來舉例了,而是採用分步驟的方式來逐一說明(平時寫程式碼的話還是建議使用鏈式程式碼來呼叫,因為更加簡潔)。其使用步驟如下:

  1. 建立被觀察者(Observable),定義要傳送的事件。
  2. 建立觀察者(Observer),接受事件並做出響應操作。
  3. 觀察者通過訂閱(subscribe)被觀察者把它們連線到一起。

4.1 RxJava的訊息訂閱例子

這裡我們就根據上面的步驟來實現這個例子,如下:

        //步驟1. 建立被觀察者(Observable),定義要傳送的事件。
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });

        //步驟2. 建立觀察者(Observer),接受事件並做出響應操作。
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        //步驟3. 觀察者通過訂閱(subscribe)被觀察者把它們連線到一起。
        observable.subscribe(observer);

其輸出結果為:

onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete

4.2 原始碼分析

下面我們對訊息訂閱過程中的原始碼進行分析,分為兩部分:建立被觀察者過程和訂閱過程。

4.2.1 建立被觀察者過程

首先來看下建立被觀察者(Observable)的過程,上面的例子中我們是直接使用Observable.create()來建立Observable,我們點進去這個方法看下。

4.2.1.1 Observable類的create()
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

可以看到,create()方法中也沒做什麼,就是建立一個ObservableCreate物件出來,然後把我們自定義的ObservableOnSubscribe作為引數傳到ObservableCreate中去,最後就是呼叫 RxJavaPlugins.onAssembly()方法。

我們先來看看ObservableCreate類:

4.2.1.2 ObservableCreate類
public final class ObservableCreate<T> extends Observable<T> {//繼承自Observable
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;//把我們建立的ObservableOnSubscribe物件賦值給source。
    }
}

可以看到,ObservableCreate是繼承自Observable的,並且會把ObservableOnSubscribe物件給存起來。

再看下RxJavaPlugins.onAssembly()方法

4.2.1.3 RxJavaPlugins類的onAssembly()
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        //省略無關程式碼
        return source;
    }

很簡單,就是把上面建立的ObservableCreate給返回。

4.2.1.4 簡單總結

所以Observable.create()中就是把我們自定義的ObservableOnSubscribe物件重新包裝成一個ObservableCreate物件,然後返回這個ObservableCreate物件。
注意,這種重新包裝新物件的用法在RxJava中會頻繁用到,後面的分析中我們還會多次遇到。
放個圖好理解,包起來哈~
被觀察者.png

4.2.1.5 時序圖

Observable.create()的時序圖如下所示:
Observable.create()時序圖.png

4.2.2 訂閱過程

接下來我們就看下訂閱過程的程式碼,同樣,點進去Observable.subscribe()

4.2.2.1 Observable類的subscribe()
    public final void subscribe(Observer<? super T> observer) {
            //省略無關程式碼

            observer = RxJavaPlugins.onSubscribe(this, observer);

            subscribeActual(observer);

            //省略無關程式碼
    }

可以看到,實際上其核心的程式碼也就兩句,我們分開來看下:

4.2.2.2 RxJavaPlugins類的onSubscribe()
    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        //省略無關程式碼

        return observer;
    }

跟之前程式碼一樣,這裡同樣也是把原來的observer返回而已。
再來看下subscribeActual()方法。

4.2.2.3 Observable類的subscribeActual()
    protected abstract void subscribeActual(Observer<? super T> observer);

Observable類的subscribeActual()中的方法是一個抽象方法,那麼其具體實現在哪呢?還記得我們前面建立被觀察者的過程嗎,最終會返回一個ObservableCreate物件,這個ObservableCreate就是Observable的子類,我們點進去看下:

4.2.2.4 ObservableCreate類的subscribeActual()
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //觸發我們自定義的Observer的onSubscribe(Disposable)方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

可以看到,subscribeActual()方法中首先會建立一個CreateEmitter物件,然後把我們自定義的觀察者observer作為引數給傳進去。這裡同樣也是包裝起來,放個圖:
觀察者.png
這個CreateEmitter實現了ObservableEmitter介面和Disposable介面,如下:

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        //程式碼省略
    }

然後就是呼叫了observer.onSubscribe(parent),實際上就是呼叫觀察者的onSubscribe()方法,即告訴觀察者已經成功訂閱到了被觀察者。

繼續往下看,subscribeActual()方法中會繼續呼叫source.subscribe(parent),這裡的source就是ObservableOnSubscribe物件,即這裡會呼叫ObservableOnSubscribesubscribe()方法。
我們具體定義的subscribe()方法如下:

        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });

ObservableEmitter,顧名思義,就是被觀察者發射器。
所以,subscribe()裡面的三個onNext()方法和一個onComplete()會逐一被呼叫。
這裡的ObservableEmitter介面其具體實現為CreateEmitter,我們看看CreateEmitte類的onNext()方法和onComplete()的實現:

4.2.2.5 CreateEmitter類的onNext()和onComplete()等
        //省略其他程式碼

        @Override
        public void onNext(T t) {
            //省略無關程式碼
            if (!isDisposed()) {
                //呼叫觀察者的onNext()
                observer.onNext(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //呼叫觀察者的onComplete()
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

可以看到,最終就是會呼叫到觀察者的onNext()onComplete()方法。至此,一個完整的訊息訂閱流程就完成了。
另外,可以看到,上面有個isDisposed()方法能控制訊息的走向,即能夠切斷訊息的傳遞,這個後面再來說。

4.2.2.6 簡單總結

Observable(被觀察者)和Observer(觀察者)建立連線(訂閱)之後,會創建出一個發射器CreateEmitter,發射器會把被觀察者中產生的事件傳送到觀察者中去,觀察者對發射器中發出的事件做出響應處理。可以看到,是訂閱之後,Observable(被觀察者)才會開始傳送事件。

放張事件流的傳遞圖:
訂閱過程.png

4.2.2.7 時序流程圖

再來看下訂閱過程的時序流程圖:
訂閱過程時序圖.png

4.3 切斷訊息

之前有提到過切斷訊息的傳遞,我們先來看下如何使用:

4.3.1 切斷訊息

        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });

        Observer<String> observer = new Observer<String>() {
            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe : " + d);
                mDisposable=d;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
                mDisposable.dispose();
                Log.d(TAG, "切斷觀察者與被觀察者的連線");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        observable.subscribe(observer);

輸出結果為:

onSubscribe : null
onNext : 文章1
切斷觀察者與被觀察者的連線

可以看到,要切斷訊息的傳遞很簡單,呼叫下Disposabledispose()方法即可。呼叫dispose()之後,被觀察者雖然能繼續傳送訊息,但是觀察者卻收不到訊息了。
另外有一點需要注意,上面onSubscribe輸出的Disposable值是"null",並不是空引用null

4.3.2 切斷訊息原始碼分析

我們這裡來看看下dispose()的實現。Disposable是一個介面,可以理解Disposable為一個聯結器,呼叫dispose()後,這個聯結器將會中斷。其具體實現在CreateEmitter類,之前也有提到過。我們來看下CreateEmitterdispose()方法:

4.3.2.1 CreateEmitter的dispose()
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

就是呼叫DisposableHelper.dispose(this)而已。

4.3.2.2 DisposableHelper類
public enum DisposableHelper implements Disposable {

    DISPOSED
    ;

    //其他程式碼省略

    public static boolean isDisposed(Disposable d) {
        //判斷Disposable型別的變數的引用是否等於DISPOSED
        //即判斷該聯結器是否被中斷
        return d == DISPOSED;
    }

    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            //這裡會把field給設為DISPOSED
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }
}

可以看到DisposableHelper是一個列舉類,並且只有一個值:DISPOSEDdispose()方法中會把一個原子引用field設為DISPOSED,即標記為中斷狀態。因此後面通過isDisposed()方法即可以判斷聯結器是否被中斷。

4.3.2.3 CreateEmitter類中的方法

再回頭看看CreateEmitter類中的方法:

        @Override
        public void onNext(T t) {
            //省略無關程式碼

            if (!isDisposed()) {
                //如果沒有dispose(),才會呼叫onNext()
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                //如果dispose()了,會呼叫到這裡,即最終會崩潰
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            //省略無關程式碼
            if (!isDisposed()) {
                try {
                    //如果沒有dispose(),才會呼叫onError()
                    observer.onError(t);
                } finally {
                    //onError()之後會dispose()
                    dispose();
                }
                //如果沒有dispose(),返回true
                return true;
            }
            //如果dispose()了,返回false
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //如果沒有dispose(),才會呼叫onComplete()
                    observer.onComplete();
                } finally {
                    //onComplete()之後會dispose()
                    dispose();
                }
            }
        }

從上面的程式碼可以看到:

  1. 如果沒有disposeobserver.onNext()才會被呼叫到。
  2. onError()onComplete()互斥,只能其中一個被呼叫到,因為呼叫了他們的任意一個之後都會呼叫dispose()
  3. onError()onComplete()onComplete()不會被呼叫到。反過來,則會崩潰,因為onError()中丟擲了異常:RxJavaPlugins.onError(t)。實際上是dispose後繼續呼叫onError()都會炸。

5.RxJava的執行緒切換

上面的例子和分析都是在同一個執行緒中進行,這中間也沒涉及到執行緒切換的相關問題。但是在實際開發中,我們通常需要在一個子執行緒中去進行一些資料獲取操作,然後要在主執行緒中去更新UI,這就涉及到執行緒切換的問題了,通過RxJava我們也可以把執行緒切換寫得還簡潔。

5.1 執行緒切換例子

關於RxJava如何使用執行緒切換,這裡就不詳細講了。
我們直接來看一個例子,並分別列印RxJava在執行過程中各個角色所在的執行緒。

        new Thread() {
            @Override
            public void run() {
                Log.d(TAG, "Thread run() 所線上程為 :" + Thread.currentThread().getName());
                Observable
                        .create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                Log.d(TAG, "Observable subscribe() 所線上程為 :" + Thread.currentThread().getName());
                                emitter.onNext("文章1");
                                emitter.onNext("文章2");
                                emitter.onComplete();
                            }
                        })
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d(TAG, "Observer onSubscribe() 所線上程為 :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onNext(String s) {
                                Log.d(TAG, "Observer onNext() 所線上程為 :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "Observer onError() 所線上程為 :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onComplete() {
                                Log.d(TAG, "Observer onComplete() 所線上程為 :" + Thread.currentThread().getName());
                            }
                        });
            }
        }.start();

輸出結果為:

Thread run() 所線上程為 :Thread-2
Observer onSubscribe() 所線上程為 :Thread-2
Observable subscribe() 所線上程為 :RxCachedThreadScheduler-1
Observer onNext() 所線上程為 :main
Observer onNext() 所線上程為 :main
Observer onComplete() 所線上程為 :main

從上面的例子可以看到:

  1. Observer(觀察者)的onSubscribe()方法執行在當前執行緒中。
  2. Observable(被觀察者)中的subscribe()執行在subscribeOn()指定的執行緒中。
  3. Observer(觀察者)的onNext()onComplete()等方法執行在observeOn()指定的執行緒中。

5.2 原始碼分析

下面我們對執行緒切換的原始碼進行一下分析,分為兩部分:subscribeOn()observeOn()

5.2.1 subscribeOn()原始碼分析

首先來看下subscribeOn(),我們的例子中是這麼個使用的:

    .subscribeOn(Schedulers.io())

subscribeOn()方法要傳入一個Scheduler類物件作為引數,Scheduler是一個排程類,能夠延時或週期性地去執行一個任務。

5.2.1.1 Scheduler型別

通過Schedulers類我們可以獲取到各種Scheduler的子類。RxJava提供了以下這些執行緒排程類供我們使用:

Scheduler型別 使用方式 含義 使用場景
IoScheduler Schedulers.io() io操作執行緒 讀寫SD卡檔案,查詢資料庫,訪問網路等IO密集型操作
NewThreadScheduler Schedulers.newThread() 建立新執行緒 耗時操作等
SingleScheduler Schedulers.single() 單例執行緒 只需一個單例執行緒時
ComputationScheduler Schedulers.computation() CPU計算操作執行緒 圖片壓縮取樣、xml,json解析等CPU密集型計算
TrampolineScheduler Schedulers.trampoline() 當前執行緒 需要在當前執行緒立即執行任務時
HandlerScheduler AndroidSchedulers.mainThread() Android主執行緒 更新UI等
5.2.1.2 Schedulers類的io()

下面我們來看下Schedulers.io()的程式碼,其他的Scheduler子類都差不多,就不逐以分析了,有興趣的請自行檢視哈~


    @NonNull
    static final Scheduler IO;

    @NonNull
    public static Scheduler io() {
        //1.直接返回一個名為IO的Scheduler物件
        return RxJavaPlugins.onIoScheduler(IO);
    }

    static {
        //省略無關程式碼

        //2.IO物件是在靜態程式碼塊中例項化的,這裡會建立按一個IOTask()
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
    }

    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            //3.IOTask中會返回一個IoHolder物件
            return IoHolder.DEFAULT;
        }
    }

    static final class IoHolder {
        //4.IoHolder中會就是new一個IoScheduler物件出來
        static final Scheduler DEFAULT = new IoScheduler();
    }

可以看到,Schedulers.io()中使用了靜態內部類的方式來創建出了一個單例IoScheduler物件出來,這個IoScheduler是繼承自Scheduler的。這裡mark一發,後面會用到這個IoScheduler的。

5.2.1.3 Observable類的subscribeOn()

然後,我們就來看下subscribeOn()的程式碼:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //省略無關程式碼
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

可以看到,首先會將當前的Observable(其具體實現為ObservableCreate)包裝成一個新的ObservableSubscribeOn物件。
放個圖:
ObservableSubscribeOn.png

跟前面一樣,RxJavaPlugins.onAssembly()也是將ObservableSubscribeOn物件原樣返回而已,這裡就不看了。
可以看下ObservableSubscribeOn的構造方法:

5.2.1.4 ObservableSubscribeOn類的構造方法
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

也就是把sourcescheduler這兩個儲存一下,後面會用到。

然後subscribeOn()方法就完了。好像也沒做什麼,就是重新包裝一下物件而已,然後將新物件返回。即將一箇舊的被觀察者包裝成一個新的被觀察者。

5.2.1.5 ObservableSubscribeOn類的subscribeActual()

接下來我們回到訂閱過程,為什麼要回到訂閱過程呢?因為事件的傳送是從訂閱過程開始的啊。
雖然我們這裡用到了執行緒切換,但是呢,其訂閱過程前面的內容跟上一節分析的是一樣的,我們這裡就不重複了,直接從不一樣的地方開始。還記得訂閱過程中Observable類的subscribeActual()是個抽象方法嗎?因此要看其子類的具體實現。在上一節訂閱過程中,其具體實現是在ObservableCreate類。但是由於我們呼叫subscribeOn()之後,ObservableCreate物件被包裝成了一個新的ObservableSubscribeOn物件了。因此我們就來看看ObservableSubscribeOn類中的subscribeActual()方法:

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

subscribeActual()中同樣也將我們自定義的Observer給包裝成了一個新的SubscribeOnObserver物件。同樣,放張圖:
SubscribeOnObserver.png
然後就是呼叫ObserveronSubscribe()方法,可以看到,到目前為止,還沒出現過任何執行緒相關的東西,所以ObserveronSubscribe()方法就是執行在當前執行緒中。
然後我們重點看下最後一行程式碼,首先建立一個SubscribeTask物件,然後就是呼叫scheduler.scheduleDirect().。
我們先來看下SubscribeTask類:

5.2.1.6 SubscribeTask類
    //SubscribeTask是ObservableSubscribeOn的內部類
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //這裡的source就是我們自定義的Observable物件,即ObservableCreate
            source.subscribe(parent);
        }
    }

很簡單的一個類,就是實現了Runnable介面,然後run()中呼叫Observer.subscribe()

5.2.1.7 Scheduler類的scheduleDirect()

再來看下scheduler.scheduleDirect()方法

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

往下看:

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

        //createWorker()在Scheduler類中是個抽象方法,所以其具體實現在其子類中
        //因此這裡的createWorker()應當是在IoScheduler中實現的。
        //Worker中可以執行Runnable
        final Worker w = createWorker();

        //實際上decoratedRun還是這個run物件,即SubscribeTask
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        //將Runnable和Worker包裝成一個DisposeTask
        DisposeTask task = new DisposeTask(decoratedRun, w);

        //Worker執行這個task
        w.schedule(task, delay, unit);

        return task;
    }

我們來看下建立WorkerWorker執行任務的過程。

5.2.1.8 IoScheduler的createWorker()和schedule()
    final AtomicReference<CachedWorkerPool> pool;

    public Worker createWorker() {
        //就是new一個EventLoopWorker,並且傳一個Worker快取池進去
        return new EventLoopWorker(pool.get());
    }

    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        //構造方法
        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            //從快取Worker池中取一個Worker出來
            this.threadWorker = pool.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            //省略無關程式碼

            //Runnable交給threadWorker去執行
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

注意,不同的Scheduler類會有不同的Worker實現,因為Scheduler類最終是交到Worker中去執行排程的。

我們來看下Worker快取池的操作:

5.2.1.9 CachedWorkerPool的get()
    static final class CachedWorkerPool implements Runnable {
        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                //如果緩衝池不為空,就從快取池中取threadWorker
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            //如果緩衝池中為空,就建立一個並返回。
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
    }
5.2.1.10 NewThreadWorker的scheduleActual()

我們再來看下threadWorker.scheduleActual()
ThreadWorker類沒有實現scheduleActual()方法,其父類NewThreadWorker實現了該方法,我們點進去看下:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        //構造方法中建立一個ScheduledExecutorService物件,可以通過ScheduledExecutorService來使用執行緒池
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //這裡的decoratedRun實際還是run物件
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //將decoratedRun包裝成一個新物件ScheduledRunnable
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        //省略無關程式碼

        if (delayTime <= 0) {
            //執行緒池中立即執行ScheduledRunnable
            f = executor.submit((Callable<Object>)sr);
        } else {
            //執行緒池中延遲執行ScheduledRunnable
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }

        //省略無關程式碼

        return sr;
    }
}

這裡的executor就是使用執行緒池去執行任務,最終SubscribeTaskrun()方法會線上程池中被執行,即Observablesubscribe()方法會在IO執行緒中被呼叫。這與上面例子中的輸出結果符合:

Observable subscribe() 所線上程為 :RxCachedThreadScheduler-1
5.2.1.11 簡單總結
  1. Observer(觀察者)的onSubscribe()方法執行在當前執行緒中,因為在這之前都沒涉及到執行緒切換。
  2. 如果設定了subscribeOn(指定執行緒),那麼Observable(被觀察者)中subscribe()方法將會執行在這個指定執行緒中去。
5.2.1.12 時序圖

來張總的subscribeOn()切換執行緒時序圖
subscribeOn()切換執行緒時序圖.png

5.2.1.13 多次設定subscribeOn()的問題

如果我們多次設定subscribeOn(),那麼其執行執行緒是在哪一個呢?先來看下例子

        //省略前後程式碼,看重點部分
        .subscribeOn(Schedulers.io())//第一次
        .subscribeOn(Schedulers.newThread())//第二次
        .subscribeOn(AndroidSchedulers.mainThread())//第三次

其輸出結果為:

Observable subscribe() 所線上程為 :RxCachedThreadScheduler-1

即只有第一次的subscribeOn()起作用了。這是為什麼呢?
我們知道,每呼叫一次subscribeOn()就會把舊的被觀察者包裝成一個新的被觀察者,經過了三次呼叫之後,就變成了下面這個樣子:
多次設定subscribeOn().png
同時,我們知道,被觀察者被訂閱時是從最外面的一層通知到裡面的一層,那麼當傳到上圖第三層時,也就是ObservableSubscribeOn(第一次)那一層時,管你之前是在哪個執行緒,subscribeOn(Schedulers.io())都會把執行緒切到IO執行緒中去執行,所以多次設定subscribeOn()時,只有第一次生效。

5.2.2 observeOn()

我們再來看下observeOn(),還是先來回顧一下我們例子中的設定:

    //指定在Android主執行緒中執行
    .observeOn(AndroidSchedulers.mainThread())
5.2.2.1 Observable類的observeOn()
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //省略無關程式碼
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

同樣,這裡也是新包裝一個ObservableObserveOn物件,注意,這裡包裝的舊被觀察者是ObservableSubscribeOn物件了,因為之前呼叫過subscribeOn()包裝了一層了,所以現在是如下圖所示:
ObservableObserveOn.png

RxJavaPlugins.onAssembly()也是原樣返回。

我們看看ObservableObserveOn的構造方法。

5.2.2.2 ObservableObserveOn類的構造方法
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

裡面就是一些變數賦值而已。

5.2.2.3 ObservableObserveOn的subscribeActual()

subscribeOn()差不多,我們就直接來看ObservableObserveOnsubscribeActual()方法了。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //判斷是否當前執行緒
        if (scheduler instanceof TrampolineScheduler) {
            //是當前執行緒的話,直接呼叫裡面一層的subscribe()方法
            //即呼叫ObservableSubscribeOn的subscribe()方法
            source.subscribe(observer);
        } else {
            //建立Worker
            //本例子中的scheduler為AndroidSchedulers.mainThread()
            Scheduler.Worker w = scheduler.createWorker();
            //這裡會將Worker包裝到ObserveOnObserver物件中去
            //注意:source.subscribe沒有涉及到Worker,所以還是在之前設定的執行緒中去執行
            //本例子中source.subscribe就是在IO執行緒中執行。
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

同樣,這裡也將observer給包裝了一層,如下圖所示:
ObserveOnObserver.png

source.subscribe()中將會把事件逐一發送出去,我們這裡只看下ObserveOnObserver中的onNext()方法的處理,onComplete()等就不看了,實際上都差不多。

5.2.2.4 ObserveOnObserver的onNext()
        @Override
        public void onNext(T t) {
            //省略無關程式碼
            if (sourceMode != QueueDisposable.ASYNC) {
                //將資訊存入佇列中
                queue.offer(t);
            }
            schedule();
        }

就是呼叫schedule()而已。

5.2.2.5 ObserveOnObserver的schedule()
        void schedule() {
            if (getAndIncrement() == 0) {
                //ObserveOnObserver同樣實現了Runnable介面,所以就把它自己交給worker去排程了
                worker.schedule(this);
            }
        }

Android主執行緒排程器裡面的程式碼就不分析了,裡面實際上是用handler來發送Message去實現的,感興趣的可以看下。
既然ObserveOnObserver實現了Runnable介面,那麼就是其run()方法會在主執行緒中被呼叫。
我們來看下ObserveOnObserverrun()方法:

5.2.2.6 ObserveOnObserver的run()
        @Override
        public void run() {
            //outputFused預設是false
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

這裡會走到drainNormal()方法。

5.2.2.7 ObserveOnObserver的drainNormal()
        void drainNormal() {
            int missed = 
            
           

相關推薦

RxJava訊息訂閱執行切換原理

本文由玉剛說寫作平臺提供寫作贊助,版權歸玉剛說微信公眾號所有 原作者:四月葡萄 版權宣告:未經玉剛說許可,不得以任何形式轉載 1.前言 本文主要是對RxJava的訊息訂閱和執行緒切換進行原始碼分析,相關的使用方式等不作詳細介紹。 本文原始碼基於

RxJava 執行切換原理

  RxJava的執行緒切換主要涉及到 observeOn(),subscribeOn() 我們來分析一下這兩個方法是怎麼做到切換的。 observeOn()作用於上一個構造好的Observable例項,RxJava設計比較巧妙的地方是,把執行緒切換的操作也封裝成了Ob

RxJava執行切換原理

RxJava在圈子裡越來越火,相信很大的一個原因就是它的執行緒切換。它的執行緒切換可以用優雅來形容,鏈式呼叫,簡單、方便。今天,就讓我們來窺探一下RxJava的執行緒切換原理。本次拆輪子,還是按原樣,通過小例子,研讀RxJava原始碼等來理解整個過程、結構、原理,我們首要的

RxJava執行切換原理分析

 Scheduler翻譯成中文就是“排程”的意思,在RxJava裡就是執行緒之間的排程,也就是執行緒之間的切換。  從圖中可以簡單看出,SingleScheduler、ComputationScheduler、IoScheduler、NewThreadSchedule

RxJava(RxAndroid)執行切換機制

  自從專案中使用RxJava以來,可以很方便的切換執行緒。至於是怎麼實現的,一直沒有深入的研究過!本篇文章就是分析RxJava的執行緒模型。   RxJava基本使用   先上一個平時使用RxJava切換執行緒的例子:    Observable

RxJava2探索-執行切換原理之subscribeOn

前言 說起來有點丟人,上週去某公司面試,做足了什麼像java記憶體模型、hashmap原理、設計模式、Android多執行緒、自定義View等等相關的知識準備,然而面試的時候,前面幾個一個沒問!!!自定義view考察了onmeasure和Mnuspace那幾個

Android中的訊息佇列執行佇列機制

下面是訊息機制中幾個重要成員的關係圖: 一個Activity中可以創建出多個工作執行緒,如果這些執行緒把他們訊息放入Activity主執行緒的訊息佇列中,那麼訊息就會在主執行緒中處理了。因為主執行緒一般負責檢視元件的更新操作,對於不是執行緒安全的檢視元件來說,這種方式能夠很好的實現檢視的更新 。  

一文帶你懟明白程序執行通訊原理

程序間通訊 程序是需要頻繁的和其他程序進行交流的。例如,在一個 shell 管道中,第一個程序的輸出必須傳遞給第二個程序,這樣沿著管道進行下去。因此,程序之間如果需要通訊的話,必須要使用一種良好的資料結構以至於不能被中斷。下面我們會一起討論有關 程序間通訊(Inter Process Communicatio

RxJava執行切換代替ThreadHandler

在我們的日常開發中,我們可能會經常涉及到執行緒的切換,比如:需要在子執行緒中載入資料庫中的資料,一般情況下,我們會這樣做: new Thread(new Runnable() { @Override pub

01-實現多執行切換排程實驗介紹(協程原理

本系列文章旨在記錄完成一個小型使用者級執行緒建立、切換與排程框架的過程,並不能代替作業系統為我們提供的執行緒框架。 為什麼要寫這樣的東西呢?目的很簡單,僅僅在於學習作業系統中的程序或執行緒切換與排程原理。所以,請不要把程式碼試圖用到你的工程或者專案中,出了問題

EventBus 訊息執行切換模型與實現原理

一. 序 EventBus 是一個基於觀察者模式的事件訂閱/釋出框架,利用 EventBus 可以在不同模組之間,實現低耦合的訊息通訊。 EventBus 因為其使用簡單且穩定,被廣泛應用在一些生產專案中。 通常我們就是使用 EventBus 分發一些訊息給訊息的訂閱者,除此之外我們還可以通過 EventBu

RxJava 執行切換

RxJava中的執行緒切換 一般來說我們在使用網路請求資料後需要使用View來顯示,網路請求當然是在子執行緒中執行,當獲取到資料後就需要切換到UI執行緒來顯示,否則會報錯,RxJava中已經幫我們處理好了執行緒切換的問題。 Observable類兩個介面 @Sc

Rxjava執行變換原理

public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) {

Java的Executor框架執行池實現原理

一,Java的Executor框架 1,Executor介面 public interface Executor {     void execute(Runnable command); } Executor介面是Executor框架中最基礎的部分,定義了一個用於

RxJava裡doOnNext的使用執行處理

doOnNext的使用 我對doOnNext的使用是存在疑惑的,按照官方文件 The doOnNext operator is much like doOnEach(Action1) except that the Action that you pa

Android訊息機制原理,仿寫Handler Looper原始碼解析跨執行通訊原理--之仿寫模擬Handler(四)

前篇總結:上一篇實現了用Looper管理訊息佇列和訊息迴圈,但是訊息的傳送和訊息處理都是在Looper中進行的。寫一個子執行緒使用這樣的Looper怎麼才能獲取到loop()死迴圈訊息佇列取出的訊息呢?用回撥!callBack! 第四節 仿寫Handler來發送訊息,實現回

迷之RxJava —— 執行切換

RxJava最迷人的是什麼? 答案就是把非同步序列寫到一個工作流裡!和javascript的Promise/A如出一轍。 OK,在java中做非同步的事情在我們傳統理解過來可不方便,而且,如果要讓非同步按照我們的工作流來,就更困難了。 但是在RxJava中,我們只要呼叫

Android訊息機制原理,仿寫Handler Looper原始碼跨執行通訊原理--之執行間通訊原理(一)

前言:我們都知道Android的執行緒通訊是用Handler、Looper機制實現的,面試也經常問道,網上也有很多文章介紹原始碼但是可能很多小白只是機械是的記憶,回答不清楚原理究竟是怎麼回事。下邊我將一步一步仿寫一個Handler、Looper模擬Android的執行緒間通訊

java多執行學習之一——執行的狀態、上下文切換執行監控

多執行緒 執行緒的狀態 1. NEW(圖中初始狀態):一個剛建立而未啟動的執行緒處於該狀態。由於一個執行緒例項只能被啟動一次,因此一個執行緒只可能有一次處於該狀態。 2. 可執行(RUNNABLE):表示處於改狀態的執行緒可以被JVM的執行緒排程器(scheduler)進

Rxjava執行執行 切換 簡單實現

package com.zgt.demo01.rxjava; import com.zgt.demo01.os2.Handler; import com.zgt.demo01.os2.Message; public abstract class MObse