一張圖看懂Rxjava的原理
前言
Rxjava是NetFlix出品的Java框架, 官方描述為 a library for composing asynchronous and event-based programs using observable sequences for the Java VM,翻譯過來就是“使用可觀察序列組成的一個非同步地、基於事件的響應式程式設計框架”。一個典型的使用示範如下:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String s = "1234"; //執行耗時任務 emitter.onNext(s); } }).map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.parseInt(s); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe();
本文要講的主要內容是Rxjava的核心思路,利用一張圖並結合原始碼分析Rxjava的實現原理,至於使用以及其比較深入的內容,比如不常用的操作符,背壓等,讀者可以自行學習。另外提一句,本文采用的Rxjava版本是2.2.3,Rxjava最新版本是3.x.x,感興趣的可以自行閱讀,但相信其最核心的原理是不會變化的。
正題
先放出本文最重要的圖:
Rxjava的核心思路被總結在了圖中,下面我們分析這張圖。
流式構建和事件傳遞
在講之前,先提一點,在Rxjava中,有Observable和Observer這兩個核心的概念,但是它們在發生訂閱時,跟普通的觀察者模式寫法不太一樣,因為常識來講,應該是觀察者去訂閱(subscribe)被觀察者,但是Rxjava為了其基於事件的流式程式設計,只能反著來,observable去訂閱observer,所以在rxjava中,subscribe可以理解“注入”觀察者。
首先我們看上面的圖片,先簡單解釋一下:圖中方形的框代表的是Observable,因為它代表節點,所以用Ni表示,圓形框代表的是觀察者Observer,用Oi標識,後面加括號的意思是Oi持有其下游Observer的引用,左側代表上游,右側代表下游。圖片裡有三條有方向的彩色粗線,代表三個不同的流,這三個流是我們為了分析問題而抽象出來的的,代表從構建到訂閱整個事件的流向,按照時間順序從上到下依次流過,它們的含義分別是:
- 從左往右的構建流:用來構建整個事件序列,這個流表徵了整個鏈路的構建過程,相當於構造方法。
- 從右往左的訂閱流:當最終訂閱(subscribe方法)這個行為發生的時候,每個節點從右向左依次執行訂閱行為。
- 從左往右的觀察者回調流:當事件發生以後,會通過這個流依次通知給各個觀察者。
我們依次分析這三條流:
構建流
在使用Rxjava時,其流式構建流程是很大的特色,避免了傳統回撥的繁瑣。怎麼實現的呢?使用過Rxjava的讀者應該都知道,Rxjava的每一步構建過程api都是相同的,這是因為每一步的函式返回結果都是一個Observable,Observable提供了Rxjava所有的功能。那麼Obsevable在Rxjava中到底扮演一個什麼角色呢?事實上,其官方定義就已經告訴我們答案了,前言裡官方定義中有這樣一段:“using Observable sequences”,所以說,Obsevable就是構建流的元件,我們可以看成一個個節點,這些節點串起來組成整個鏈路。Observable這個類實現了一個介面:ObservableSource,這個介面只有一個方法:subscribe(observer),也就是說,所有的Obsevable節點都具有訂閱這個功能,這個功能很重要,是訂閱流的關鍵,待會會講。總結一下:
在我們編寫Rxjava程式碼時,每一步操作都會生成一個新的Observable節點(沒錯,包括ObserveOn和SubscribeOn執行緒變換操作),並將新生成的Observable返回,直到最後一步執行subscribe方法
無論是構建的第一步 create方法,還是observeOn,subscribeOn變換執行緒方法,還是各種操作符比如map,flatMap等,都會生成對應的Observable,每個Observble中要實現一個最重要的方法就是subscribe,我們看其實現:
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
RxJavaPlugins.onError(e);
throw npe;
}
}
這裡提一點,大家看原始碼時遇到RxJavaPlugins時直接略過看裡面的程式碼就好了,它是hook用的,不影響主要流程。所以上面程式碼其實只有一行有用:
subscribeActual(observer);
也就是說,每個節點在執行subscribe時,其實就是在呼叫該節點的subscribeActual方法,這個方法是抽象的,每個節點的實現都不一樣。我們舉個栗子,拿ObseverOn這個操作生成的ObservableSubscribeOn瞧瞧:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//xxx省略
}
其中其父類繼承Observable,所以它是一個Observble。
整個過程有點像builder模式,不同之處是它是生成了新的節點,而builder模式返回的自身。如果你讀過okHttp的原始碼,okHttp中攔截器跟這裡有些相似,okHttp中會構建多個Chain節點,然後用相應的Intercepter去處理Chain。
我們理解了編寫Rxjava程式碼的過程其實就是構建一個一個Observable節點的過程,接下來我們看第二條流。
訂閱流
構建過程只是通過建構函式將一些配置傳給了各個節點,實際還沒有執行任何程式碼,只有最後一步才真正的執行訂閱行為。當最後一個節點呼叫subscribe方法時,是構建流向訂閱流變化的轉折點,我們以圖中為例:最後一個節點是N5,N5節點是最後一個flatmap操作符方法產生的,也就是說,最後是呼叫這個節點的subscribe方法,這個方法最終也是會呼叫到subscribeActual方法中去,我們看其原始碼:
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
}
剛才我們分析了,N5節點是Observable節點,其subscribe方法最後呼叫的是subscribeActual方法,我們看上面程式碼中它的這個方法:前面的判斷語句跳過,第二行:
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
這行程式碼需要注意兩點:
- 生成了一個新的Observer,請注意其建構函式中第一個引數t,儲存到了downstream這個“下游”變數中,這個t從哪兒傳進來的呢?對於N5節點來說,這個t就是我們程式碼中最後一步編寫的Observer,比如我們常用的網路請求返回後的回撥。也就是說,這個新生成的Observer包含了它的“下游”觀察者的引用,在圖片中對應最右邊的圓形框O1(observer)。
- 執行訂閱行為,這裡的source是該節點建構函式傳入的source,通過原始碼得知其實就是N5節點的上一個節點N4,因此,這裡的訂閱行為本質上是讓當前節點的上一個節點訂閱當前節點新生成的Observer。
到這裡,我們分析了最後一個節點執行subscribe方法的過程,事實上,每個節點的執行流程都是類似的(subscribeOn節點有些特殊,等會執行緒排程會將),也就是說,N5會呼叫N4的subscribe方法,而在N4的subscribe方法中,又去呼叫了N3的subscribe....一直到N0會呼叫source的subscribe方法。總結下來就是:
從最後一個N5節點的訂閱行為開始,依次執行前面各個節點真正的訂閱方法。在每個節點的訂閱方法中,都會生成一個新的Observer,這個Observer會包含“下游”的Observer,這樣當每個節點都執行完訂閱(subscribeActual)後,也就生成了一串Observer,它們通過downstream,upstream引用連線。
以上就是訂閱流的發生過程,簡單講就是下游節點呼叫上游節點的subscribeActual方法,從而形成了一個呼叫鏈。
觀察者回調流
當訂閱流執行到最後,也就是第一個節點N0時,我們看發生了什麼,首先看看N0節點怎麼建立的:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
生成了ObservableCreate例項,我們看這個類(簡化):
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
}
所以訂閱流的最終會掉到上面的subscrbeActual方法,它其實還是和其他節點一樣,最主要的還是執行了
source.subscribe(parent)
這行程式碼,那麼這個節點的source是什麼呢?它就是我們事件的源頭啊!
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String s = "1234";
//執行耗時任務
emitter.onNext(s);
}
})
上面程式碼直接拿的開頭的例子,這個source是一個ObservableOnSubscribe,看它的subscribe方法裡,這裡很重要,這個函式裡面其實是訂閱流和觀察者流的轉折點,也就是流在這兒“轉向了”。這裡,這個事件源沒有像節點那樣,呼叫上一個節點的訂閱方法,而是呼叫了其引數的emitter的onNext方法,這個emitter對應N0節點的什麼呢?看程式碼知道,時CreateEmitter這個類,我們看這個類裡面
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
//省略
}
看它的onNext方法,執行的是
observer.onNext(t)
observer是誰?建構函式傳進來的,也就是N0節點subscribeActual方法中的observer,這個observer是誰呢?仔細回想一下,前面訂閱流的時候不就是一次訂閱上一個節點生成的Observer嗎,所以這個observer就是前一個節點N1生成的Observer,我們看N1節點,是一個Map,對應的Observable節點裡的Observer原始碼如下:
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
//省略後續
名為MapObserver,看它的onNext方法,忽略前面兩個判斷語句,核心就兩句,一個是mapper.apply(t),另一個就是downstream.onNext(v)。也就是說,這個mapObserver幹了兩件事,一個是把上個節點返回的資料進行一次map變換,另一個就是將map後的結果傳遞給下游,下游是什麼呢?看了訂閱流的讀者自然知道,就是N2節點的Observer,對應圖中O4,依次類推,我們知道了,事件發生以後,通過各個節點的Observer事件源被層層處理並傳遞給下游,一直到最後一個觀察者執行完畢,整個事件處理完成。
至此,我們三個流分析完畢,接下來,我們開始分析執行緒排程是怎麼實現的。
執行緒排程
觀察仔細的讀者可能已經看到了,圖中N2節點左側的所有節點和右側的節點顏色不同,我為什麼要這樣畫呢?其實裡面的玄機就是執行緒排程,接下來我們分別看subscribeOn和observeOn的執行緒切換玄機吧。
SubscribeOn
在訂閱流發生的的時候,大多數節點都是直接呼叫上一個節點的subscribe方法,實現雖有差別,但大同小異。唯一有個最大的不同就是subscribeOn這個節點,我們看原始碼:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
普通的節點執行時,大多隻是簡單的執行source.subscribe(observer),但是這個不一樣。先看第二行,它呼叫了觀察者的onSubscribe方法,熟悉Rxjava的人知道,我們在自定義Observer的時候,裡面有這個回撥,其發生時機就在此刻。我們接著看最後一行,忽略parent.setDisposable這個邏輯,我們直接看引數裡面的東西。
scheduler.scheduleDirect(new SubscribeTask(parent))
看看幹了什麼:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
繼續:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
建立了一個worker,一個runnable,然後將二者封裝到一個DisposeTask中,最後用worker執行這個task,那麼這個worker是什麼呢?
@NonNull
public abstract Worker createWorker();
createworker是一個抽象方法,所以需要去找Scheduler的子類,我們回想一下rxjava的使用,如果在子執行緒中執行,我們一般設定排程器為Schedulers.io(),我們看這個子類的實現:
在IOSchedluer類中:
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
繼續:
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
這裡的executor就是一個ExecutorService,熟悉執行緒池的讀者應該知道,這裡的submit方法,就是將callable丟到執行緒池中去執行任務了。
我們回到主線
scheduler.scheduleDirect(new SubscribeTask(parent))
對於io執行緒的排程器來說,上面的程式碼就是將new SubscribeTask(parent)丟到執行緒池中執行,我們看引數裡面的SubscribeTask:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
看run方法:source.subscribe(parent),這裡的parent跟普通節點一樣,仍然是本節點生成的新的Observer,對於本節點來說,是一個SubscribeOnObserver。因此,我們就知道了,對於subscribeOn這個節點,它跟普通的節點不同之處在於:
SubscribeOn節點在訂閱的時候,將它的上游節點的訂閱行為,以runnable的形式扔給了一個執行緒池(對於IO排程器來說),也就是說,當訂閱流流到SubscribeOn節點時,執行緒發生了切換,之後流向的節點都在切換後的執行緒中執行。
分析到這裡,我們就知道了subscribeOn的執行緒切換原理了,原來是在訂閱流中塞了一個執行緒變化操作。我們再看圖中的顏色問題,為什麼這個節點上游的節點都是紅色的呢?因為當訂閱流流過這個節點後,後面的節點只是單純的傳遞給上游節點而已,無論是普通的操作符,還是ObserveOn節點,都是簡單的傳遞給上游,沒有做執行緒切換(注意,ObserveOn是在觀察者流中做的執行緒切換,待會會講)。
我們再思考一個問題,如果上游還有別的subscribeOn,會發生什麼?
我們假設N1節點的map修改程subscribeOn(AndroidScheduler.Main),也就是說,切換到主執行緒。我們還是從N2節點開始分析,剛才說到最後會執行到SubscribeTask裡的Run方法,注意此時source.subscribe(parent)發生在子執行緒中,接下來,回撥用N1節點的subscribe,N1節點回呼叫scheduler.scheduleDirect(new SubscribeTask(parent)),方法,此時,因為執行緒排程器是主執行緒的,我們看它的程式碼:
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
看看這個HandlerScheduler的方法:
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}
熟悉Android Handler機制的讀者應該很清楚,這裡會把N1節點上游的操作,通過Handler機制,扔給主執行緒操作,雖然這一步是在N2節點的子執行緒中執行的,但是它之前的事件仍然會在主執行緒中執行。因此我們有以下結論:
subscribeOn節點影響它前面的節點的執行緒,如果前面還有多個subscribeOn節點,最終只有第一個,也就是最上游的那個節點生效
接下來我們分析observeOn
ObserveOn
前面的subscribeOn執行緒切換是在訂閱流中發生的,接下來的ObserveOn比較簡單,它發生在第三條流-觀察者回調流中,我們看ObserveOn節點的原始碼:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//簡化
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
}
在前面的觀察者流分析時,我們知道,觀察者流是通過onNext()方法傳遞的,我們看最後一行,schedule(),執行緒切換,所以這個ObserveOn節點其實沒幹啥事,就是切換執行緒了,而且是在onNext回撥中切換的。我們進到這個方法:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
worker是這個節點訂閱時指定的 scheduler.createWorker(), 以主執行緒觀察為例:
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
同樣,通過Handler機制,將runnable扔給主執行緒執行,runnable是誰呢,是this,this就是這個ObserveOnObserver,我們看它的run方法:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
繼續看drainNormal
void drainNorml() {
//簡化
final Observer<? super T> a = downstream;
T v;
v = q.poll();
a.onNext(v);
}
抓重點,還是把上游的處理結果扔給下游。也就是說observeOn會將它下游的onNext操作扔給它切換的執行緒中,因此ObserveOn影響的是它的下游,所以我們途中observeOn後面的顏色都是藍的。
同樣我們思考,如果有多個observeOn會發生什麼?很簡單,思路同subscribeOn,每個ObserveOn只會影響它下游一直到下一個obseveOn節點的執行緒,也就是分段的。
總結
到此為止我們就講完了全部內容,包括三條流的原理和執行緒切換的原理,至於Rxjava的其他功能和原理,限於篇幅,本文不會講解,感興趣的讀者自行閱讀原始碼。本文主要為讀者提供了理解Rxjava的思路,真正要去理解它,還是要多看原始碼。
在我看來,Rxjava有點像觀察者模式和責任鏈模式的結合變種,普通的觀察者模式一般是被觀察者通知多個觀察者,而Rxjava則是被觀察者通知第一個Obsever,接下來Observer依次通知其他節點的Observer,將觀察者模式進行了一種類似鏈式的變換,每個節點又會執行它不同的“職責”,非常巧妙,事件在Observable鏈條上進行傳遞,事件結果通過Observer鏈條進行回撥,這或許就是Rxjava的精髓所