RxJava2原始碼分析(一):基本流程分析
前言:到現在這個階段,網上關於RxJava2原始碼分析的文章已經滿天飛了,我寫這篇文章的目的並不是說我會分析的比他們好,比他們透徹,這篇文章的目的只是單純的記錄自己分析RxJava2原始碼的成功及收穫。
概述
對於一個程式設計人的技術成長,一般會經歷三個階段,首先是學會使用開源庫,然後是知道開源庫的原理,最後就是自己寫一個開源庫。雖然在日常的開發中使用RxJava2已經達到了得心應手的地步,但是不瞭解具體的原理,總感覺有點虛。於是就想靜下心來,好好的分析一下RxJava原始碼,達到不僅知其然更知其所以然的地步。
下圖是分析RxJava基本流程後,畫的UML圖,對於已經分析過原始碼的大神,可以看下圖畫的是否正確,對於沒有分析過原始碼的人,可以看下,先有個映像,然後再跟著文章的內容,一點點的理解。(點選圖片檢視大圖)
原始碼分析
先看RxJava2基礎用法的程式碼
private void basicUseRxJava() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2 );
emitter.onNext(3);
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("wizardev", "onNext: " +s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
複製程式碼
以上程式碼,只是RxJava2的基本使用,並沒有涉及任何的操作符程式碼,下面我們就按方法順序開始分析原始碼。
create方法分析
看下create()
方法的程式碼
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//1、判空
ObjectHelper.requireNonNull(source, "source is null");
//2、
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製程式碼
從以上程式碼可以看出,create
方法的返回值型別是Observable
,引數是ObservableOnSubscribe<T>
,可以先看下這個ObservableOnSubscribe
類,原始碼如下
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
複製程式碼
可以發現ObservableOnSubscribe
類是一個介面,裡面有一個subscribe
方法。現在繼續看create
方法中的程式碼,在“1”處程式碼是判斷傳入的引數是否為空。這裡主要看下“2”處,這句RxJavaPlugins.onAssembly
其實是一個Hook方法,**“2”處程式碼實質就是return new ObservableCreate<T>(source);
,**不信的話,可以看下onAssembly
方法,如下
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
複製程式碼
經除錯,onObservableAssembly
為null,所以上面的程式碼就直接返回了new ObservableCreate<T>(source)
。
現在看下ObservableCreate
類,如下
public final class ObservableCreate<T> extends Observable<T> {
//1、全域性變數
final ObservableOnSubscribe<T> source;
//2、構造方法中將source賦值
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//3、這個方法是在呼叫subscribe方法才呼叫的
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//...省略部分程式碼
}
複製程式碼
從上面的程式碼可以知道,ObservableCreate
類繼承自Observable
,在例項化的時候將create
方法中的ObservableOnSubscribe<T> source
引數注入了進來,作為成員變數source
。
結論
通過分析Observable
類的create
方法,可以有以下結論:
create
方法的返回值型別是Observable
;create
方法的引數的型別是介面;create
方法實際返回的是ObservableCreate
類,而ObservableCreate
類是Observable
的子類;- 在例項化
ObservableCreate
類的時候將create
的方法的引數注入到了ObservableCreate
類中,作為它的成員變數source
。
這裡重點看下第4個結論,在這裡create
方法的引數實際就是下面的程式碼
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}
複製程式碼
subscribe方法分析
分析完了create
方法,接著來分析subscribe
方法,其方法程式碼如下
public final void subscribe(Observer<? super T> observer) {
//1、判空
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//2、Hook方法,實質就是observer
observer = RxJavaPlugins.onSubscribe(this, observer);
//判空
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//4、重點,
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
複製程式碼
這裡重點看下“4”處, 這裡呼叫了Obeservable
的subscribeActual
方法,可以看下Obeservable
類中的這個方法,如下
protected abstract void subscribeActual(Observer<? super T> observer);
複製程式碼
這個方法是抽象的,實際呼叫的是它子類中的方法,通過上文的分析,我們知道ObservableCreate
就Obeservable
類的子類,所以,這裡呼叫的實際就是ObservableCreate
類中的subscribeActual
方法。現在,我們再看下這個方法中的程式碼,如下
@Override
protected void subscribeActual(Observer<? super T> observer) {
//1、例項化CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2、回撥方法
observer.onSubscribe(parent);
try {
//3、回撥方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製程式碼
我們一步步的分析這個方法中的程式碼,先看“1”處的程式碼,這裡例項化了CreateEmitter
這個類,在例項化的同時將observer
傳了進去。看下CreateEmitter
這個類的程式碼,如下
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
//...省略部分程式碼
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
//...省略部分程式碼
}
複製程式碼
通過上面的程式碼,可以發現CreateEmitter
這個類實現了ObservableEmitter
這個介面,而這個介面是ObservableOnSubscribe
介面中subscribe
方法的引數,是不是發現什麼了?現在繼續往下看,看下“2”處的程式碼,這裡回調了Observer
的onSubscribe
方法,分析到這裡,可以得出下面的結論
onSubscribe()回撥所在的執行緒是ObservableCreate執行subscribe()所在的執行緒,和subscribeOn()/observeOn()無關!
重點來了,這裡看下“3”處的程式碼,還記得source
是誰嗎?**它就是執行Observable.create
方法時,我們注入給ObservableCreate
類的成員變數,是ObservableOnSubscribe
介面的例項。**這裡呼叫的subscribe
方法,實際就是下面程式碼的subscribe
方法,
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
複製程式碼
這段程式碼中的subscribe
方法的引數實質就是CreateEmitter
,呼叫的onNext
方法就是CreateEmitter
類中的onNext
方法。繼續看下CreateEmitter
類中的onNext
方法,程式碼如下
@Override
public void onNext(T t) {
//1、判斷傳入的引數是否為null
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//2、呼叫Observer中的onNext方法
observer.onNext(t);
}
}
複製程式碼
分析到這裡,就可以得出以下結論了
subscribe方法中發射器所呼叫的onNext方法,如果程式碼沒有出錯的話,最終呼叫的就是Observer中的onNext方法。
分析CreateEmitter
中的其他方法,還可以知道為什麼Observer
中的onError
和onComplete
方法只有一個會回撥的原因了,原因就是無論呼叫的是哪一個方法都會呼叫dispose()方法取消訂閱。
結論
對Observable.subscribe
方法的分析可以得出以下結論
subscribe
方法最終呼叫了ObservableCreate
類中的subscribeActual
方法。subscribeActual
方法中,例項化了發射器,並開始發射資料。subscribe
方法中發射器所呼叫的onNext
方法,如果程式碼沒有出錯的話,最終呼叫的就是Observer
介面中的onNext
方法。
總結
通過對RxJava基本流程的原始碼分析,是不是對RxJava的原理有了更清晰的認識呢?分析完之後,我們再看下這張圖,是不是感覺現在看起來就明白多了呢?
結束語
想要了解一些開源庫的原理,我們必須要閱讀其原始碼,只有從原始碼中才能得到想要的答案,才能對庫的原理有更清晰的認識。
再說下,閱讀開源庫的注意事項,閱讀原始碼時,我們最好帶著問題來閱讀,閱讀前先有個目標,比如我這次閱讀要搞懂什麼問題,然後再開始閱讀,不然就會很容易在茫茫程式碼中迷失。還有就是不要想著每句程式碼都搞懂,搞懂與自己想要獲取的答案有關的程式碼即可。
轉載請註明出處:www.wizardev.cn