1. 程式人生 > IOS開發 >RxSwift原始碼解析一

RxSwift原始碼解析一

RxSwift原始碼解析一

一、介紹

一個幫助我們簡化非同步程式設計的Swift框架。

二、核心

  • Observable:產生事件
  • Observer:響應事件
  • Operator:建立變化組合事件
  • Disposable:管理繫結(訂閱)的生命週期
  • Schedulers:執行緒佇列調配

三、ObservableObserver之間的關係

  • 例子
let _ = Observable<Int>.create { (observer) -> Disposable in
    observer.onNext(1)
    observer.onNext(2
) observer.onNext(3) observer.onCompleted() return Disposables.create() }.subscribe(onNext: { (num) in print("receive num \(num)") },onError: { (error) in print("error: \(error.localizedDescription)") },onCompleted: { print("receive complete") }) 複製程式碼

如上程式碼出現兩個重要的方法createsubscribe

。顧名思義,create方法是建立一個Observable物件,而subscribe方法是建立一個訂閱事件。我們先關注下create方法如何建立一個Observable物件。

//Create.swift
extension ObservableType {
    
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable
(subscribe) } } 複製程式碼

首先看它傳入的引數為一個閉包:AnyObserver<Element> -> Disposable,然後返回的是一個Observable<Element>物件。對比我們的例子,我們可以確定Element為我們指定的Int,即泛型Element表示資料來源型別。

上面返回的是一個AnonymousObservable物件,並將閉包作為引數傳入。

//Create.swift
final private class AnonymousObservable<Element> : Producer<Element> {
    
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
    let _subscribeHandler: SubscribeHandler
    
    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }
    
}
複製程式碼

AnonymousObservable將傳入的閉包賦值給變數_subscribeHandler。至此建立完了一個Observable物件。然後執行其subscribe方法:

public func subscribe(onNext: ((Element) -> Void)? = nil,onError: ((Swift.Error) -> Void)? = nil,onCompleted:(() -> Void)? = nil,onDisposed:(() -> Void)? = nil) -> Disposable {
    let disposable: Disposable
    
    if let disposed = onDisposed {
        disposable = Disposables.create(with: disposed)
    } else {
        disposable = Disposables.create()
    }
    
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
        
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
    }
    return Disposables.create(self.asObservable().subscribe(observer),disposable)
}
複製程式碼

這一段程式碼比較長,我們先從引數下手,可以看到引數中包括onNext(產生下一個事件)、onError(產生錯誤)、onCompleted(產生完成)和onDisposed四個不同的閉包。我們先暫時不管Disposed部分內容,直接看到下面相關程式碼:

let observer = AnonymousObserver<Element> { event in
    switch event {
    case .next(let value):
        onNext?(value)
    case .error(let error):
        if let onError = onError {
            onError(error)
        }
    
        disposable.dispose()
    case .completed:
        onCompleted?()
        disposable.dispose()
    }
}
複製程式碼

上面程式碼建立了一個AnonymousObserver物件,並將引數的閉包事件與自身產生的event事件關聯在一起。

final class AnonymousObserver<Element>: ObserverBase<Element> {
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler: EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }
    
}
複製程式碼

AnonymousObserver物件將上面關聯的一個事件轉換閉包作為引數儲存到變數_eventHandler中。其實可以簡單地理解AnonymousObserver物件將上面subscribe方法中的引數閉包儲存起來了。

再回到subscribe方法,看到最後一句程式碼:

return Disposables.create(self.asObservable().subscribe(observer),disposable)
複製程式碼

我們關注到這裡self.asObservable().subscribe(observer),首先呼叫了asObservable()方法:

//ObservableConvertibleType.swift
public protocol ObservableConvertibleType {
    
    associatedtype Element
    
    func asObservable() -> Observable<Element>
}

//Observable.swift
public class Observable<Element> : ObservableType {
    
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<Element> {
        return self
    }
    
}
複製程式碼

如上所示,我們可以看到asObservable()方法返回的自己本身Observable。但我們看到這裡對應的subscribe方法為"抽象方法",上面我們建立的是AnonymousObservable物件,在它的父類Producer中實現了:

