1. 程式人生 > IOS開發 >RxSwift底層核心邏輯、流程分析

RxSwift底層核心邏輯、流程分析

介紹

我們都知道,RxSwift很強,作為一名開發者,閱讀開源框架的原始碼,能讓我們受益頗多,學習優秀的開發者的思路是很有必要的。

當我們寫這麼一份程式碼時,會疑惑為什麼他兩個閉包之間會能夠聯絡起來。

Observable<String>.create { ob -> Disposable in
            ob.onNext("tets")
            return Disposables.create()
            }.subscribe(onNext: { str in
                print(str)
            },onError: { error in
print(error) },onCompleted: { print("complete") }) 複製程式碼

本文結尾專案地址: github.com/GitHubYhb/R…
這是一份幫助閱讀RxSwift底層執行流程原理的RxSwift原始碼 我往其中加了一些列印的資訊,從create到onNext每一步在哪裡做了什麼都很清楚。

下載專案之後 -> 執行 RxExample-iOS


那麼廢話不多說。直接開幹。

Observable

Observable

原型是 Observable<Element>

public class Observable<Element> : ObservableType {
    // ...部分省略
    
    // ObservableType 協議定義的方法。       
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
    
    // ObservableConvertibleType 定義的方法。
public func asObservable() -> Observable<Element> { return self } } 複製程式碼

它遵循了ObservableType協議。
ObservableType協議中,要求遵循者需要有subscribe()方法。

public protocol ObservableType: ObservableConvertibleType {
    // ...省略一堆註釋
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
extension ObservableType {
    //遵循ObservableConvertibleType 的方法
    public func asObservable() -> Observable<Element> {
       // ...省略實現程式碼
    }
}
複製程式碼

同時ObservableType協議,又遵循了ObservableConvertibleType協議。 在ObservableConvertibleType協議中,建立了叫Element的關聯型別。
以及要求遵循者要有 asObservable()方法

public protocol ObservableConvertibleType {
    associatedtype Element
    
    // E 重新命名 Element
    @available(*,deprecated,message: "Use `Element` instead.")
    typealias E = Element

    func asObservable() -> Observable<Element>
}
複製程式碼

由下圖我們可以看出來這三者之間的關係

看懂了Observable,我們再來看看create(),找到Create.swift檔案

create()

extension ObservableType {
    // ... 省略一堆註釋
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
}
複製程式碼

吐槽:這句程式碼是真的又長又難看懂。

create() 是什麼

首先,create()ObservableType的擴充套件
上文已經有提到ObservableType是一個protocol協議

所以,create()也是ObservableType下定義的一個方法,跟subscribe()一樣。

解析

create()中有一個引數,名為subscribe的閉包,並且用_做了省略。
返回型是Disposable
看上去就是一長串,很不友好,為了更好的理解,請看我下面的程式碼

// 仿照 ObservableType 建立一個協議,並定義一個create方法
protocol TestProtocol {
    func create(_ subscribeBlock: @escaping (String) -> String) -> UILabel
}
// UILabel遵循這個協議
extension UILabel: TestProtocol{
    func create(_ subscribeBlock: @escaping (String) -> String) -> UILabel {
        //呼叫block
        let newStr = subscribeBlock("block幹活")
        print("block返回 == " + newStr)
        return UILabel.init()
    }
}
複製程式碼

這份程式碼中
(String) -> String
------------ ↓ 對應 ------------
(AnyObserver<Element>) -> Disposable
然後來測試一下這段程式碼

let lb = UILabel.init()
lb.create { str in
    print("block傳遞出來 == " + str)
    return "test"
}
複製程式碼

看列印結果

block傳遞出來 == block幹活
block返回 == test
複製程式碼

相信看到這裡,大家心裡已經有點明白的意思了。

  1. 在我們呼叫完create()之後,閉包先返回給我們一個AnyObserver<Element>型別的引數
  2. 我們通過這個引數做了操作之後,就會再返回給閉包一個Disposable,給它操作。

那麼這個AnyObserver<Element> 是什麼呢

AnyObserver<Element>

我們就看AnyObserver,它遵循了ObserverType協議
注意:這個協議跟上文的ObservableType不一樣,字母位數都不一樣的 - 0 -

public struct AnyObserver<Element> : ObserverType {
    //... 省略
}
複製程式碼

先看看ObserverType

public protocol ObserverType {
    associatedtype Element
    @available(*,message: "Use `Element` instead.")
    typealias E = Element
    func on(_ event: Event<Element>)
}
extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    public func onCompleted() {
        self.on(.completed)
    }
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
複製程式碼

從程式碼中我們可以看到ObserverTypeObservableType都定義了一個關聯類Element
不同的地方在於定義的方法是func on(_ event: Event<Element>),並且在拓展中定義了三個方便開發者使用的方法,也就是我們平時常用的onNext()、onCompleted() 、onError()

也就是說,我們可以拐彎抹角的這麼玩。

Observable<String>.create { ob -> Disposable in
    // 下面三行程式碼一個意思
    ob.on(Event<String>.next("any"))
    ob.on(.next("test"))
    ob.onNext("tets")
    return Disposables.create()
}
複製程式碼

既然已經到了Event了,不如先進去一探究竟

public enum Event<Element> {
    case next(Element)

