RxJava2原始碼解析
基礎解析
我們看下RxJava最簡單的寫法
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { System.out.println("onError"+e.getLocalizedMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } })
很簡單的3個步驟:
- 建立 Observable :被觀察者
- 建立 Observer :觀察者
- 通過 subscribe() 方法建立訂閱關係
一個個來看
被觀察者的建立
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); //建立了一個ObservableCreate類,裡面包裝了我們傳入的source引數 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
觀察者的建立
這裡很簡單,只是通過new方法生成了一個簡單的Observer物件。
訂閱
訂閱是通過subscribe方法來執行的,我們來跟蹤一下,這個方法是屬於Observable類的
public final void subscribe(Observer<? super T> observer) { //校驗觀察者不為空 ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //呼叫subscribeActual方法,然後入參是observer(被觀察者)。這個方法是抽象方法,具體的實現是交給子類的 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } /** * Operator implementations (both source and intermediate) should implement this method that * performs the necessary business logic. * <p>There is no need to call any of the plugin hooks on the current Observable instance or * the Subscriber. * @param observer the incoming Observer, never null */ protected abstract void subscribeActual(Observer<? super T> observer);
最終通過 subscribeActual(observer) 來實現功能,而這個方法是有具體的子類去實現的。從第一步中我們通過Observable.create()來生成的被觀察者。裡面最終的生成的是 ObservableCreate 這個類。也就是說,這個subscribeActual(observer) 方法是由 ObservableCreate 這個類去實現的,我們去裡面找一下。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//這裡將我們傳入的被觀察者進行了一層封裝,裡面實現了ObservableEmitter<T>, Disposable等介面->裝飾者模式
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//呼叫被觀察者的onSubscribe方法(這裡很神奇,調起者是observer,而不是被訂閱者,是為了相容Rxajva1麼?)
observer.onSubscribe(parent);
try {
//這裡的source就是我們自己寫的那個ObservableOnSubscribe了,呼叫了裡面的subscriber方法,然後引數是封裝後的觀察者。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
Observable.create(new ObservableOnSubscribe<String>() {
//看到了哈,實際是執行的這個方法,這裡面的emitter是我們封裝之後的CreateEmitter,那麼這裡面的onNext(),onComplete()又是誰呢?
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onComplete();
}
})
我們現在回到我們封裝生成的 CreateEmitter 這個類
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
//定義的觀察者
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//呼叫的是觀察者的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//呼叫的是觀察者的onComplete()方法
observer.onComplete();
} finally {
//執行完onComplete()方法後要取消訂閱
dispose();
}
}
}
.....
}
到這裡為知,最簡單的一個流程基本已經走通了。。
高階用法
執行緒切換
下層切換
RxJava中我們使用的最多的應該就是進行執行緒切換了吧?通過 observeOn() 方法來進行執行緒的隨意切換,舒舒服服,再也不用進行噁心的執行緒處理了。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onComplete();
}
}).observeOn(Schedulers.io())
observeOn() 方法是屬於Observable這個類的。我們跟蹤進去這個方法去看看。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//進行空校驗
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
這裡建立了一個 ObservableObserveOn 物件,所以和之前基礎裡面將的一樣,當呼叫 subscribe() 方法的時候,會先呼叫觀察者的 onSubscribe() 方法,然後通過subscribe的層層處理,呼叫這個被觀察者裡面的 subscribeActual() 方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {//如果傳入的scheduler是TrampolineScheduler,那麼執行緒不需要切換,直接呼叫subscribe方法即可
source.subscribe(observer);
} else {
//根據傳入的scheduler,建立Worker
Scheduler.Worker w = scheduler.createWorker();
//將傳入的observer進行包裝,包裝為ObserveOnObserver類。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
這裡可以依據基礎篇的進行整理一下,這裡將觀察者進行了一層包裝,也就是我們的觀察者由原來的observaer變為了ObserveOnObserver物件。而被觀察者還是之前的ObservableCreate(注意,這裡只是依據基礎中.create()建立的類,所以是ObservableCreate,如果是其他方式建立的被觀察者,那麼這裡可能就是另一個具體的實現類了),並未改變。之前我們講過,當呼叫subscribe方法的onNext(),onComplete()方法,其實是呼叫的觀察者的方法。我們現在看一下ObserveOnObserver的onNext和onComplete方法又是做了什麼神奇的操作。
@Override
public void onNext(T t) {
if (done) {//如果已經完成,直接返回
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
//將onNext的資料放入佇列queue
queue.offer(t);
}
//進行執行緒切換
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//呼叫了worker的方法,這裡通過呼叫執行緒池,呼叫了自身的run方法
worker.schedule(this);
}
}
這裡我們使用的是IO執行緒,那麼在 scheduler.createWorker() 中的生成worker時
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
那麼跟到這個類裡面的 schedule 方法
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//這裡呼叫了執行緒worker的scheduleActual方法,並把Runable物件傳進去
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//留下鉤子
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
....
Future<?> f;
try {
if (delayTime <= 0) {
//線上程池中呼叫封裝之後的Runnable
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
可以看到,其實最終是通過執行緒池呼叫了 ObserveOnObserver 本身,這個類實現了 Runnable 介面,我們看一下run方法裡面做了什麼。
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
//具體的操作
void drainNormal() {
int missed = 1;
//被觀察者onNext傳送的資料佇列
final SimpleQueue<T> q = queue;
//實際的觀察者
final Observer<? super T> a = downstream;
for (;;) {
//檢測是否有異常資訊
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//遍歷
for (;;) {
boolean d = done;
T v;
//取出佇列中的資料
try {
v = q.poll();
} catch (Throwable ex) {
//發生異常,則直接呼叫dispose()和onError()方法
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
....
//呼叫實際的觀察者的onNext()方法
a.onNext(v);
}
...
}
}
因為這個操作最終是在scheduler.createWorker()建立的地方進行了處理,才實現了對於之後程式碼處理都在io執行緒中進行了呼叫。從而實現執行緒的切換功能。這裡我們對之前的測試程式碼流程做一個總結。
先看一下對於觀察者的onSubscribe()方法的呼叫流程:
這裡面我們自己定義的觀察者通過subscribe()方法層層往上呼叫,最後呼叫了我們定義的被觀察者裡面的onSubscribe方法,再一層層的往下呼叫,最後到我們自己定義的onSubscribe()方法,裡面很少有執行緒的切換處理,所以這段程式碼在哪兒執行,那麼這段程式碼在那裡執行,這個onSubscribe()方法就是在哪個執行緒執行。
繼續,我們看一下onNext()方法
上層切換
除了 observeOn 方法來處理我們操作流的下層執行緒處理之外,我們也可以通過 subscribeOn 方法來進行對上層流的執行緒處理。
測試用程式碼:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
現在我們跟蹤進 subscribeOn 方法
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
這裡看到,跟我們基礎篇裡面的 create() 方法有異曲同工之妙,這裡面生成了一個ObservableSubscribeOn類,這個類也是繼承Observable類的,我們跟蹤進去看一下。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//呼叫訂閱者的onSubscribe方法,這裡的執行緒還未進行切換
observer.onSubscribe(parent);
//進行執行緒的切換處理
//1.創造一個SubscribeTask的Runable方法
//2.通過scheduler的scheduleDirect進行執行緒的切換
//3.通過parent.setDisposable來進行Disposable的切換
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
看起來是不是很像?在基礎篇我們知道了,這個 subscribeActual 方法裡面的引數就是我們的觀察者。
我們看一下里面和之前分析所不同的地方,也就是執行緒的切換
final class SubscribeTask implements Runnable {
...
@Override
public void run() {
//source是我們上一層的被觀察者,parent是包裝之後的觀察者.
//所以會在相關的worker裡面呼叫source的subscribe方法,
//即上層的資料呼叫已經在woker裡面了(如果是IoScheduler,那麼這裡就是在RxCachedThreadScheduler執行緒池呼叫了這個方法 )
source.subscribe(parent);
}
}
然後看一下這裡面最重要的 scheduler.scheduleDirect 這個方法
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//建立一個Worker,這個是有具體的實現類來實現的,比如我們的IOScheduler,ImmediateThinScheduler等,具體要看我們切換傳參
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
這裡我們對上層切換的流程做一個總結:當呼叫 subscribeOn 方法的時候,會在建立的排程器中來執行被觀察者的執行程式碼,從而實現了對上層的執行緒切換功能。
先看一下測試程式碼中的onNext()方法的呼叫流程:
彙總
其實對於執行緒的切換,主要是根據裡面傳遞的執行緒切換函式,將上游或者下游的程式碼在指定的執行緒裡面去執行來實現。
本文由 開了肯 釋出!