class Producer<Element> : Observable<Element> {
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer,cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink,subscription: sinkAndSubscription.subscription)
            return disposer
        } else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer,cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink,subscription: sinkAndSubscription.subscription)
                return disposer
            }
        }
    }
    
    func run<Observer: ObserverType>(_ observer: Observer,cancel: Cancelable) -> (sink: Disposable,subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
    
}
複製程式碼

同樣我們先暫時不管Scheduler相關內容,這裡呼叫了self.run()方法,但它本身並未實現該方法,同樣我們在AnonymousObservale中可以找到:

final private class AnonymousObservable<Element> : Producer<Element> {
    
    override func run<Observer: ObserverType>(_ observer: Observer,subscription: Disposable) where Element == Observer.Element {
        let sink = AnonymousObservableSink(observer: observer,cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink,subscription: subscription)
    }
    
}
複製程式碼

run方法中建立了一個AnonymousObservableSink方法,然後呼叫了它的run方法。

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>,ObserverType {
    
    typealias Element = Observer.Element
    typealias Parent = AnonymousObservable<Element>
    
    private let _isStopped = AtomicInt(0)
    
    override init(observer: Observer,cancel: Cancelable) {
        super.init(observer: observer,cancel: cancel)
    }
    
    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
    
}
複製程式碼

我們先看到其run方法,可以看到它執行了parent._subscribeHandler(AnyObserver(self)),這一句很關鍵,這裡的parent其實指的是我們在呼叫create方法時,建立的AnonymousObservable物件。因此,這裡的_subscribeHandler就是我們create方法傳遞的引數閉包。我們可以看到這裡建立了一個AnyObserver物件傳入到閉包中。

回到例子中的閉包內容:

{ (observer) -> Disposable in
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onCompleted()
    return Disposables.create()
}
複製程式碼

這裡呼叫了onNext方法產生元素1

public protocol ObserverType {
    associatedtype Element
    
    func on(_ event: Event<Element>)
}

extension ObserverType {
    
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
}
複製程式碼

這裡呼叫了on方法傳遞元素,而我們上面知道這裡的ObserverTypeAnyObserver物件:

public struct AnyObserver<Element> : ObserverType {
    
    public typealias EventHandler = (Event<Element>) -> Void
    
    private let observer: EventHandler
    
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
  
}
複製程式碼

接著呼叫了self.observer(event)將事件傳遞下去,而這裡的observer是在建立parent._subscribeHandler(AnyObserver(self))時傳入的。即self.observer = observer.on => AnonymousObservableSink.on

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>,ObserverType {
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error,.completed:
            if fetchOr(self._isStopped,1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
    
}
複製程式碼

因此,事件會傳遞到AnonymousObservableSink中,並通過fowardOn方法繼續傳遞事件.


class Sink<Observer: ObserverType>: Disposable {
    
    fileprivate let _observer: Observer
    fileprivate let _cancel: Cancelable
    
    init(observer: Observer,cancel: Cancelable) {
        self._observer = observer
        self._cancel = cancel
    }
    
    final func forwardOn(_ event: Event<Observer.Element>) {
        if isFlagSet(self._diposed,1) {
            return
        }
        self._observer.on(event)
    }
   
}
複製程式碼

這裡呼叫了self._observer.on(event)方法傳遞事件,而這裡的_observer物件就是我們在呼叫subscribe方法時,傳遞進來的AnonymousObserver物件。而AnonymousObserver本身沒有實現on方法,而是在父類ObserverBase中實現了:

class ObserverBase<Element> : Disposable,ObserverType {
    
    private let _isStopped = AtomicInt(0)
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error,1) == 0 {
                self.onCore(event)
            }
        }
    }
    
}
複製程式碼

最後呼叫了onCore方法傳遞事件:

final class AnonymousObserver<Element>: ObserverBase<Element> {
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler: EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }
    
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
    
}
複製程式碼

而這裡的_eventHandler即是我們呼叫subscribe方法時,建立的閉包(將外部的Event和內部的Event關聯)

public func subscribe(onNext: ((Element) -> Void)? = nil,onDisposed:(() -> Void)? = nil) -> Disposable {
    ....
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
        
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
    }
    return Disposables.create(self.asObservable().subscribe(observer),disposable)
}
複製程式碼

因此,我們會在外部接收到receive num 1的事件訊息。

四、Observable與Observer的執行過程