    case error(Swift.Error)

    case completed
}
複製程式碼

Event是一個列舉,包含了我們熟悉的三個列舉值,next/error/completed
還有一系列的擴充套件方法 比如map()這裡先不一一列舉了。

回到AnyObserver
看看完整的程式碼。

public struct AnyObserver<Element> : ObserverType {
    // 宣告閉包 EventHandler
    public typealias EventHandler = (Event<Element>) -> Void
    // 私有常量 observer
    private let observer: EventHandler
    // 初始化方法 1
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    // 初始化方法 2
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    // on 方法
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
    // asObserver 方法
    public func asObserver() -> AnyObserver<Element> {
        return self
    }
}
extension AnyObserver {
    typealias s = Bag<(Event<Element>) -> Void>
}

複製程式碼

那麼create()方法宣告的部分就先到這裡。接下來我們看看,他後續返回AnonymousObservable(subscribe)

AnonymousObservable()

AnonymousObservable 翻譯過來就是 匿名觀察序列
看下面程式碼能發現:

  1. 這是一個Create.swift中的私有類
  2. AnonymousObservable繼承自Producer<Element>
  3. 我們上文中就是return了AnonymousObservable()初始化方法。
final private class AnonymousObservable<Element>: Producer<Element> {
    ...先不看他都幹了什麼
}
複製程式碼

既然是繼承自Producer<Element>,那麼不如先看看Producer<Element>

class Producer<Element> : Observable<Element> {
    ... 也先不看
}
複製程式碼

最終,他還是Observable<Element>,函式要求的返回格式。

回到AnonymousObservable()

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<Observer: ObserverType>(_ observer: Observer,cancel: Cancelable) -> (sink: Disposable,subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer,cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink,subscription: subscription)
    }
}
複製程式碼

上文有說到create()中return了AnonymousObservable()初始化方法。
那麼我們把注意力放到初始化方法
這裡他把我們傳進來的閉包儲存了起來。
那麼,我們是不是可以猜想,他儲存起來之後,是不是可以想什麼時候用就什麼時候用呢。

所以當前,我們return了AnonymousObservable()初始化方法之後,create()已經執行完畢。

那麼現在我們可以這麼理解,下面這段程式碼

 Observable<String>
    .create { (ob) -> Disposable in
        ob.on(Event<String>.next("any"))
        return Disposables.create()
    }.subscribe(onNext: { str in
    },onError: { (error) in
    },onCompleted: {
    })
複製程式碼

等同於

AnonymousObservable(subscribe)
    .subscribe(onNext: { str in
    },onCompleted: {
    })
複製程式碼

下一步就是subscribe()

subscribe()

在說之前,我得先跟大家說一下,接下來我們會遇到的subscribe()有兩個,如圖所示。

我們這裡就先把他們標記一下。

然後開始吧。

我做了一些列印,幫助我能更好的理解整個訂閱過程。
這張圖上主要做了兩點。

  1. 建立了AnonymousObserver,這個跟AnonymousObservable不一樣,AnonymousObserver是觀察者,而AnonymousObservable是被觀察物件。雖然創建出來,但是並沒有進行呼叫。
  2. AnonymousObservable一樣,把這個閉包作為一個臨時變數。通過self.asObservable().subscribe(observer)傳遞出去。

那麼被傳遞到哪裡去了呢。我們看到self,這是我們當前的呼叫物件AnonymousObservable,他這裡呼叫的subscribe(observer)是我們上面做過標記的2

AnonymousObservablesubscribe()繼承自Producer

這裡主要是呼叫了run方法,又把傳進來的observer傳到別的地方,然後在當前的Producer裡面的run是個抽象方法,我們回到AnonymousObservable,他重寫了這個run方法

