RxJava系列6(從微觀角度解讀RxJava源碼)
前言
通過前面五個篇幅的介紹,相信大家對RxJava的基本使用以及操作符應該有了一定的認識。但是知其然還要知其所以然;所以從這一章開始我們聊聊源碼,分析RxJava的實現原理。本文我們主要從三個方面來分析RxJava的實現:
- RxJava基本流程分析
- 操作符原理分析
- 線程調度原理分析
本章節基於RxJava1.1.9版本的源碼
一、RxJava執行流程分析
在RxJava系列2(基本概念及使用介紹)中我們介紹過,一個最基本的RxJava調用是這樣的:
示例A
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello RxJava!");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("completed!");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
首先調用Observable.create()
創建一個被觀察者Observable
,同時創建一個OnSubscribe
作為create()
方法的入參;接著創建一個觀察者Subscriber
,然後通過subseribe()
實現二者的訂閱關系。這裏涉及到三個關鍵對象和一個核心的方法:
- Observable(被觀察者)
- OnSubscribe (從純設計模式的角度來理解,
OnSubscribe.call()
可以看做是觀察者模式中被觀察者用來通知觀察者的notifyObservers()
方法) - Subscriber (觀察者)
- subscribe() (實現觀察者與被觀察者訂閱關系的方法)
1、Observable.create()源碼分析
首先我們來看看Observable.create()
的實現:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
這裏創建了一個被觀察者Observable
,同時將RxJavaHooks.onCreate(f)
作為構造函數的參數,源碼如下:
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
我們看到源碼中直接將參數RxJavaHooks.onCreate(f)
賦值給了當前我們構造的被觀察者Observable
的成員變量onSubscribe
。那麽RxJavaHooks.onCreate(f)
返回的又是什麽呢?我們接著往下看:
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
由於我們並沒調用RxJavaHooks.initCreate()
,所以上面代碼中的onObservableCreate
為null;因此RxJavaHooks.onCreate(f)
最終返回的就是f
,也就是我們在Observable.create()
的時候new出來的OnSubscribe
。(由於對RxJavaHooks的理解並不影響我們對RxJava執行流程的分析,因此在這裏我們不做進一步的探討。為了方便理解我們只需要知道RxJavaHooks一系列方法的返回值就是入參本身就OK了,例如這裏的RxJavaHooks.onCreate(f)
返回的就是f
)。
至此我們做下邏輯梳理:Observable.create()
方法構造了一個被觀察者Observable
對象,同時將new出來的OnSubscribe
賦值給了該Observable
的成員變量onSubscribe
。
2、Subscriber源碼分析
接著我們看下觀察者Subscriber
的源碼,為了增加可讀性,我去掉了源碼中的註釋和部分代碼。
public abstract class Subscriber<T> implements Observer<T>, Subscription {
private final SubscriptionList subscriptions;//訂閱事件集,所有發送給當前Subscriber的事件都會保存在這裏
...
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}
...
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}
public void onStart() {
}
...
}
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
Subscriber
實現了Subscription
接口,從而對外提供isUnsubscribed()
和unsubscribe()
方法。前者用於判斷是否已經取消訂閱;後者用於將訂閱事件列表(也就是當前觀察者的成員變量subscriptions
)中的所有Subscription
取消訂閱,並且不再接受觀察者Observable
發送的後續事件。
3、subscribe()源碼分析
前面我們分析了觀察者和被觀察者相關的源碼,那麽接下來便是整個訂閱流程中最最關鍵的環節了。
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
return Subscriptions.unsubscribed();
}
}
subscribe()
方法中將傳進來的subscriber
包裝成了SafeSubscriber
,SafeSubscriber
其實是subscriber
的一個代理,對subscriber
的一系列方法做了更加嚴格的安全校驗。保證了onCompleted()
和onError()
只會有一個被執行且只執行一次,一旦它們其中方法被執行過後onNext()
就不在執行了。
上述代碼中最關鍵的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)
。這裏的RxJavaHooks和之前提到的一樣,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)
返回的正是他的第二個入參observable.onSubscribe
,也就是當前observable
的成員變量onSubscribe
。而這個成員變量我們前面提到過,它是我們在Observable.create()
的時候new出來的。所以這段代碼可以簡化為onSubscribe.call(subscriber)
。這也印證了我在RxJava系列2(基本概念及使用介紹)中說的,onSubscribe.call(subscriber)
中的subscriber
正是我們在subscribe()
方法中new出來的觀察者。
到這裏,我們對RxJava的執行流程做個總結:首先我們調用crate()
創建一個觀察者,同時創建一個OnSubscribe
作為該方法的入參;接著調用subscribe()
來訂閱我們自己創建的觀察者Subscriber
。
一旦調用subscribe()
方法後就會觸發執行OnSubscribe.call()
。然後我們就可以在call方法調用觀察者subscriber
的onNext()
,onCompleted()
,onError()
。
最後我用張圖來總結下之前的分析結果:
RxJava基本流程分析二、操作符原理分析
之前我們介紹過幾十個操作符,要一一分析它們的源碼顯然不太現實。在這裏我拋磚引玉,選取一個相對簡單且常用的map
操作符來分析。
我們先來看一個map
操作符的簡單應用:
示例B
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
}).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "This is " + integer;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
為了便於表述,我將上面的代碼做了如下拆解:
Observable<Integer> observableA = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
});
Subscriber<String> subscriberOne = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(String s) {
System.out.println(s);
}
};
Observable<String> observableB =
observableA.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "This is " + integer;;
}
});
observableB.subscribe(subscriberOne);
map()
的源碼和上一小節介紹的create()
一樣位於Observable
這個類中。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
通過查看源碼我們發現調用map()
的時候實際上是創建了一個新的被觀察者Observable
,我們姑且稱它為ObservableB
;一開始通過Observable.create()
創建的Observable
我們稱之為ObservableA
。在創建ObservableB
的時候同時創建了一個OnSubscribeMap
,而ObservableA
和變換函數Func1
則作為構造OnSubscribeMap
的參數。
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;//ObservableA
final Func1<? super T, ? extends R> transformer;//map操作符中的轉換函數Func1。T為轉換前的數據類型,在上面的例子中為Integer;R為轉換後的數據類型,在該例中為String。
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {//結合第一小節的分析結果,我們知道這裏的入參o其實就是我們自己new的觀察者subscriberOne。
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;//這裏的actual就是我們在調用subscribe()時創建的觀察者mSubscriber
final Func1<? super T, ? extends R> mapper;//變換函數
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
...
actual.onError(e);
}
@Override
public void onCompleted() {
...
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
OnSubscribeMap
實現了OnSubscribe
接口,因此OnSubscribeMap
就是一個OnSubscribe
。在調用map()
的時候創建了一個新的被觀察者ObservableB
,然後我們用ObservableB.subscribe(subscriberOne)
訂閱了觀察者subscriberOne
。結合我們在第一小節的分析結果,所以OnSubscribeMap.call(o)
中的o
就是subscribe(subscriberOne)
中的subscriberOne
;一旦調用了ObservableB.subscribe(subscriberOne)
就會執行OnSubscribeMap.call()
。
在call()
方法中,首先通過我們的觀察者o
和轉換函數transformer
構造了一個MapSubscriber
,最後調用了source
也就是observableA
的unsafeSubscribe()
方法。即observableA
訂閱了一個觀察者MapSubscriber
。
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
...
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
return Subscriptions.unsubscribed();
}
}
上面這段代碼最終執行了onSubscribe
也就是OnSubscribeMap
的call()
方法,call()
方法中的參數就是之前在OnSubscribeMap.call()
中new出來的MapSubscriber
。最後在call()
方法中執行了我們自己的業務代碼:
subscriber.onNext(1);
subscriber.onCompleted();
其實也就是執行了MapSubscriber
的onNext()
和onCompleted()
。
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
...
return;
}
actual.onNext(result);
}
onNext(T t)
方法中的的mapper
就是變換函數,actual
就是我們在調用subscribe()
時創建的觀察者subscriberOne
。這個T
就是我們例子中的Integer
,R
就是String
。在onNext()
中首先調用變換函數mapper.call()
將T
轉換成R
(在我們的例子中就是將Integer
類型的1轉換成了String
類型的“This is 1”);接著調用subscriberOne.onNext(String result)
。同樣在調用MapSubscriber.onCompleted()
時會執行subscriberOne.onCompleted()
。這樣就完成了一直完成的調用流程。
我承認太啰嗦了,花費了這麽大的篇幅才將map()
的轉換原理解釋清楚。我也是希望盡量的將每個細節都呈現出來方便大家理解,如果看我啰嗦了這麽久還是沒能理解,請看下面我畫的這張執行流程圖。
三、線程調度原理分析
在前面的文章中我介紹過RxJava可以很方便的通過subscribeOn()
和observeOn()
來指定數據流的每一部分運行在哪個線程。其中subscribeOn()
指定了處理Observable
的全部的過程(包括發射數據和通知)的線程;observeOn()
指定了觀察者的onNext()
, onError()
和onCompleted()
執行的線程。接下來我們就分析分析源碼,看看線程調度是如何實現的。
在分析源碼前我們先看看一段常見的通過RxJava實現的線程調度代碼:
示例C
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello RxJava!");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("completed!");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
1、subscribeOn()源碼分析
public final Observable<T> subscribeOn(Scheduler scheduler) {
...
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
通過上面的代碼我們可以看到,subscribeOn()
和map()
一樣是創建了一個新的被觀察者Observable
。因此我大致就能猜到subscribeOn()
的執行流程應該和map()
差不多,OperatorSubscribeOn
肯定也是一個OnSubscribe
。那我們接下來就看看OperatorSubscribeOn
的源碼:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;//線程調度器,用來指定訂閱事件發送、處理等所在的線程
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
OperatorSubscribeOn
實現了OnSubscribe
接口,call()
中對Subscriber
的處理也和OperatorMap
對Subscriber
的處理類似。首先通過scheduler
構建了一個Worker
;然後用傳進來的subscriber
構造了一個新的Subscriber s
,並將s
丟到Worker.schedule()
中來處理;最後用原Observable
去訂閱觀察者s
。而這個Worker
就是線程調度的關鍵!前面的例子中我們通過subscribeOn(Schedulers.io())
指定了Observable
發射處理事件以及通知觀察者的一系列操作的執行線程,正是通過這個Schedulers.io()
創建了我們前面提到的Worker
。所以我們來看看Schedulers.io()
的實現。
首先通過Schedulers.io()
獲得了ioScheduler
並返回,上面的OperatorSubscribeOn
通過這個的Scheduler
的createWorker()
方法創建了我們前面提到的Worker
。
public static Scheduler io() {
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}
接著我們看看這個ioScheduler
是怎麽來的,下面的代碼向我們展現了是如何在Schedulers
的構造函數中通過RxJavaSchedulersHook.createIoScheduler()
來初始化ioScheduler
的。
private Schedulers() {
...
Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}
...
}
最終RxJavaSchedulersHook.createIoScheduler()
返回了一個CachedThreadScheduler
,並賦值給了ioScheduler
。
public static Scheduler createIoScheduler() {
return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
...
return new CachedThreadScheduler(threadFactory);
}
到這一步既然我們知道了ioScheduler
就是一個CachedThreadScheduler
,那我們就來看看它的createWorker()
的實現。
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
上面的代碼向我們赤裸裸的呈現了前面OperatorSubscribeOn
中的Worker
其實就是EventLoopWorker
。我們重點要關註的是他的scheduleActual()
。
static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.once = new AtomicBoolean();
this.threadWorker = pool.get();
}
...
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
...
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}
通過對源碼的一步步追蹤,我們知道了前面OperatorSubscribeOn.call()
中的inner.schedule()
最終會執行到ThreadWorker
的scheduleActual()
方法。
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
scheduleActual()
中的ScheduledAction
實現了Runnable
接口,通過線程池executor
最終實現了線程切換。上面便是subscribeOn(Schedulers.io())
實現線程切換的全部過程。
2、observeOn()源碼分析
observeOn()
切換線程是通過lift
來實現的,相比subscribeOn()
在實現原理上相對復雜些。不過本質上最終還是創建了一個新的Observable
。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
...
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
OperatorObserveOn
作為OnSubscribeLift
構造函數的參數用來創建了一個新的OnSubscribeLift
對象,接下來我們看看OnSubscribeLift
的實現:
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
st.onStart();
parent.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
}
OnSubscribeLift
繼承自OnSubscribe
,通過前面的分析我們知道一旦調用了subscribe()
將觀察者與被觀察綁定後就會觸發被觀察者所對應的OnSubscribe
的call()
方法,所以這裏會觸發OnSubscribeLift.call()
。在call()
中調用了OperatorObserveOn.call()
並返回了一個新的觀察者Subscriber st
,接著調用了前一級Observable
對應OnSubscriber.call(st)
。
我們再看看OperatorObserveOn.call()
的實現:
public Subscriber<? super T> call(Subscriber<? super T> child) {
...
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
OperatorObserveOn.call()
中創建了一個ObserveOnSubscriber
並調用init()
進行了初始化。
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
...
@Override
public void onNext(final T t) {
...
schedule();
}
@Override
public void onCompleted() {
...
schedule();
}
@Override
public void onError(final Throwable e) {
...
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(localOn.getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
...
}
ObserveOnSubscriber
繼承自Subscriber
,並實現了Action0
接口。我們看到ObserveOnSubscriber
的onNext()
、onCompleted()
、onError()
都有個schedule()
,這個方法就是我們線程調度的關鍵;通過schedule()
將新觀察者ObserveOnSubscriber
發送給subscriberOne
的所有事件都切換到了recursiveScheduler
所對應的線程,簡單的說就是把subscriberOne
的onNext()
、onCompleted()
、onError()
方法丟到了recursiveScheduler
對應的線程中來執行。
那麽schedule()
又是如何做到這一點的呢?他內部調用了recursiveScheduler.schedule(this)
,recursiveScheduler
其實就是一個Worker
,和我們在介紹subscribeOn()
時提到的worker
一樣,執行schedule()
實際上最終是創建了一個runable
,然後把這個runnable
丟到了特定的線程池中去執行。在runnable
的run()
方法中調用了ObserveOnSubscriber.call()
,看上面的代碼大家就會發現在call()
方法中最終調用了subscriberOne
的onNext()
、onCompleted()
、onError()
方法。這便是它實現線程切換的原理。
好了,我們最後再看看示例C對應的執行流程圖,幫助大家加深理解。
RxJava執行流程總結
這一章以執行流程、操作符實現以及線程調度三個方面為切入點剖析了RxJava源碼。下一章將站在更宏觀的角度來分析整個RxJava的框架結構、設計思想等等。敬請期待~~ :)
鏈接:https://www.jianshu.com/p/bf56e03494bf
RxJava系列6(從微觀角度解讀RxJava源碼)