RxJava2原始碼分析一
RxJava 在最近兩年迅速火爆起來,最近學習RxJava2,免不了需要學習它的原始碼,寫下部落格記錄學習結果。
RxJava 的設計理念基於觀察者模式,這裡就需要先了解一下它所涉及的東西。Observable,稱為被觀察者,由它產生一系列的事件。Observer,稱為觀察者。Observer和Observable之間通過subscribe方法發生訂閱關係。這樣Observer就可以 ”觀察“ Observable發生的事件,並根據這些事件做出相應的動作。
先看示例程式碼:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext( 1); emitter.onNext( 2); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("FirstMain.onSubscribe d="+d.isDisposed()); } @Override public void onNext(Integer integer) { System.out.println("FirstMain.onNext i="+integer); } @Override public void onError(Throwable e) { System.out.println("FirstMain.onError"); } @Override public void onComplete() { System.out.println("FirstMain.onComplete"); } });
可以看到如上程式碼,首先呼叫Observable的create方法建立一個Observable物件,實際建立的是ObservableCreate的物件,ObservableCreate是Observable的子類。在create方法中首先進行判空。然後建立並返回ObservableCreate物件。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null");//判空 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
返回ObservableCreate物件時先呼叫了RxJavaPlugins.onAssembly方法,下面看一下這個方法:
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
try {
return f.apply(t);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
可以看到RxJavaPlugins.onAssembly方法是對Observable物件經行一步操作,就是使用Function物件變數onObservableAssembly給Observable物件經行操作。這裡onObservableAssembly物件變數為null,所以實際上onAssembly方法返回的時原來的Observable物件。如果有需要對Observable物件進行什麼操作的話,可以給onObservableAssembly物件變數賦值。
接著呼叫了Observable的subscribe方法,是的Observer和Observable發生訂閱關係。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
//...
}
}
可以看到在subscribe方法中,先對傳進來的引數判空,然後RxJavaPlugins.onSubscribe也是根據onObservableSubscribe是否為空對observer經行操作。原始碼如下:
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
static <T, U, R> R apply(@NonNull BiFunction<T, U, R> f, @NonNull T t, @NonNull U u) {
try {
return f.apply(t, u);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
接著呼叫subscribeActual方法,這裡可以要知道,因為create方法返回的是ObservableCreate物件,所以呼叫的是ObservableCreate的subscribeActual方法。
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
//...
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
//...
}
subscribeActual方法中,先建立了CreateEmitter物件,它是用來發射事件的。然後呼叫了Observer的onSubscribe方法。
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
可以看到Observer是一個介面,它是在Observable呼叫subscribe函式的時候建立的,並實現了Observer的四個方法,而ObservableCreate的subscribeActual方法中observer.onSubscribe()這一句呼叫的就是此時實現的onSubscribe方法。
緊接著呼叫了source.subscribe(parent);這裡的source是在crete方法中建立ObservableCreate物件的時候傳進去的。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
所以source.subscribe呼叫的是ObservableOnSubscribe的subscribe方法,而這個方法就是在create方法的引數實現的回撥函式。
總結
上面程式碼調整幅度太大,這裡簡單總結方法呼叫過程(有部分同名方法,所以帶上了類名):
Observable.create實現ObservableOnSubscribe.subscribe
Observable.subscribe(攜帶Observer)呼叫ObservableCreate.subscribeActual(攜帶Observer)
ObservableCreate.subscribeActual(攜帶Observer)呼叫ObservableOnSubscribe.subscribe
ObservableOnSubscribe.subscribe又呼叫CreateEmitter物件(攜帶Observer)的方法(onNext、onComplete、onError)發射事件
CreateEmitter的方法中最終呼叫到了Observer的方法(onNext、onComplete、onError)
到這,RxJava分析的整個流程就結束了,就是資料從Observable中流出,通過subscribe方法關聯Observer,然後Observer接收資料,並對資料經行處理。