可以看到他建立了一個AnonymousObservableSink,初始化時又傳入了observer,一路跟進到Sink的初始化方法可以看到

AnonymousObservable時的操作類似,把observer儲存了起來。

在初始化AnonymousObservableSink後,有呼叫了另外一個run方法。

在這個AnonymousObservableSink中,他重新命名了AnonymousObservable
run方法接收到我們傳遞過來的Parent(AnonymousObservable)
parent執行先前儲存起來的閉包,並將AnyObserver(self)傳遞了進去,這樣才能讓儲存起來的閉包有一個觀察者來執行Event
這裡的selfAnonymousObservableSink,通過AnyObserver轉成觀察者。

可以這麼理解:通過AnyObserver()將先前儲存起來的observer,提取出來
這時候,已經把兩個儲存起來的元素串聯起來了。

開始執行閉包中的內容。
這是最開始我寫好的閉包內容。

        let ob = Observable<String>.create { ob -> Disposable in
            print("開始在我們儲存的閉包裡面搞事,ob的值已經傳進來了,就是AnyObserver(self)")
            ob.onNext("tets")
            return Disposables.create()
        }
        _ = ob.subscribe(onNext: { str in
            print(str)
        })
複製程式碼

那麼,這個onNext是什麼時候執行的呢?

onNext 什麼時候執行

ob.onNext("tets")的本質是AnyObserver(self).onNext("tets")
selfAnonymousObservableSink
我們需要看看AnyObserver(self)發生了什麼

public struct AnyObserver<Element> : ObserverType {
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        print("呼叫AnyObserver(self)")
        self.observer = observer.on
    }
    ...
}
複製程式碼

初始化方法中,將observer.on儲存了起來,也就是說
AnonymousObservableSinkon方法儲存了起來
那麼這個on方法都做了什麼呢

   func on(_ event: Event<Element>) {
        print("AnonymousObservableSink 的 on 方法 event == \(event.element!)")
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error,.completed:
            if fetchOr(self._isStopped,1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
複製程式碼

可以看到on方法又呼叫了一個forwardOn,再點進去看看

final func forwardOn(_ event: Event<Observer.Element>) {
        //...省略
        self._observer.on(event)
}
複製程式碼

記得我們上文有提到在初始化AnonymousObservableSink時,把在subscribe()中建立的AnonymousObserver儲存了起來,這裡他就通過這個_observer,呼叫了on方法。
這時才跳到了AnonymousObserver

回到建立AnonymousObserver的地方

let observer = AnonymousObserver<Element> { event in
    ...省略不看
}
複製程式碼

看看AnonymousObserver的本身

final class AnonymousObserver<Element>: ObserverBase<Element> {
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
}

複製程式碼

AnonymousObserver繼承自ObserverBase,貫徹到底

class ObserverBase<Element> : Disposable,ObserverType {
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error,1) == 0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }

    func dispose() {
        fetchOr(self._isStopped,1)
    }
}
複製程式碼

我們可以看到ObserverBase有一個on方法,還記得我們最上面說過的嗎

Observable<String>.create { ob -> Disposable in
    // 下面三行程式碼一個意思
    ob.on(Event<String>.next("any"))
    ob.on(.next("test"))
    ob.onNext("tets")
    return Disposables.create()
}
複製程式碼

這三行是同一個意思。也就是說,在我們呼叫onNext時,會先呼叫on方法,在呼叫.next,然後就走到了self.onCore,當前的onCore也是一個抽象方法。
我們回到AnonymousObserver重寫onCore

override func onCore(_ event: Event<Element>) {
    return self._eventHandler(event)
}
複製程式碼

self._eventHandler(event)就會執行這段紅框程式碼

直到這裡,才執行到了onNext,才會到我們自己在外部寫的程式碼。

看起來可能很亂,這裡做一下

總結

  1. 我們寫的閉包中ob.onNext()呼叫時,首先呼叫了AnonymousObservableSinkon方法
  2. AnonymousObservableSinkon方法呼叫了self.forwardOn(event)
  3. forwardOn(event)通過先前儲存起來的_observer,呼叫了AnonymousObserveron方法
  4. AnonymousObserveron方法呼叫重寫父類的self.onCore(event)
  5. onCore(event)呼叫AnonymousObserverself._eventHandler(event),這個_eventHandler就是上面紅框裡面的內容被儲存了下來。
  6. 紅框內接收到self._eventHandler(event)傳遞來的event,判斷event型別,執行我們自己在外部寫的內容。

或許還是很亂,我調整了一下我的列印內容,一步一步的都寫得很清楚。