RxJava2原始碼解析——基本流程、執行緒排程
本篇文章的目的: ①瞭解RxJava的基本流程 ②瞭解RxJava中執行緒排程的實現 ③瞭解了上面那些,其他的操作符對你來說就不是問題了
RxJava基本流程
我們從基本的使用作為入口:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("hey"); e.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe() : d = " + d ); } @Override public void onNext(String value) { Log.d(TAG, "onNext() : value = " + value ); } @Override public void onError(Throwable e) { Log.d(TAG, "onError() : e = " + e ); } @Override public void onComplete() { Log.d(TAG, "onComplete()"); } });
我們以create方法作為入口,它接受的引數是一個ObservableOnSubscribe物件,ObservableOnSubscribe是一個介面,裡面只有一個subscribe方法我們需要實現:
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
create方法接受一個ObservableOnSubscribe物件:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
這裡呼叫RxJavaPlugins.onAssembly方法,裡面呼叫相關的hook方法,這裡不詳細講,我們只要知道它返回了原物件。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
也就是說create方法返回了一個ObservableCreate物件,它繼承自Observable:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
該類的建構函式只是做了source的儲存,該source是我們呼叫create方法時傳入的ObservableOnSubscribe,也就是說,用ObservableCreate對ObservableOnSubscribe進行包裝(即裝飾者模式)。
到這裡,create方法最終返回一個ObservableCreate物件,它繼承自Observable,接下來就是subscribe(Observer)方法了,
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() : d = " + d );
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() : value = " + value );
}
...
});
我們看一下observable的subscribe方法:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
...
}
}
RxJavaPlugins.onSubscribe同樣是hook方法,最後呼叫了自己的subscribeActual(Observer),也就是說:subscribeActual方法是在我們完成訂閱 即呼叫subscribe(Observer)的時候 被呼叫的,引數就是從下游傳遞上來的Observer物件, 那麼我們直接看ObservableCreate的subscribeActual方法:
protected void subscribeActual(Observer<? super T> observer) {
//用observer構造一個CreateEmitter物件
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//呼叫observer的onSubscribe方法
observer.onSubscribe(parent);
try {
//呼叫被觀察者source的subscribe方法,傳入CreateEmitter
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
可以看到裡面用一個CreateEmitter包裝下游的Observer,然後呼叫Observer的onSubscribe方法,接著呼叫上游物件也就是source(被包裝物件)的subscribe方法。也就是在subscribeActual方法中呼叫了source(被包裝物件)的subscribe方法,這樣自下而上一層層呼叫他們的subscribe方法,其他操作符也和create操作符一樣,包裝,然後再subscribeActual中呼叫上游的subscribe。 這裡的CreateEmitter很熟悉,
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
}
CreateEmitter實現ObservableEmitter介面,從建構函式中得知它對Observer進行包裝, 在被觀察者的subscribe方法中我們呼叫引數e的onNext和onComplete等來push事件,根據subscribeActual程式碼中source.subscribe(parent)可以知道,這裡的e就是CreateEmitter,
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hey");
e.onComplete();
}
看一下CreateEmitter類的onNext方法,最終會呼叫observer的onNext方法
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
下面直接呼叫了subscribeActual,並傳入observer,subscribeActual就是前面講的,自下而上一層層呼叫subscribeActual也就是解包裝(瞎說的- -)
這裡對基本流程做一個總結:
首先順序執行我們的程式,自上而下每個操作符都對上游返回的結果進行包裝,subscribe(Observer)方法呼叫後,自下而上一層層解包裝,最終在subscribe(ObservableEmitter e)呼叫e.onNext等方法,這裡的e就是從下游傳遞上來的observer。
RxJava執行緒排程
知道了RxJava採用裝飾者模式後,理解其他操作符就不難了,subscribeOn操作符是用來決定被觀察者執行的執行緒,
我們直接看subscribeOn方法,返回一個ObservableSubscribeOn物件:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//呼叫observer的onSubscribe,說明onSubscribe是在訂閱的執行緒執行的
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
可以看到又是一次包裝,在subscribeActual中直接呼叫了Observer的onSubscribe方法,說明onSubscribe方法執行的執行緒和訂閱的執行緒是一致的。 接下來這條常常的程式碼,我們先看SubscribeTask:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
它是ObservableSubscribeOn的內部類,實現Runnable類,說明它可以被執行,run方法中正是呼叫了source.subscribe()方法,這裡注意一點,這個source會一層層呼叫上游的程式碼,也就是說在subscribeOn操作符之上的操作都會被影響
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
接著是scheduler.scheduleDirect方法,假設scheduler是Schedulers.IO,也就是在子執行緒執行source.subscribe, Schedulers.IO最終得到IoScheduler:
public final class IoScheduler extends Scheduler {
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
}
內部配置了執行緒池等,由它來執行run方法。
至此,執行緒排程就解析到這裡。
歡迎糾正,喜歡點個贊。