1. 程式人生 > >Rxjava2原始碼解析超詳細~~~

Rxjava2原始碼解析超詳細~~~

轉載:Est 的小窩 原文地址:https://blog.codeest.moe/2017/03/25/android-rxjava2/?utm_medium=email&utm_source=gank.io

前言

和之前的 Glide 篇一樣,這篇 RxJava2 原始碼分析也會先列出一些要點,然後按這些點一步一步分析
注意,文章中的 RxJava 均是指RxJava2

  • Rxjava 中的主要型別
  • Rxjava 訂閱與終止訂閱的過程(Observable、Observer、create、just、dispose)
  • Rxjava 操作符原理(map、lift、compose)
  • Rxjava 執行緒排程原理(subscribeOn、observeOn、io、main)
  • Rxjava 背壓處理原理(buffer、latest、drop)
  • Rxjava 冷熱 Observable(publish、share、connect、refCount)
  • Rxjava 封裝庫 RxBinding 原理

RxJava 中的主要型別

開始之前先梳理下幾個關鍵類的作用和他們之間的關係

  • Observable 被觀察者(事件源),不處理背壓
  • Observer 觀察者,用於訂閱Observable
  • Subject 繼承了Observable實現了Observer,既可做觀察者也可做被觀察者,通常作為兩者的橋樑或代理

  • Flowable(Publisher) 被觀察者(事件源),有背壓處理策略

  • Subscriber 觀察者,用於訂閱Flowable
  • Processor 實現類FlowableProcessor繼承了Flowable實現了FlowableSubscriber,類似Subject

  • Single/SingleObserver 僅發生一次訊息,遵循onSubscribe (onSuccess | onError)?

  • Completable/CompletableObserver 僅發生一次訊息,遵循onSubscribe (onComplete | onError)?
  • Maybe/MaybeObserver 僅發生一次訊息,遵循onSubscribe (onSuccess | onError | onComplete)?

  • Disposable 替代了 RxJava1 中的Subscription ,實現該介面的資源具備可被取消 (dispose) 的能力

  • Subscription 在Subscriber訂閱時回撥的物件,具備拉取 (request) 和取消訂閱 (cancel) 的能力

RxJava 訂閱與終止訂閱的過程

這裡先以最基礎的Observable.create為例

123456789101112131415161718192021222324252627
Observable.create(new ObservableOnSubscribe<String>() {            @Override            public void subscribe(ObservableEmitter<String> e) throws Exception {            }        })        .subscribe(new Observer<String>() {            @Override            public void onSubscribe(Disposable d) {            }            @Override            public void onNext(String s) {            }            @Override            public void onError(Throwable e) {            }            @Override            public void onComplete() {            }        });

進入create方法,requireNonNull只是一個簡單的判空處理,然後由onAssembly返回Observable<T>物件

12345678910111213141516
@CheckReturnValue   @SchedulerSupport(SchedulerSupport.NONE)   public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {       ObjectHelper.requireNonNull(source, "source is null");       return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));   }   @SuppressWarnings({ "rawtypes", "unchecked" })   @NonNull   public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {       Function<? super Observable, ? extends Observable> f = onObservableAssembly;       if (f != null) {           return apply(f, source);       }       return source;   }

onAssembly是一個具有 hook 作用的方法,它會判斷它的Function型別成員變數onObservableAssembly是否存在,不存在則直接把傳入的引數返回,存在則把經過onObservableAssembly處理後的結果返回,相當於提供了一層允許插入額外操作的 hook 層。在當前場景下它直接返回了我們建立的ObservableCreate<T>,它是一個繼承了Observable的類,之後我們會呼叫Observablesubscribe方法來完成訂閱

12345678910111213141516171819
@SchedulerSupport(SchedulerSupport.NONE)    @Override    public final void subscribe(Observer<? super T> observer) {        ObjectHelper.requireNonNull(observer, "observer is null");        try {            observer = RxJavaPlugins.onSubscribe(this, observer);            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");            subscribeActual(observer);        } catch (NullPointerException e) { // NOPMD            throw e;        } catch (Throwable e) {            Exceptions.throwIfFatal(e);            RxJavaPlugins.onError(e);            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");            npe.initCause(e);            throw npe;        }

訂閱方法中的requireNonNullRxJavaPlugins就不再贅述了,最後執行了subscribeActual方法,這裡是實際完成訂閱的地方。

