1. 程式人生 > >Rxjava 2.x 原始碼系列

Rxjava 2.x 原始碼系列

前言

本篇部落格講解的 Rxjava 的原理基於版本 2.1.4,RxAndroid 的原理的版本基於 2.0.2 。

基本框架

Rxjava 有四個基本的概念

  • Observable (可觀察者,即被觀察者)
  • Observer (觀察者)
  • subscribe (訂閱) 通過該方法,將 Observable 與 Observer 關聯起來
  • 事件 (包括 onNext,onComplete,onError 等事件)

簡單來說:Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer,並且回撥 Observer 的相應的方法。

用一張簡單的圖來描述大概如下

Observable

public abstract class Observable<T> implements ObservableSource<T> {
}

可以看到 Observable 是一個抽象類,實現了 ObservableSource 介面

Observer

Observer 其實也是一個介面,裡面定義了若干方法,onSubscribe ,onNext,onError,onComplete 方法。

public interface Observer<T> {


    void
onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); }
  • 一個正常的事件序列的呼叫順序會是這樣的 onSubscribe > onNext > onComplete,若中途出錯了,那呼叫順序可能是這樣的 onSubscribe > onNext > onError
  • onSubscribe 方法,當我們呼叫 Observable 的 subscribe 方法的時候,會先回調 Observer 的 onSubscribe 方法,此方法的呼叫順序先於 onNext,onError ,onComplete 方法。
  • onError 方法與 onComplete 方法可以說是互斥的,呼叫了其中一個方法就不會呼叫另外一個方法

原始碼解析

基本使用

在講解原理之前,我們先來看一下 Rxjava 的一個基本使用。

Observable
           .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("a");
                    emitter.onNext("b");
                    emitter.onNext("c");
                    emitter.onComplete();
                }
            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("TAG", "onSubscribe():  ");
                }

                @Override
                public void onNext(String s) {
                    Log.e("TAG", "onNext():  " + s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.e("TAG", "onComplete():  ");
                }
            });
E/TAG: onSubscribe():  
E/TAG: onNext():  a
E/TAG: onNext():  b
E/TAG: onNext():  c
E/TAG: onComplete():

首先我們先從上面簡單的例子回顧起:

先來看 Observable 的 create 方法

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

在 create 方法中,其實很簡單,只是對 source 進行判空處理,並將 source 用 ObservableCreate 包裝起來,並返回回去。下面讓我們一起來看一下 ObservableCreate 是什麼東西?

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

ObservableCreate 其實也很簡單,它是 Observable 的子類,持有了上游 source 的引用,並重寫 subscribeActual 方法。

接下來我們來看重點了,即 Observable 的 subscribe 方法,在該方法中,他會將 Observalble 與 observer 關聯起來。

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    // 檢查 observer 是否為 null,為 null 丟擲異常
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
       // RxJavaPlugins 外掛的,暫時不管
        observer = RxJavaPlugins.onSubscribe(this, observer);


      // 檢查 observer 是否為 null,為 null 丟擲異常
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

subscribe 方法也比較簡單,大概可以分為以下兩步:

  • 首先檢查 observer 是否為空,為 null 丟擲異常
  • 第二步,呼叫 subscribeActual 方法,而我們知道在 Observable 類中 subscribeActual 是抽象方法,因此,我們只需要關注其實現類的 subscribeActual 方法。從上面的分析,我們知道,當我們呼叫 Observable create(ObservableOnSubscribe source) 方法的時候,最終會返回 ObservableCreate 例項。因此,我們只需要關注 ObservableCreate 的 subscribeActual 方法
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    ----
}

