Rxjava2原始碼解析超詳細~~~
轉載:Est 的小窩 原文地址:https://blog.codeest.moe/2017/03/25/android-rxjava2/?utm_medium=email&utm_source=gank.io
前言
和之前的 Glide 篇一樣,這篇 RxJava2 原始碼分析也會先列出一些要點,然後按這些點一步一步分析
注意,文章中的 RxJava 均是指RxJava2
- Rxjava 中的主要型別
- Rxjava 訂閱與終止訂閱的過程(Observable、Observer、create、just、dispose)
- Rxjava 操作符原理(map、lift、compose)
- Rxjava 執行緒排程原理(subscribeOn、observeOn、io、main)
- Rxjava 背壓處理原理(buffer、latest、drop)
- Rxjava 冷熱 Observable(publish、share、connect、refCount)
- Rxjava 封裝庫 RxBinding 原理
RxJava 中的主要型別
開始之前先梳理下幾個關鍵類的作用和他們之間的關係
Observable
被觀察者(事件源),不處理背壓Observer
觀察者,用於訂閱Observable
-
Subject
繼承了Observable
實現了Observer
,既可做觀察者也可做被觀察者,通常作為兩者的橋樑或代理 -
Flowable(Publisher)
被觀察者(事件源),有背壓處理策略 Subscriber
觀察者,用於訂閱Flowable
-
Processor
實現類FlowableProcessor
繼承了Flowable
實現了FlowableSubscriber
,類似Subject
-
Single/SingleObserver
僅發生一次訊息,遵循onSubscribe (onSuccess | onError)?
Completable/CompletableObserver
僅發生一次訊息,遵循onSubscribe (onComplete | onError)?
-
Maybe/MaybeObserver
僅發生一次訊息,遵循onSubscribe (onSuccess | onError | onComplete)?
-
Disposable
替代了 RxJava1 中的Subscription
,實現該介面的資源具備可被取消 (dispose) 的能力 Subscription
在Subscriber
訂閱時回撥的物件,具備拉取 (request) 和取消訂閱 (cancel) 的能力
RxJava 訂閱與終止訂閱的過程
這裡先以最基礎的Observable.create
為例
123456789101112131415161718192021222324252627 |
進入create
方法,requireNonNull
只是一個簡單的判空處理,然後由onAssembly
返回Observable<T>
物件
12345678910111213141516 |
(SchedulerSupport.NONE) |
onAssembly
是一個具有 hook 作用的方法,它會判斷它的Function
型別成員變數onObservableAssembly
是否存在,不存在則直接把傳入的引數返回,存在則把經過onObservableAssembly
處理後的結果返回,相當於提供了一層允許插入額外操作的
hook 層。在當前場景下它直接返回了我們建立的ObservableCreate<T>
,它是一個繼承了Observable
的類,之後我們會呼叫Observable
的subscribe
方法來完成訂閱
12345678910111213141516171819 |
(SchedulerSupport.NONE) |
訂閱方法中的requireNonNull
和RxJavaPlugins
就不再贅述了,最後執行了subscribeActual
方法,這裡是實際完成訂閱的地方。
1234567891011121314151617181920 |
回到ObservableCreate
中,在subscribeActual
裡首先建立了用於發射事件的CreateEmitter
物件,CreateEmitter
實現了介面Emitter
和Disposable
,
並持有observer
。當通過onNext
發射事件時會傳遞給觀察者的onNext
方法
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
最後執行source.subscribe(parent);
使資料來源開始經由發射器發射資料,至此整個建立過程就走通了
1234567 |
經過上面的分析,我們瞭解到了:
-
每次將事件傳遞給觀察者時都會判斷
isDisposed()
檢查是否訂閱已經終止,一旦觸發了onError()
和onComplete()
緊接著就會執行dispose()
-
執行
Observable.subscribe
後才會在subscribeActual
中完成實際的訂閱,並且開始執行發射器發射事件的程式碼,建立型操作符create
,defer
,fromCallable
,just
等均遵循這個規則,稍稍需要注意的是,just()
方法即使沒有訂閱也會立刻執行(是立刻執行該函式本身,不是開始發射資料),他會一開始就把我們要發射的內容作為value
儲存下來
12345678910111213141516171819 |
接下來分析終止訂閱的過程
1234567891011121314151617181920 |