全面剖析Rxjava2.0的前世今生
引言
隨著專案的不斷壯大,我們的業務越來越複雜,引入RxJava框架是遲早的事,這段時間正好花了幾天時間來認真學習了Rxjava一把,好記性不如爛筆頭,還是乖乖記錄下來學習的所得所感,我覺得不管學習什麼,得先弄懂這個東東是幹什麼的,有什麼好處,為啥要用它,它都有寫啥子?其實rxjava就是為了方便切換執行緒處理資料,至少在你用的這個地方就是這個樣子,明白在哪些地方是要切換執行緒,然後就是要把這兩個分開來放。
一 、Rxjava2.0的前世
1. Rxjava是什麼?
查閱了好多文件後,我給Rxjava的定義是這樣子的:Rxjava就是在觀察者模式的骨架下,通過豐富的操作符和便捷的非同步操作來完成對於複雜業務的處理。即兩個核心點:
(1)觀察者模式
(2)非同步
那麼我們先從一些簡單的場景來認識Rxjava的觀察者骨架,(藉助拉丁吳大神的一個例子來解釋)
例子:如上圖“按下開關,檯燈燈亮”
在這個事件中,檯燈作為觀察者,開關作為被觀察者,檯燈透過電線來觀察開關的狀態並做出相應的處理。
從上面的例子中,可以引出Rxjava的一些重要概念:
(1)開關(被觀察者)作為事件的生產方(產生“開”和“關”這兩個事件),是主動生產的,是整個開燈事件流程的起點。這對應的就是我們Rxjava框架的被觀察者(Observable)的職能。
(2)檯燈(觀察者)作為事件的處理方(處理“燈亮”和“燈滅”這兩個事件)是被動的,是整個開燈事件流程的終點。這對應的就是我們Rxjava框架的觀察者(Observer)的職能。
(3)在起點跟終點之間,即事件傳遞的過程中是可以被加工,過濾,轉換,合併等等方式處理,這就是Rxjava中常說的操作符的職能。
以上就是所謂的觀察者模式,那個非同步又是啥呢?其實非同步大家都懂,就是那些多執行緒以及執行緒間的切換。
所以Rxjava其實就是基於觀察者模式下組建自己的程式碼邏輯,說白了就是構建被觀察者(Observable)和觀察者(Observer/Subscriber),以及這兩者之間的訂閱關係,實現觀察,在事件傳遞的過程中還可以對事件做各種處理(比如過濾,轉換等等)。
2.Rxjava的事件流程
相信上面這個事件流程圖能夠讓大家更加清晰的知道Rxjava是怎麼產生事件,傳遞事件,處理事件的。總結起來其實就是這幾點:
(1)建立被觀察者,產生事件
(2)設定事件傳遞過程中的過濾,合併,變換等加工操作
(3)訂閱一個觀察者物件,實現事件最終的處理(注意:當呼叫訂閱操作(即呼叫Observable.subscribe()方法)的時候,被觀察者才真正開始發出事件)。
對於Rxjava1.x,比如我們開篇舉的那個開燈的例子,Rxjava的觀察者骨架的是這樣子的(按照三部曲來):
(1)建立被觀察者(也就是開關):
Observable switcherObservable=Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("On");
subscriber.onNext("Off");
subscriber.onCompleted();
}
});
(2)建立觀察者(也就是檯燈)
Subscriber lightSubscriber=new Subscriber<String>() {
@Override
public void onCompleted() {
//被觀察者的onCompleted()事件會走到這裡;
Log.d("wu","結束觀察...\n");
}
@Override
public void onError(Throwable e) {
//出現錯誤會呼叫這個方法
}
@Override
public void onNext(String s) {
//處理傳過來的onNext事件
Log.d("wu","handle this---"+s)
}
(3)連線兩者的關係,訂閱
switcherObservable.subscribe(lightSubscriber);
從這個demo可以看出:
(1)訂閱這個動作,實際上是觀察者(subscriber)物件把自己傳遞給被觀察者(Observable)內部的onSubscribe
(2)onSubscribe的工作就是呼叫call(subscriber)來通知被觀察者傳送訊息給這個subscriber。
上面的程式碼可以抽象成下面這個流程圖:
如果對於Rxjava1.x這部分想更加深入的瞭解,可以看我上篇寫的: 從原始碼角度來剖析Rxjava的執行原理 這篇會詳細帶大家深入瞭解Rxjava1.x的原理。
這裡我重點放在Rxjava2.0,後面會重點講Rxjava2.0
3.操作符
對於操作符,我不想講太多,因為操作符不是必須的,而是根據你的實際業務處理,能使你業務處理變得更加簡單那就用,不能那就不用,操作符無非就是為了便捷而已。那麼我這裡想將map跟flatmap轉換,這兩個轉換我們可能會經常用到。
(1)map
map操作符其實就是為了達到型別轉換的作用,被觀察者生產發出的事件型別,可能對於我們需要處理的業務不方便,甚至複雜,而map可以把型別轉換成我們需要的型別,方便我們處理事件。比如,我們經常遇到的,圖片顯示。被觀察者產生的事件中只有圖片檔案路徑,但是在觀察者這裡只想要顯示bitmap,那麼就需要型別變換。
Observable.create(new Observable.just(getFilePath())
//使用map操作來完成型別轉換
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
//顯然自定義的createBitmapFromPath(s)方法,是一個極其耗時的操作
return createBitmapFromPath(s);
}
})
.subscribe(
//建立觀察者,作為事件傳遞的終點處理事件
new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
Log.d("DDDDDD","結束觀察...\n");
}
@Override
public void onError(Throwable e) {
//出現錯誤會呼叫這個方法
}
@Override
public void onNext(Bitmap s) {
//處理事件
showBitmap(s)
}
);
(2)flatMap
其實flatMap也是型別轉換,跟map的區別是:map是一對一的轉換,flatMap是一對多的轉換,什麼叫做一對多的轉換呢?比如大家常拿來說的一個例子:查詢一個學校每個班級的每個學生,即輸入一個學校,返回學生的list集合。
//建立被觀察者,獲取所有班級
Observable.from(getSchoolClasses())
.flatMap(new Func1<SingleClass, Observable<Student>>() {
@Override
public Observable<Student> call(SingleClass singleClass) {
//將每個班級的所有學生作為一列表包裝成一列Observable<Student>,將學生一個一個傳遞出去
return Observable.from(singleClass.getStudents());
}
})
.subscribe(
//建立觀察者,作為事件傳遞的終點處理事件
new Subscriber<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//出現錯誤會呼叫這個方法
}
@Override
public void onNext(Student student) {
//接受到每個學生類
Log.d("DDDDDD",student.getName())
}
);
也就是flatMap會輸出一個新的Observable,這個Observable正是我們觀察者Subscriber想要接收的,也就是Subscriber不再收到list,而是收到一系列單個的字串,也就是 Observable.from()的輸出。簡單來說就是flatMap將每個Observable產生的事件裡的資訊,再包裝成新的Observable傳遞出來,flatMap可以破除巢狀難題,因為flatMap可以再次包裝新的Observable,而每個Observable都可以使用from(T[])方法來建立自己,這個方法接受一個列表,然後這個列表中的資料再包裝成一系列事件。
如果對操作符不是很懂的。可以看看這篇文章:給 Android 開發者的 RxJava 詳解
4.背壓
在正式進入Rxjava2.0之前必須講講背壓的知識,那麼什麼是背壓呢?
我看到一個大神總結得挺深入合適的,這裡拿出來給大家分享一下。背壓的定義:
背壓(Backpressure)定義:背壓是指在非同步場景中,被觀察者傳送事件速度遠遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低傳送速度的策略。(即背壓是一種控制流速的策略,前提是非同步,就是被觀察者跟觀察者處在不同的執行緒環境中)。
(1)背壓產生的背景
要弄懂背壓,我們就從一個很常見的工作場景上來說。我們都知道Rxjava是一個觀察者模式的架構,當這個架構中被觀察者(Observable)跟觀察者(Subscriber)處於不同的執行緒環境時,由於各自的工作量不一樣,導致它們產生事件和處理事件的速度不一樣,這就會出現兩種情況:
(a).被觀察者產生事件慢一些,觀察者處理事件很快,那麼觀察者就會等著被觀察者傳送事件,這是很正常的,沒有問題。
(b).被觀察者產生事件的速度很快,而觀察者處理很慢,那就會出問題,如果不作處理的話,事件會堆積起來,最終擠爆記憶體,導致崩潰。
像第二種情況,因為被觀察者傳送事件的速度太快,而觀察者處理太慢,而且沒有做相應措施,就會報missingBackpressureException異常,而背壓就是跟這種異常有關。
(2)背壓策略的實現方式(即響應式拉取:reactive pull)
在開篇的時候我已經介紹了,在Rxjava的觀察者模型中,被觀察者是主動的推動資料給觀察者的,而觀察者是被動接收的,而響應式則反過來,觀察者主動從被觀察者那裡去拉取資料,而被觀察者則變成被動的等待通知再發送資料,即觀察者可以根據自身實際情況按需拉取資料,而不是被動接收(這就可以達到告訴被觀察者把速度慢下來),從而實現對被觀察者傳送事件速度的控制,從而實現背壓。總結起來就是:背壓是一種策略,具體措施是下游觀察者通知上游的被觀察者傳送事件,背壓策略很好的解決了非同步環境下被觀察者和觀察者速度不一致的問題。
對於背壓,可以看看這篇文章:https://zhuanlan.zhihu.com/p/24473022?refer=dreawer
二、Rxjava2.0的今生
1.Rxjava2.0的介紹
Rxjava2.0是Rxjava1.x的一個版本升級,依然是以觀察者模式為骨架,不過此次更新中,分為了兩種觀察者模式。
(1)Observable(被觀察者)/Observer(觀察者)
(2)Flowable(被觀察者)/Subscriber(觀察者)
上面我已經介紹了背壓的相關知識,在Rxjava1.x中,關於背壓都是集中在Observable這個類中,導致有的Observable支援背壓,有的不支援,Rxjava2.0為了解決這種缺憾,把支援背壓跟不支援背壓的Observable區分開來。
也就是在Rxjava2.0中,Observable用於訂閱Observer,且不支援背壓,而Flowable用於訂閱Subscriber,是支援背壓的,那啥叫不支援背壓呢?就是當觀察者快速傳送大量資料時,下游不會做其他的處理,即使資料大量堆積,呼叫鏈也不會報MissingBackpressureException,消耗記憶體,過大隻會oom。
那對於這兩種觀察者模式,我們到底需要哪種模式呢,其實這就跟你要處理的業務有關了,得看看你要處理的資料量是不是很大,官方給出的參考值好像是,以1000個事件為分界線。我目前為止沒用過支援背壓的觀察者模式,一般用Observable(被觀察者)/Observer(觀察者)這種觀察者模式就夠了。
2、Rxjava2.0跟Rxjava1.x的區別
目前我這裡只是列出比較明顯的區別,後面會持續更新
(1)RxJava2.0最大的改動就是對於backpressure的處理,為此將原來的Observable拆分成了新的Observable和Flowable,同時其他相關部分也同時進行了拆分。
(2)Transformer排程器的區別
RxJava 1.x 中Transformer實際上就是Func1
public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
// cover for generics insanity
}
public interface Func1<T, R> extends Function {
R call(T t);
}
例如:
//子執行緒執行,主執行緒回撥
public Observable.Transformer<T, T> io_main(final RxAppCompatActivity context) {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
Observable<T> observable = (Observable<T>) tObservable
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
DialogHelper.showProgressDlg(context, mMessage);
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(RxLifecycle.bindUntilEvent(context.lifecycle(), ActivityEvent.STOP));
return observable;
}
};
}
在RxJava2.0中,Transformer劃分的更加細緻了,每一種“Observable”都對應的有自己的Transformer,相關API如下所示:
public interface ObservableTransformer<Upstream, Downstream> {
ObservableSource<Downstream> apply(Observable<Upstream> upstream);
}
public interface CompletableTransformer {
CompletableSource apply(Completable upstream);
}
public interface FlowableTransformer<Upstream, Downstream> {
Publisher<Downstream> apply(Flowable<Upstream> upstream);
}
public interface MaybeTransformer<Upstream, Downstream> {
MaybeSource<Downstream> apply(Maybe<Upstream> upstream);
}
public interface SingleTransformer<Upstream, Downstream> {
SingleSource<Downstream> apply(Single<Upstream> upstream);
}
這裡以ObservableTransformer舉例子:
/**
* Created by wuchunmei on 7/26/17.
*/
/***
* 用於RxJava的LoadDataAsyncTaskDialog進度提示彈窗
* 這裡封裝了doOnSubscribe時show彈窗,doFinally時dismiss彈窗
*/
public class LoadingTransformer implements ObservableTransformer,DialogInterface.OnCancelListener {
private final WeakReference<Context> mContextRef;
private String mInfoText;
private LoadDataAsyncTaskDialog mTaskDialog;
private Disposable mDisposable;
public LoadingTransformer(Context context, String text) {
mContextRef = new WeakReference<>(context);
mInfoText = text;
}
@Override
public ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.io()).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
mDisposable = disposable;
if(mContextRef.get() == null){
disposable.dispose();
return;
}
if((mContextRef.get() instanceof Activity) && !TextUtils.isEmpty(mInfoText)){
showTaskDialog(mContextRef.get(),mInfoText);
}
}
}).subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread()).doFinally(new Action() {
@Override
public void run() throws Exception {
dismissTaskDialog();
}
});
}
private void showTaskDialog(Context context, String infoText) {
mTaskDialog = onCreateTaskDialog(context, infoText);
if (mTaskDialog != null) {
try {
mTaskDialog.show();
} catch (Throwable tr) {
tr.printStackTrace();
}
}
}
private LoadDataAsyncTaskDialog onCreateTaskDialog(Context context, String infoText) {
if (context instanceof Activity) {
return new LoadDataAsyncTaskDialog(context,infoText);
}
return null;
}
private void dismissTaskDialog() {
if (mTaskDialog != null) {
try {
mTaskDialog.dismiss();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
@Override
public void onCancel(DialogInterface dialog) {
try {
if (cancel(true)) {
dialog.dismiss();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* 建立彈窗
* @param context
* @param loadingTip
* @param <T>
* @return
*/
public static <T> ObservableTransformer<T, T> applyLoading(Context context, String loadingTip) {
ObservableTransformer loadingTransformer = new LoadingTransformer(context, loadingTip);
return (ObservableTransformer<T, T>)loadingTransformer;
}
/**
* 是否被取消
* @param isCancel
* @return
*/
public boolean cancel(boolean isCancel) {
if (isCancel) {
dismissTaskDialog();
//解除訂閱關係
if (mDisposable != null) mDisposable.dispose();
return true;
}
return false;
}
}
(3)Nulls
RxJava1.x中,支援 null 值,如下程式碼所示:
Observable.just(null);
Single.just(null);
RxJava 2.0不再支援 null 值,如果傳入一個null會丟擲 NullPointerException
(4)Subscriber介面
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
我們會發現和以前不一樣的是多了一個onSubscribe的方法,Subscription如下:
public interface Subscription {
public void request(long n);
public void cancel();
}
新的Subscription更像是綜合了舊的Producer與Subscription的綜合體。他既可以向上遊請求資料,又可以打斷並釋放資源。而舊的Subscription在這裡因為名字被佔,而被重新命名成了Disposable
public interface Disposable {
void dispose();
boolean isDisposed();
}
3.Rxjava2.0的使用
(1)引入Rxjava2.0
compile 'io.reactivex.rxjava2:rxjava:2.1.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
(2)使用Rxjava2.0處理業務邏輯,其實就是根據自己的業務,按照三步曲來執行,建立被觀察者,觀察者,訂閱,中間穿插操作符
eg:我的一個搜尋功能:
/**
* 搜尋版塊
*/
private void searchForumModules() {
Observable.create(new ObservableOnSubscribe<List<Entity>>() {
@Override
public void subscribe(ObservableEmitter<List<Entity>> e) throws Exception {
ClientRecvObject clientRecvObject = SearchConnector.searchForum(ForumSearchActivity.this, searchKey);
List<Entity> remoteCollectedList = null;
if (clientRecvObject != null) {
if (clientRecvObject.isSuccess()) {
remoteCollectedList = (List<Entity>) clientRecvObject.getClientData();
e.onNext(remoteCollectedList);
e.onComplete();
} else {
ClientRecvErrorException errorException = new ClientRecvErrorException();
errorException.setClientRecvObject(clientRecvObject);
e.onError(errorException);
}
}
}
}).compose(LoadingTransformer.<List<Entity>>applyLoading(ForumSearchActivity.this,getString(R.string.searching)))
.subscribeWith(new DisposableObserver<List<Entity>>() {
@Override
public void onNext(List<Entity> list) {
if (!ListUtils.checkEntityListEqual(mSearchResult, list)) {
mSearchResult.clear();
mSearchResult.addAll(list);
mAdapter.notifyDataSetChanged();
}
if (list.size() <= 0) {
emptyView.setVisibility(View.VISIBLE);
mEmptyViewHelper.setErrorEmptyView();
mEmptyViewHelper.setTipText(R.string.empty_search_line1);
mEmptyViewHelper.setSecondTipText(R.string.empty_search_line2);
} else {
emptyView.setVisibility(View.GONE);
}
}
@Override
public void onError(Throwable e) {
if (e instanceof ClientRecvErrorException) {
mEmptyViewHelper.setTipText(R.string.search_error);
mEmptyViewHelper.setErrorEmptyView();
ClientRecvObject client = ((ClientRecvErrorException) e).getClientRecvObject();
if ("該使用者不存在!".equals(client.getMessage())
|| "搜尋結果為空".equals(client.getMessage()) || "沒有相關使用者!".equals(client.getMessage())) {
if ("該使用者不存在!".equals(client.getMessage()) || "沒有相關使用者!".equals(client.getMessage())) {
mEmptyViewHelper.setTipText(R.string.empty_search_user_line1);
mEmptyViewHelper.setSecondTipText(R.string.empty_search_user_line2);
} else {
mEmptyViewHelper.setTipText(R.string.empty_search_line1);
mEmptyViewHelper.setSecondTipText(R.string.empty_search_line2);
}
}
}
}
@Override
public void onComplete() {
}
});
}
從程式碼上大家可以看到這行程式碼:
.compose(LoadingTransformer.<List<Entity>>applyLoading(ForumSearchActivity.this,getString(R.string.searching)))
compose其實就是一個執行緒切換的操作符,一般跟Transformer在一起使用,而LoadingTransformer的applyLoading方法,其實就是我把.doOnSubscribe跟.doFinally的邏輯處理封裝在一個轉換器LoadingTransformer裡面了,方便呼叫。
public class LoadingTransformer implements ObservableTransformer {
private final WeakReference<Context> mContextRef;
private String mInfoText;
private LoadDataAsyncTaskDialog mTaskDialog;
private Disposable mDisposable;
public LoadingTransformer(Context context, String text) {
mContextRef = new WeakReference<>(context);
mInfoText = text;
}
@Override
public ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.io()).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
mDisposable = disposable;
if(mContextRef.get() == null){
disposable.dispose();
return;
}
if((mContextRef.get() instanceof Activity) && !TextUtils.isEmpty(mInfoText)){
showTaskDialog(mContextRef.get(),mInfoText);
}
}
}).subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread()).doFinally(new Action() {
@Override
public void run() throws Exception {
dismissTaskDialog();
}
});
}
也就是實現ObservableTransformer介面。
/**
* 建立彈窗
* @param context
* @param loadingTip
* @param <T>
* @return
*/
public static <T> ObservableTransformer<T, T> applyLoading(Context context, String loadingTip) {
ObservableTransformer loadingTransformer = new LoadingTransformer(context, loadingTip);
return (ObservableTransformer<T, T>)loadingTransformer;
}
這個方法就是返回一個泛型的ObservableTransformer物件。
4、Rxjava2.0從原始碼上分析建立以及訂閱關係
(1)首先會create
/**
* Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
* @param <T> the element type
* @param source the emitter that is called when an Observer subscribes to the returned {@code Observable}
* @return the new Observable instance
* @see ObservableOnSubscribe
* @see ObservableEmitter
* @see Cancellable
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); //這裡會把ObservableCreate返回
}
看看引數ObservableOnSubscribe 是啥?
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer); //構建發射器,
observer.onSubscribe(parent);//回撥onSubscribe的邏輯
try {
source.subscribe(parent); //回撥subscribe的邏輯
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
其實也就是個Observable,然後實現其subscribeActual(Observer
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
然後實現訂閱:
source.subscribe(parent);
(2)subscribeWith(E observer)
/**
* Subscribes a given Observer (subclass) to this Observable and returns the given
* Observer as is.
* @param <E> the type of the Observer to use and return
* @param observer the Observer (subclass) to use and return, not null
* @return the input {@code observer}
* @throws NullPointerException if {@code observer} is null
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
subscribeWith方法 返回當前的 observer 物件,進去看看 subscribe(observer)方法:也就是跳到Observable.java的
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
從上面的程式碼可以看出:
observer = RxJavaPlugins.onSubscribe(this, observer);
獲取一個observer物件,然後走到這個方法: subscribeActual(observer);這個方法是不是很熟悉,這個方法進去就是ObservableDoFinally.java的 subscribeActual(Observer
@Override
protected void subscribeActual(Observer<? super T> s) {
source.subscribe(new DoFinallyObserver<T>(s, onFinally));
}
再走一遍subscribe()方法,實際上就是再走一邊subscribeActual(observer)實現真正的訂閱:
最後最終會跑到ObservableObserveOn.java的這個方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); //這裡跑完後就會進入最開始的回撥是實現了。
}
}