RxSwift底層核心邏輯、流程分析
介紹
我們都知道,RxSwift很強,作為一名開發者,閱讀開源框架的原始碼,能讓我們受益頗多,學習優秀的開發者的思路是很有必要的。
當我們寫這麼一份程式碼時,會疑惑為什麼他兩個閉包之間會能夠聯絡起來。
Observable<String>.create { ob -> Disposable in
ob.onNext("tets")
return Disposables.create()
}.subscribe(onNext: { str in
print(str)
},onError: { error in
print(error)
},onCompleted: {
print("complete")
})
複製程式碼
本文結尾專案地址: github.com/GitHubYhb/R…
下載專案之後 -> 執行 RxExample-iOS
這是一份幫助閱讀RxSwift底層執行流程原理的RxSwift原始碼 我往其中加了一些列印的資訊,從create到onNext每一步在哪裡做了什麼都很清楚。
那麼廢話不多說。直接開幹。
Observable
Observable
Observable<Element>
public class Observable<Element> : ObservableType {
// ...部分省略
// ObservableType 協議定義的方法。
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
rxAbstractMethod()
}
// ObservableConvertibleType 定義的方法。
public func asObservable() -> Observable<Element> {
return self
}
}
複製程式碼
它遵循了ObservableType
協議。
在ObservableType
協議中,要求遵循者需要有subscribe()
方法。
public protocol ObservableType: ObservableConvertibleType {
// ...省略一堆註釋
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
extension ObservableType {
//遵循ObservableConvertibleType 的方法
public func asObservable() -> Observable<Element> {
// ...省略實現程式碼
}
}
複製程式碼
同時ObservableType
協議,又遵循了ObservableConvertibleType
協議。
在ObservableConvertibleType
協議中,建立了叫Element
的關聯型別。
以及要求遵循者要有 asObservable()
方法
public protocol ObservableConvertibleType {
associatedtype Element
// E 重新命名 Element
@available(*,deprecated,message: "Use `Element` instead.")
typealias E = Element
func asObservable() -> Observable<Element>
}
複製程式碼
由下圖我們可以看出來這三者之間的關係
看懂了Observable
,我們再來看看create()
,找到Create.swift
檔案
create()
extension ObservableType {
// ... 省略一堆註釋
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
複製程式碼
吐槽:這句程式碼是真的又長又難看懂。
create() 是什麼
首先,create()
是ObservableType
的擴充套件
上文已經有提到ObservableType
是一個protocol協議
所以,
create()
也是ObservableType
下定義的一個方法,跟subscribe()
一樣。
解析
create()
中有一個引數,名為subscribe
的閉包,並且用_
做了省略。
返回型是Disposable
。
看上去就是一長串,很不友好,為了更好的理解,請看我下面的程式碼
// 仿照 ObservableType 建立一個協議,並定義一個create方法
protocol TestProtocol {
func create(_ subscribeBlock: @escaping (String) -> String) -> UILabel
}
// UILabel遵循這個協議
extension UILabel: TestProtocol{
func create(_ subscribeBlock: @escaping (String) -> String) -> UILabel {
//呼叫block
let newStr = subscribeBlock("block幹活")
print("block返回 == " + newStr)
return UILabel.init()
}
}
複製程式碼
這份程式碼中(String) -> String
------------ ↓ 對應 ------------(AnyObserver<Element>) -> Disposable
然後來測試一下這段程式碼
let lb = UILabel.init()
lb.create { str in
print("block傳遞出來 == " + str)
return "test"
}
複製程式碼
看列印結果
block傳遞出來 == block幹活
block返回 == test
複製程式碼
相信看到這裡,大家心裡已經有點明白的意思了。
- 在我們呼叫完
create()
之後,閉包先返回給我們一個AnyObserver<Element>
型別的引數- 我們通過這個引數做了操作之後,就會再返回給閉包一個
Disposable
,給它操作。
那麼這個AnyObserver<Element>
是什麼呢
AnyObserver<Element>
我們就看AnyObserver
,它遵循了ObserverType
協議
注意:這個協議跟上文的ObservableType
不一樣,字母位數都不一樣的 - 0 -
public struct AnyObserver<Element> : ObserverType {
//... 省略
}
複製程式碼
先看看ObserverType
public protocol ObserverType {
associatedtype Element
@available(*,message: "Use `Element` instead.")
typealias E = Element
func on(_ event: Event<Element>)
}
extension ObserverType {
public func onNext(_ element: Element) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
複製程式碼
從程式碼中我們可以看到ObserverType
同ObservableType
都定義了一個關聯類Element
。
不同的地方在於定義的方法是func on(_ event: Event<Element>)
,並且在拓展中定義了三個方便開發者使用的方法,也就是我們平時常用的onNext()、onCompleted() 、onError()
。
也就是說,我們可以拐彎抹角的這麼玩。
Observable<String>.create { ob -> Disposable in
// 下面三行程式碼一個意思
ob.on(Event<String>.next("any"))
ob.on(.next("test"))
ob.onNext("tets")
return Disposables.create()
}
複製程式碼
既然已經到了Event
了,不如先進去一探究竟
public enum Event<Element> {
case next(Element)
case error(Swift.Error)
case completed
}
複製程式碼
Event
是一個列舉,包含了我們熟悉的三個列舉值,next/error/completed
還有一系列的擴充套件方法 比如map()
這裡先不一一列舉了。
回到AnyObserver
看看完整的程式碼。
public struct AnyObserver<Element> : ObserverType {
// 宣告閉包 EventHandler
public typealias EventHandler = (Event<Element>) -> Void
// 私有常量 observer
private let observer: EventHandler
// 初始化方法 1
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
// 初始化方法 2
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
// on 方法
public func on(_ event: Event<Element>) {
return self.observer(event)
}
// asObserver 方法
public func asObserver() -> AnyObserver<Element> {
return self
}
}
extension AnyObserver {
typealias s = Bag<(Event<Element>) -> Void>
}
複製程式碼
那麼create()
方法宣告的部分就先到這裡。接下來我們看看,他後續返回了AnonymousObservable(subscribe)
AnonymousObservable()
AnonymousObservable
翻譯過來就是 匿名觀察序列
看下面程式碼能發現:
- 這是一個
Create.swift
中的私有類 -
AnonymousObservable
繼承自Producer<Element>
- 我們上文中就是return了
AnonymousObservable()
初始化方法。
final private class AnonymousObservable<Element>: Producer<Element> {
...先不看他都幹了什麼
}
複製程式碼
既然是繼承自Producer<Element>
,那麼不如先看看Producer<Element>
class Producer<Element> : Observable<Element> {
... 也先不看
}
複製程式碼
最終,他還是Observable<Element>
,函式要求的返回格式。
回到AnonymousObservable()
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer,cancel: Cancelable) -> (sink: Disposable,subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer,cancel: cancel)
let subscription = sink.run(self)
return (sink: sink,subscription: subscription)
}
}
複製程式碼
上文有說到create()
中return了AnonymousObservable()
初始化方法。
那麼我們把注意力放到初始化方法
這裡他把我們傳進來的閉包儲存了起來。
那麼,我們是不是可以猜想,他儲存起來之後,是不是可以想什麼時候用就什麼時候用呢。
所以當前,我們return了AnonymousObservable()
初始化方法之後,create()
已經執行完畢。
那麼現在我們可以這麼理解,下面這段程式碼
Observable<String>
.create { (ob) -> Disposable in
ob.on(Event<String>.next("any"))
return Disposables.create()
}.subscribe(onNext: { str in
},onError: { (error) in
},onCompleted: {
})
複製程式碼
等同於
AnonymousObservable(subscribe)
.subscribe(onNext: { str in
},onCompleted: {
})
複製程式碼
下一步就是subscribe()
subscribe()
在說之前,我得先跟大家說一下,接下來我們會遇到的subscribe()
有兩個,如圖所示。
然後開始吧。
我做了一些列印,幫助我能更好的理解整個訂閱過程。這張圖上主要做了兩點。
- 建立了
AnonymousObserver
,這個跟AnonymousObservable
不一樣,AnonymousObserver
是觀察者,而AnonymousObservable
是被觀察物件。雖然創建出來,但是並沒有進行呼叫。 - 跟
AnonymousObservable
一樣,把這個閉包作為一個臨時變數。通過self.asObservable().subscribe(observer)
傳遞出去。
那麼被傳遞到哪裡去了呢。我們看到self
,這是我們當前的呼叫物件AnonymousObservable
,他這裡呼叫的subscribe(observer)
是我們上面做過標記的2 。
AnonymousObservable
的subscribe()
繼承自Producer
這裡主要是呼叫了run
方法,又把傳進來的observer傳到別的地方,然後在當前的Producer
裡面的run
是個抽象方法,我們回到AnonymousObservable
,他重寫了這個run
方法
可以看到他建立了一個AnonymousObservableSink
,初始化時又傳入了observer
,一路跟進到Sink
的初始化方法可以看到
AnonymousObservable
時的操作類似,把observer
儲存了起來。
在初始化AnonymousObservableSink
後,有呼叫了另外一個run
方法。
在這個AnonymousObservableSink
中,他重新命名了AnonymousObservable
run
方法接收到我們傳遞過來的Parent
(AnonymousObservable)parent
執行先前儲存起來的閉包,並將AnyObserver(self)
傳遞了進去,這樣才能讓儲存起來的閉包有一個觀察者來執行Event
。
這裡的self
是AnonymousObservableSink
,通過AnyObserver
轉成觀察者。
可以這麼理解:通過AnyObserver()
將先前儲存起來的observer
,提取出來
這時候,已經把兩個儲存起來的元素串聯起來了。
開始執行閉包中的內容。
這是最開始我寫好的閉包內容。
let ob = Observable<String>.create { ob -> Disposable in
print("開始在我們儲存的閉包裡面搞事,ob的值已經傳進來了,就是AnyObserver(self)")
ob.onNext("tets")
return Disposables.create()
}
_ = ob.subscribe(onNext: { str in
print(str)
})
複製程式碼
那麼,這個onNext
是什麼時候執行的呢?
onNext
什麼時候執行
ob.onNext("tets")
的本質是AnyObserver(self).onNext("tets")
self
是AnonymousObservableSink
我們需要看看AnyObserver(self)
發生了什麼
public struct AnyObserver<Element> : ObserverType {
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
print("呼叫AnyObserver(self)")
self.observer = observer.on
}
...
}
複製程式碼
初始化方法中,將observer.on
儲存了起來,也就是說
把AnonymousObservableSink
的on
方法儲存了起來
那麼這個on
方法都做了什麼呢
func on(_ event: Event<Element>) {
print("AnonymousObservableSink 的 on 方法 event == \(event.element!)")
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error,.completed:
if fetchOr(self._isStopped,1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
複製程式碼
可以看到on
方法又呼叫了一個forwardOn
,再點進去看看
final func forwardOn(_ event: Event<Observer.Element>) {
//...省略
self._observer.on(event)
}
複製程式碼
記得我們上文有提到在初始化AnonymousObservableSink
時,把在subscribe()
中建立的AnonymousObserver
儲存了起來,這裡他就通過這個_observer
,呼叫了on
方法。
這時才跳到了AnonymousObserver
。
回到建立AnonymousObserver
的地方
let observer = AnonymousObserver<Element> { event in
...省略不看
}
複製程式碼
看看AnonymousObserver
的本身
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
複製程式碼
AnonymousObserver
繼承自ObserverBase
,貫徹到底
class ObserverBase<Element> : Disposable,ObserverType {
private let _isStopped = AtomicInt(0)
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error,1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<Element>) {
rxAbstractMethod()
}
func dispose() {
fetchOr(self._isStopped,1)
}
}
複製程式碼
我們可以看到ObserverBase
有一個on方法,還記得我們最上面說過的嗎
Observable<String>.create { ob -> Disposable in
// 下面三行程式碼一個意思
ob.on(Event<String>.next("any"))
ob.on(.next("test"))
ob.onNext("tets")
return Disposables.create()
}
複製程式碼
這三行是同一個意思。也就是說,在我們呼叫onNext
時,會先呼叫on
方法,在呼叫.next
,然後就走到了self.onCore
,當前的onCore
也是一個抽象方法。
我們回到AnonymousObserver
重寫onCore
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
複製程式碼
self._eventHandler(event)
就會執行這段紅框程式碼
直到這裡,才執行到了onNext
,才會到我們自己在外部寫的程式碼。
看起來可能很亂,這裡做一下
總結
- 我們寫的閉包中
ob.onNext()
呼叫時,首先呼叫了AnonymousObservableSink
的on
方法AnonymousObservableSink
的on
方法呼叫了self.forwardOn(event)
forwardOn(event)
通過先前儲存起來的_observer
,呼叫了AnonymousObserver
的on
方法AnonymousObserver
的on
方法呼叫重寫父類的self.onCore(event)
onCore(event)
呼叫AnonymousObserver
的self._eventHandler(event)
,這個_eventHandler
就是上面紅框裡面的內容被儲存了下來。- 紅框內接收到
self._eventHandler(event)
傳遞來的event
,判斷event
型別,執行我們自己在外部寫的內容。
或許還是很亂,我調整了一下我的列印內容,一步一步的都寫得很清楚。