ObservableCreate 的核心程式碼主要也只有幾行,source 是上游 ObservableOnSubscribe 的引用,而 CreateEmitter 這個類,它是 ObservableCreate 的一個靜態內部類,實現了 ObservableEmitter,Disposable 介面 它持有 observer 的引用,當我們呼叫 CreateEmitter 的 next 方法的時候,它會判斷當前的 CreateEmitter 有沒有被 dispose 掉,如果沒有,呼叫他持有的 observer 的 onNext 方法, 同理 onComplete 方法一一樣,只不過執行完 onComplete 方法的時候,還會執行 dispose 方法,dispose 當前的 CreateEmitter。(dispose 方法這裡先記住以下,下面會講到

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }

    @Override
    public void setCancellable(Cancellable c) {
        setDisposable(new CancellableDisposable(c));
    }

    @Override
    public ObservableEmitter<T> serialize() {
        return new SerializedEmitter<T>(this);
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

好,看完上面的程式碼,我們回到 ObservableCreate 的 subscribeActual 方法,我們呼叫 observer.onSubscribe 方法的時候,會將 parent 物件作為方法引數暴露出去(而這個 parent 正是我們的 CreateEmitter,通過 CreateEmitter 的 dispose 方法可以取消訂閱關係)。接著,當我們呼叫 source.subscribe(parent) 的時候,會呼叫 ObservableOnSubscribe 的 subscribe 方法。

    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }

因此,在我們上面的例子中,若不出錯,呼叫順序

Observable subcrible > Observable subscribeActual > ObservableCreate subscribeActual > observer.onSubscribe > ObservableOnSubscribe subscribe(emitter 是 CreateEmitter 的例項,包裝了 observer,呼叫 emitter 的相應方法 ,會進而呼叫 observer 的 onNext onComplete 方法,而不會呼叫 onError 方法)

若在呼叫 onNext 方法的過程中出錯,那呼叫順序可能是這樣的

Observable subcrible > Observable subscribeActual > ObservableCreate subscribeActual > observer.onSubscribe > ObservableOnSubscribe subscribe(@NonNull ObservableEmitter emitter)
(emitter 是 CreateEmitter 的例項,包裝了 observer,呼叫 emitter 的相應方法 ,會進而呼叫 observer 的 onNext onError 方法,而不會呼叫 onComplete 方法 )

observable 與 Observer 是如何取消訂閱關係的

在上面講解的時候,其實我們已經有提到 CreateEmitter 的 dispose 方法,該方法就是用來取消訂閱關係的。

假設這樣一個場景,當我們收到的 value 的值大於等於 2 的時候,這個時候認為是異常的,解決兩者之間的訂閱關係

    Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        });

    Observer<Integer> observer = new Observer<Integer>() {
            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.d("xujun", value.toString());
                if (value >=2) {   // >=2  時為異常資料,解除訂閱
                    disposable.dispose();
                }
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {



            }
        };

    observable.subscribe(observer); //建立訂閱關係
D/xujun: 1
   2

總結

Rxjava 的原理其實不難,Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer,並且回撥 Observer 的相應的方法。

用一張簡單的流程圖描述如下:

下一篇部落格,將會講解到 Rxjava 的執行緒切換問題,敬請期待。

相關推薦

Rxjava 2.x 原始碼系列

前言 本篇部落格講解的 Rxjava 的原理基於版本 2.1.4,RxAndroid 的原理的版本基於 2.0.2 。 基本框架 Rxjava 有四個基本的概念 Observable (可觀察者,即被觀察者) Obs

RxJava 2.x 教程及原始碼揭祕(一)入門理解及基本操作符

目錄 前言 Rxjava的介紹 Rxjava的優勢 Rxjava是觀察者模式 Rxjava是裝飾者模式 Observable Rxjava的操作符 subScribeOn與observeOn切換執行緒 其他操作符 補充 前言   &nbs

RxJava 2.x 教程及原始碼揭祕(三)Rxjava操作符原始碼解析

本文將探究: 知道執行緒排程是怎麼實現的 知道操作符是怎麼實現的 RxJava最強大的莫過於它的執行緒排程 和 花式操作符。 map操作符 map是一個高頻的操作符,我們首先拿他開刀。 例子如下,源頭Observable傳送的是String型別的數字,利用map轉換成

RxJava 2.x 理解-1