1234567891011121314151617181920
public final class ObservableCreate<T> extends Observable<T> {    final ObservableOnSubscribe<T> source;    public ObservableCreate(ObservableOnSubscribe<T> source) {        this.source = source;    }    @Override    protected void subscribeActual(Observer<? super T> observer) {        CreateEmitter<T> parent = new CreateEmitter<T>(observer);//觀察者傳入Emitter        observer.onSubscribe(parent);	//觸發訂閱時產生的回撥        try {            source.subscribe(parent); //發射器開始發射定義在事件源中的事件        } catch (Throwable ex) {            Exceptions.throwIfFatal(ex);            parent.onError(ex);        }    }}

回到ObservableCreate中,在subscribeActual裡首先建立了用於發射事件的CreateEmitter物件,CreateEmitter實現了介面EmitterDisposable, 並持有observer。當通過onNext發射事件時會傳遞給觀察者的onNext方法

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
static final class CreateEmitter<T>    extends AtomicReference<Disposable>    implements ObservableEmitter<T>, Disposable {        private static final long serialVersionUID = -3434801548987643227L;        final Observer<? super T> observer;        CreateEmitter(Observer<? super T> observer) {            this.observer = observer;        }        @Override        public void onNext(T t) {            if (t == null) {                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));                return;            }            if (!isDisposed()) {                observer.onNext(t);	//將Next事件給到observer            }        }        @Override        public void onError(Throwable t) {            if (t == null) {                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");            }            if (!isDisposed()) {                try {                    observer.onError(t);	//將Error事件給到observer並終止訂閱                } finally {                    dispose();                }            } else {                RxJavaPlugins.onError(t);            }        }        @Override        public void onComplete() {            if (!isDisposed()) {                try {                    observer.onComplete();	//將Complete事件給到observer並終止訂閱                } finally {                    dispose();                }            }        }    }

最後執行source.subscribe(parent);使資料來源開始經由發射器發射資料,至此整個建立過程就走通了

1234567
new ObservableOnSubscribe<String>() {            @Override            public void subscribe(ObservableEmitter<String> e) throws Exception {                e.onNext("123");                e.onComplete();            }        }

經過上面的分析,我們瞭解到了:

  • 每次將事件傳遞給觀察者時都會判斷isDisposed()檢查是否訂閱已經終止,一旦觸發了onError()onComplete()緊接著就會執行dispose()

