RxSwift原始碼解析一
RxSwift原始碼解析一
一、介紹
一個幫助我們簡化非同步程式設計的Swift框架。
二、核心
-
Observable
:產生事件 -
Observer
:響應事件 -
Operator
:建立變化組合事件 -
Disposable
:管理繫結(訂閱)的生命週期 -
Schedulers
:執行緒佇列調配
三、Observable
與Observer
之間的關係
- 例子
let _ = Observable<Int>.create { (observer) -> Disposable in
observer.onNext(1)
observer.onNext(2 )
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}.subscribe(onNext: { (num) in
print("receive num \(num)")
},onError: { (error) in
print("error: \(error.localizedDescription)")
},onCompleted: {
print("receive complete")
})
複製程式碼
如上程式碼出現兩個重要的方法create
和subscribe
create
方法是建立一個Observable
物件,而subscribe
方法是建立一個訂閱事件。我們先關注下create
方法如何建立一個Observable
物件。
//Create.swift
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable (subscribe)
}
}
複製程式碼
首先看它傳入的引數為一個閉包:AnyObserver<Element> -> Disposable
,然後返回的是一個Observable<Element>
物件。對比我們的例子,我們可以確定Element
為我們指定的Int
,即泛型Element
表示資料來源型別。
上面返回的是一個AnonymousObservable
物件,並將閉包作為引數傳入。
//Create.swift
final private class AnonymousObservable<Element> : Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
}
複製程式碼
AnonymousObservable
將傳入的閉包賦值給變數_subscribeHandler
。至此建立完了一個Observable
物件。然後執行其subscribe
方法:
public func subscribe(onNext: ((Element) -> Void)? = nil,onError: ((Swift.Error) -> Void)? = nil,onCompleted:(() -> Void)? = nil,onDisposed:(() -> Void)? = nil) -> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
} else {
disposable = Disposables.create()
}
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(self.asObservable().subscribe(observer),disposable)
}
複製程式碼
這一段程式碼比較長,我們先從引數下手,可以看到引數中包括onNext
(產生下一個事件)、onError
(產生錯誤)、onCompleted
(產生完成)和onDisposed
四個不同的閉包。我們先暫時不管Disposed
部分內容,直接看到下面相關程式碼:
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
複製程式碼
上面程式碼建立了一個AnonymousObserver
物件,並將引數的閉包事件與自身產生的event
事件關聯在一起。
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler: EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
}
複製程式碼
AnonymousObserver
物件將上面關聯的一個事件轉換閉包作為引數儲存到變數_eventHandler
中。其實可以簡單地理解AnonymousObserver
物件將上面subscribe
方法中的引數閉包儲存起來了。
再回到subscribe
方法,看到最後一句程式碼:
return Disposables.create(self.asObservable().subscribe(observer),disposable)
複製程式碼
我們關注到這裡self.asObservable().subscribe(observer)
,首先呼叫了asObservable()
方法:
//ObservableConvertibleType.swift
public protocol ObservableConvertibleType {
associatedtype Element
func asObservable() -> Observable<Element>
}
//Observable.swift
public class Observable<Element> : ObservableType {
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
rxAbstractMethod()
}
public func asObservable() -> Observable<Element> {
return self
}
}
複製程式碼
如上所示,我們可以看到asObservable()
方法返回的自己本身Observable
。但我們看到這裡對應的subscribe
方法為"抽象方法",上面我們建立的是AnonymousObservable
物件,在它的父類Producer
中實現了:
class Producer<Element> : Observable<Element> {
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer,cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink,subscription: sinkAndSubscription.subscription)
return disposer
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer,cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink,subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer,cancel: Cancelable) -> (sink: Disposable,subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
複製程式碼
同樣我們先暫時不管Scheduler
相關內容,這裡呼叫了self.run()
方法,但它本身並未實現該方法,同樣我們在AnonymousObservale
中可以找到:
final private class AnonymousObservable<Element> : Producer<Element> {
override func run<Observer: ObserverType>(_ observer: Observer,subscription: Disposable) where Element == Observer.Element {
let sink = AnonymousObservableSink(observer: observer,cancel: cancel)
let subscription = sink.run(self)
return (sink: sink,subscription: subscription)
}
}
複製程式碼
在run
方法中建立了一個AnonymousObservableSink
方法,然後呼叫了它的run
方法。
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>,ObserverType {
typealias Element = Observer.Element
typealias Parent = AnonymousObservable<Element>
private let _isStopped = AtomicInt(0)
override init(observer: Observer,cancel: Cancelable) {
super.init(observer: observer,cancel: cancel)
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
複製程式碼
我們先看到其run
方法,可以看到它執行了parent._subscribeHandler(AnyObserver(self))
,這一句很關鍵,這裡的parent
其實指的是我們在呼叫create
方法時,建立的AnonymousObservable
物件。因此,這裡的_subscribeHandler
就是我們create
方法傳遞的引數閉包。我們可以看到這裡建立了一個AnyObserver
物件傳入到閉包中。
回到例子中的閉包內容:
{ (observer) -> Disposable in
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}
複製程式碼
這裡呼叫了onNext
方法產生元素1
:
public protocol ObserverType {
associatedtype Element
func on(_ event: Event<Element>)
}
extension ObserverType {
public func onNext(_ element: Element) {
self.on(.next(element))
}
}
複製程式碼
這裡呼叫了on
方法傳遞元素,而我們上面知道這裡的ObserverType
是AnyObserver
物件:
public struct AnyObserver<Element> : ObserverType {
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
}
複製程式碼
接著呼叫了self.observer(event)
將事件傳遞下去,而這裡的observer
是在建立parent._subscribeHandler(AnyObserver(self))
時傳入的。即self.observer = observer.on => AnonymousObservableSink.on
。
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>,ObserverType {
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error,.completed:
if fetchOr(self._isStopped,1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
}
複製程式碼
因此,事件會傳遞到AnonymousObservableSink
中,並通過fowardOn
方法繼續傳遞事件.
class Sink<Observer: ObserverType>: Disposable {
fileprivate let _observer: Observer
fileprivate let _cancel: Cancelable
init(observer: Observer,cancel: Cancelable) {
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<Observer.Element>) {
if isFlagSet(self._diposed,1) {
return
}
self._observer.on(event)
}
}
複製程式碼
這裡呼叫了self._observer.on(event)
方法傳遞事件,而這裡的_observer
物件就是我們在呼叫subscribe
方法時,傳遞進來的AnonymousObserver
物件。而AnonymousObserver
本身沒有實現on
方法,而是在父類ObserverBase
中實現了:
class ObserverBase<Element> : Disposable,ObserverType {
private let _isStopped = AtomicInt(0)
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error,1) == 0 {
self.onCore(event)
}
}
}
}
複製程式碼
最後呼叫了onCore
方法傳遞事件:
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler: EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
複製程式碼
而這裡的_eventHandler
即是我們呼叫subscribe
方法時,建立的閉包(將外部的Event
和內部的Event
關聯)
public func subscribe(onNext: ((Element) -> Void)? = nil,onDisposed:(() -> Void)? = nil) -> Disposable {
....
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(self.asObservable().subscribe(observer),disposable)
}
複製程式碼
因此,我們會在外部接收到receive num 1
的事件訊息。