nbsp div lan ble user activit ted 需要 androi 在RxJava 1.x 系列中,講解了RxJava的大致用法,因為現在都用RxJava 2了,所以Rxjava 1就不細講,主要來學習RxJava 2。 基本使用: /**

RxJava 2.x 理解-3

com span gpo log div .com itl http 教程 背壓: 給初學者的RxJava2.0教程(四) 給初學者的RxJava2.0教程(五) 給初學者的RxJava2.0教程(六) 給初學者的RxJava2.0教程(七) 給初學者的RxJava2.0教

RxJava 2.x 之圖解建立、訂閱、發射流程

從一個例子開始 建立過程 訂閱過程 發射過程 小結 從一個例子開始 Observable.create(new ObservableOnSubscribe<Integer>() { @Override

RxJava 2.x 之條件操作符

條件操作符 all操作符 ambArray操作符 contains操作符 any操作符 isEmpty操作符 defaultIfEmpty操作符 switchIfEmpty操作符 sequenceEqual操作符 takeUnt

RxJava 2.x 之聚合操作符

聚合操作符 startWith操作符 startWithArray操作符 concat/concatArray操作符 merge/mergeArray操作符 concatDelayError/mergeDelayError操作符 zip操作符

RxJava 2.x 之過濾操作符

最近幾天想把rxjava2的操作符都整理一下,看到網上的很多文章都總結的很好,但是時間久了依然會忘記。 過濾操作符 filter操作符 take操作符 takeLast操作符 firstElement/lastElement操作符 first/la

RxJava 2.x 之建立操作符

最近幾天想把Rxjava2的操作符都整理一下,看到網上的很多文章都總結的很好,但是時間久了依然會忘記。 建立操作符 just操作符 fromArray操作符 empty操作符 error操作符 never操作符 fromIterable操作符

RxJava 2.x 之變換操作符

變換操作符 map操作符 flatMap操作符 flatMapIterable操作符 concatMap操作符 switchMap操作符 cast操作符 scan操作符 buffer操作符 toList操作符 groupB

Rxjava 2.x筆記(一)

這麼火的框架,現在才開始學,實在是有點落伍(太落伍)了。因為2.x是獨立於1.x的存在,所以為了儘快趕上時代潮流,本文基於2.x版本,整理一下學習中2.x的知識。因為是做筆記,所以肯定要參考大神的部落格了。(當然,其中的程式碼咱會先敲一遍再貼上的,原來是用筆記本打筆記,當然查詢的時候又不好查

RxJava 2.x 入門

之前只大概瞭解RxJava,並沒在實際的專案中實戰過,但最近在研究訊飛語音的一個demo的時候發現,他們都在使用mvvm,dagger2,rxjava2.x, 姿態很優雅,很吸引人,心想,臥槽再不嘗試一下就落後了,於是決定在專案中採用這些優秀的框架,與時俱進。在這裡記錄梳理一下Rxjava2.x

Quartz_2.2.X學習系列二十:Example 8

Demonstrates how a Holiday calendar can be used to exclude execution of jobs on a holiday ---------------------------------------------

RxJava 2.X 中的observable鏈是怎樣形成的?

要理解RxJava框架,就需要理清楚其鏈路是怎樣形成的。 先看一段簡單的程式碼: Observable.create(new ObservableOnSubscribe<String>() { @Override

Android中rxJava 2.x的observeOn報錯NoClassdefound

rxJava 2.x Observable回撥observeOn報錯的問題 最近從rxJava 1.x轉到新版rxJava 2.x時,在使用Observable回撥observeOn(AndroidSchedulers.mainThread())時,報了以下錯

RxJava 2.x 使用最佳實踐

以前寫過 Rxjava 系列教程, 如下所示 上面的這些教程覆蓋了 rxjava 的方方面面,很詳細。只是當時寫的時候是基於 rxjava 1.X 的版本寫的,後來 rxjava 進入了快速迭代的時期,很快就出現了 2.x 版本。根據 Rxjava

這可能是最好的RxJava 2.x 教程

為什麼要學 RxJava? 提升開發效率,降低維護成本一直是開發團隊永恆不變的宗旨。近兩年來國內的技術圈子中越來越多的開始提及 RxJava ,越來越多的應用和麵試中都會有 RxJava ,而就目前的情況,Android 的網路庫基本被 Retrofit + OkHttp 一統天下了,而配合上響應式

spark 2.x 原始碼分析 之 Logistic Regression 邏輯迴歸

Logistic Regression 邏輯迴歸 注:第一次寫部落格,希望互相交流改進。如果公式顯示不完整,請看github原文 一、二元邏輯迴歸 1、簡介 迴歸是解決變數之間的對映關係(x->y),而邏輯迴歸則通過sigmoi