  • 執行Observable.subscribe後才會在subscribeActual中完成實際的訂閱,並且開始執行發射器發射事件的程式碼,建立型操作符createdeferfromCallablejust等均遵循這個規則,稍稍需要注意的是,just()方法即使沒有訂閱也會立刻執行(是立刻執行該函式本身,不是開始發射資料),他會一開始就把我們要發射的內容作為value儲存下來

12345678910111213141516171819
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {    private final T value;    public ObservableJust(final T value) {        this.value = value;    }    @Override    protected void subscribeActual(Observer<? super T> s) {        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);        s.onSubscribe(sd);        sd.run();    }    @Override    public T call() {        return value;    }}

接下來分析終止訂閱的過程

1234567891011121314151617181920

相關推薦

Rxjava2原始碼解析詳細~~~

轉載:Est 的小窩 原文地址:https://blog.codeest.moe/2017/03/25/android-rxjava2/?utm_medium=email&utm_source=gank.io 前言 和之前的 Glide 篇一樣

mybatis原始碼-解析配置檔案(三)之配置檔案Configuration解析(詳細, 值得收藏)

1. 簡介 1.1 系列內容 本系列文章講解的是mybatis解析配置檔案內部的邏輯, 即 Reader reader = Resources.getResourceAsReader("mybatis-config.xml"); SqlSessionFact

mybatis源碼-解析配置文件(三)之配置文件Configuration解析(詳細, 值得收藏)

類型 version 創建對象 越來越大 ... 所有 類名 對象 and 1. 簡介 1.1 系列內容 本系列文章講解的是mybatis解析配置文件內部的邏輯, 即 Reader reader = Resources.getResourceAsReader("mybat

MP3檔案結構解析(詳細)

1. MP3檔案結構解析 1.1. 概述 1.1.1. 音訊相關術語 ü  ID3: 一般位於一個mp3檔案的開頭或末尾的若干位元組內,記錄該mp3檔案的歌手、標題、專輯名稱、年代、風格等資訊,ID3分位兩個版本,V1版ID3在檔案末尾的固定128位元組,以TAG字元開頭,若沒有則認為

RxJava2原始碼解析——基本流程、執行緒排程

本篇文章的目的: ①瞭解RxJava的基本流程 ②瞭解RxJava中執行緒排程的實現 ③瞭解了上面那些,其他的操作符對你來說就不是問題了 RxJava基本流程 我們從基本的使用作為入口: Observable.create(new ObservableOnSubsc

RxJava2 原始碼解析(二)

概述 知道源頭(Observable)是如何將資料傳送出去的。 知道終點(Observer)是如何接收到資料的。 何時將源頭和終點關聯起來的 知道執行緒排程是怎麼實現的 知道操作符是怎麼實現的 本篇計劃講解一下4,5. RxJava最強大的莫過

深入RxJava2 原始碼解析(一)

本文作者JasonChen,原文地址: chblog.me/2018/12/19/… ReactiveX 響應式程式設計庫,這是一個程式庫,通過使用可觀察的事件序列來構成非同步和事件驅動的程式。 其簡化了非同步多執行緒程式設計,在以前多執行緒程式設計的世界中,鎖、可重入鎖、同步佇列器、訊號量、併

Jdk1.8集合框架之HashMap原始碼解析詳細解析紅黑樹)

HashMap特點 不同步,支援null的鍵和值,put或get操作通常是常數時間。 Map介面的實現。 去掉了Hashtable的contains(Object value)方法,保留containsKey和containsValue方法。 使用

Rxjava2 原始碼解析(一)

前言   Rxjava2 我們都知道是很牛逼的一個擴充套件的觀察者模式的Scheduler,基本可以在程式碼中替代非同步執行緒+Handler的開發模式,同時配合MVP設計模式,可以更好的實現業務分離。但是並不能盲目崇拜,不能為了使用而使用,一切以編碼需求為主

RxJava2原始碼解析

基礎解析 我們看下RxJava最簡單的寫法 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(Observa

詳細DNS正、反向解析以及主從復制

反向 重新 fff tab 軟件 其他 roc 反向解析 image 簡介 DNS(Domain Name System,域名系統),萬維網上作為域名和IP地址相互映射的一個分布式數據庫,能夠使用戶更方便的訪問互聯網,而不用去記住能夠被機器直接讀取的IP數串。通過域名,最終

java執行緒池詳細入門教程即原始碼解析

##1、執行緒池概念      執行緒池是執行緒的集合,通過執行緒池我們不需要自己建立執行緒,將任務提交給執行緒池即可。為什麼要使用執行緒池,首先,使用執行緒池可以重複利用已有的執行緒繼續執行任務,避免執行緒在建立和銷燬時造成的消耗。其次,由

樹狀陣列徹底入門,演算法小白都看得懂的詳細解析

樹狀陣列  重點是在樹狀的陣列 大家都知道二叉樹吧 葉子結點代表A陣列A[1]~A[8]    ....... 現在變形一下  現在定義每一列的頂端結點C[]陣列   如下圖   &

暢通工程續(HDU 1874)附上詳細原始碼

Problem Description 某省自從實行了很多年的暢通工程計劃後,終於修建了很多路。不過路多了也不好,每次要從一個城鎮到另一個城鎮時,都有許多種道路方案可以選擇,而某些方案要比另一些方案行走的距離要短很多。這讓行人很困擾。 現在,已知起點和終點,請你計算出要從起點到終點,最短需要

Android技能樹 — 網路小結之 OkHttp詳細解析

前言: 本文也做了一次標題黨,哈哈,其實寫的還是很水,各位原諒我O(∩_∩)O。 介於自己的網路方面知識爛的一塌糊塗,所以準備寫相關網路的文章,但是考慮全部寫在一篇太長了,所以分開寫,希望大家能仔細看,最好可以指出我的錯誤,讓我也能糾正。 1.講解相關的整個網路體系結構: 網路體系結構小結 2.講解

AM335x啟動流程(BootRom->MLO->Uboot)詳細原始碼分析

寫的非常好,收藏學習 參考檔案: 1,AM335x ARM Cortex-A8 Microprocessors (MPUs) Technical Reference Manual.pdf; 2,am3359.pdf;   1,am335x的cpu上電後,會跳到哪個地址去

網際網路公司分散式叢集架構圖入門解析(簡單通俗易懂,詳細

一、小型公司網路架構 狗子是某大學計算機專業本科應屆畢業生,由於自己的技術不錯,再加上網際網路產業的巨大利潤的驅使,狗子決定走上創業這條路,於是,狗子聯合了同學二黑,雞子,狗蛋等人花費了幾個月的時間寫出了一套網站,是關於足球資訊的pc端網站加上手機APP客戶端。現在產品測試成功了,準備釋

Spring中@Autowire的底層原理解析(附詳細原始碼閱讀步驟)

搭建原始碼閱讀環境 首先在IDEA中建立一個Maven工程,然後在pom.xml中加入依賴,因為以後可能會用到其他的功能,所以這裡直接使用的是springboot的依賴 <?xml version="1.0" encoding="UTF-8"?> <project

okhttp原始碼分析(一)——基本流程(詳細

1.okhttp原始碼分析(一)——基本流程(超詳細) 2.okhttp原始碼分析(二)——RetryAndFollowUpInterceptor過濾器 3.okhttp原始碼分析(三)——CacheInterceptor過濾器 4.okhttp原始碼分析(四)——Conn

詳細的HashMap解析(jdk1.8)

有關 過程 註解 rem tcl 性能 log 詳細 內容 目錄 一、預備知識 時間復雜度 基本數據結構 基本位運算 二、HashMap實現原理 結構 速度 三、源碼分析 基本常量 基本成員變量 構造方法 put方法 remove 四、日常使用註意事項 五、總結