Rxjava基本原理解析(四)
接著上一篇的分享模式,今天我們介紹和分析執行緒切換操作符subscribeOn以及其原始碼設計。
Rxjava的一個最大優點之一就是靈活的執行緒切換,切換過程不影響整體鏈式邏輯流程,既方便又清新。為了對比,還是再次將一個操作符的圖放上:
subscribeOn操作符用於切換事件源的執行緒,一般用在第一個observable的後面:
Observable.create((ObservableOnSubscribe<String>) e -> e.onNext("hello")) .subscribeOn(Schedulers.newThread()) .subscribe(s -> System.out.println("onNext"));
與上一篇create操作符使用例項程式碼少了很多,原因是使用鏈式寫法和lamda表示式。create操作符的事件源還是一個ObservableOnSubscribe物件,但是訂閱的不再是observer,而是一個Consumer,這個後面文章會講解。
subscribeOn操作符需要傳一個scheduler用於指定執行緒,並建立一個被觀察者:ObservableSubscribeOn和一個觀察者SubscribeOnObserver。 ObservableSubscribeOn的source為上一個observable,最重的是其subscribeActual()方法,原始碼如下:
@Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new Runnable() { @Override public void run() { source.subscribe(parent); } })); }
subscribeOn操作符是改變事件源的執行緒,即設定第一個observable.subscribe的執行緒。subscribeActual方法中不是直接呼叫source.subscribe(parent);而是通過傳入的scheduler呼叫。由於傳入的scheduler指定了新的執行緒,那麼scheduler內部會切換到指定執行緒然後呼叫source.subscribe(parent);實現切換事件源執行緒。
下面來看看scheduler內部是如何實現執行緒切換的。本篇以Schedulers.newThread()為例進行分析,其他執行緒以及對比分析後面會單獨寫一篇。Scheduler通過入口方法scheduleDirect傳入一個runable介面,通過runable的run方法呼叫source.subscribe(parent);,scheduleDirect內部實際是呼叫如下的過載方法:
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
該方法呼叫了createWorker()獲得一個Worker,然後通過呼叫worker的schedule方法來呼叫傳入的runable介面方法。createWorker是個抽象方法,需要子類重新。Schedulers.newThread()實際是個Scheduler子類物件NewThreadScheduler。該類重新了createWorker並得到一個Worker的子類NewThreadWorker。那麼真正的執行緒切換就是靠Worker來控制的。NewThreadWorker的schedule方法實際呼叫的是scheduleActual方法,原始碼如下:
public ScheduledRunnable scheduleActual(final Runnable run, ...) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//...
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
}
catch (RejectedExecutionException ex) {
//...
}
return sr;
}
最後會使用執行緒池executor.submit((Callable<Object>)sr);去執行傳入的runable介面方法。整體流程執行緒變換如下圖所示:
注意以下兩點:
1.根據ObservableSubscribeOn 的subscribeActual原始碼可知,s.onSubscribe(parent)線上程切換前執行,那麼onSubscribe不受subscribeOn執行緒切換影響,在原執行緒執行;
2.在使用subscribeOn操作符切換到新執行緒後,上游的所有subscribe訂閱方法都會在新執行緒執行,一直穿透到事件源。如果整鏈式操作中,上游有新的subscribeOn操作符,那麼執行緒又會改變,因此多個subscribeOn後,只有第一個會對事件源執行緒有效。