筆記之RxJava原理
阿新 • • 發佈:2018-12-18
RxJava也用了一段時間了,操作符看了一遍又一遍,熟悉了很多,記錄下RxJava原理和自己的理解
RxJava最基本,先貼程式碼
Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { } }) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
上門程式碼是不是很簡單呀,看下操作,首先看追create
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { // 非null判斷 ObjectHelper.requireNonNull(source, "source is null"); // 建立了ObservableCreate物件,onAssembly沒必要追, 就是判斷了onObservableAssembly是否為null,不為null執行其的apply方法,都返回ObservableCreate物件 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
那麼接下來看下ObservableCreate
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } ...... }
ObservableCreate集成了Observable,這裡就可以衍生出一個問題,所有的建立觀察者的操作符其實都是集成了Observable的,接下來追subscribe方法
public final void subscribe(Observer<? super T> observer) {
// 非null判斷
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// 還是返回當前的observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// 注意關鍵在這裡,也就是呼叫了實現類的subscribeActual也就是
//ObservableCreate的subscribeActual方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
接下來閱讀ObservableCreate的subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
// 建立了一個CreateEmitter物件包裹了下,然後傳遞給了
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 呼叫了observer的onSubscribe方法,也就是訂閱開始方法
observer.onSubscribe(parent);
try {
// 大家還記的source是誰嗎?就是ObservableOnSubscribe物件,在建立的時候建立的物件
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
讀到這裡就是最後,拿著包裹著的observer的CreateEmitter物件執行了onext或onError方法,彈射資料,這就是RxJava最基本的實現邏輯