RxJava 2.x 之圖解建立、訂閱、發射流程
阿新 • • 發佈:2018-11-01
從一個例子開始
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 3; i++) {
emitter. onNext(i);
}
emitter.onComplete();
Log.d(TAG, "subscribe " + Thread.currentThread().getName());
}
}).subscribeOn(Schedulers.newThread())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer value) throws Exception {
Log.d(TAG, "apply " + Thread.currentThread().getName());
return "apply " + value;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith (new ResourceObserver<String>() {
@Override
public void onNext(String value) {
Log.d(TAG, "onNext " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete " + Thread.currentThread().getName());
}
});
來看看輸出:
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: create RxNewThreadScheduler-1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 0
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 2
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onComplete main
可以看到建立
,傳送
,轉換
過程都在子執行緒中
,而最後的回撥是在主執行緒中
整個過程筆者整理成一張圖,一步一步來跟進分析
建立過程
- 第一步:通過
create操作符
建立了一個ObservableCreate
型別的Observable
,由於是基於匿名內部類
建立的,因此持有的是實現了ObservableOnSubscribe
介面的HomeActivity例項
- 第二步:通過
subscribeOn操作符
建立了一個ObservableSubscribeOn
型別的Observable
,且其內部的source
持有上個步驟的ObservableCreate例項
- 第三步:通過
map操作符
建立了一個ObservableMap
型別的Observable
,且其內部持有上個步驟傳入的ObservableSubscribeOn例項
- 第四步:通過
observeOn操作符
建立了一個ObservableObserveOn
型別的Observable
,且其內部持有上個步驟的ObservableMap例項
- 第五步:通過
subscribeWith方法
完成訂閱,由於是基於匿名內部類建立的,因此傳入的實際上是實現了ResourceObserver
的HomeActivity例項
訂閱過程
上述的幾個步驟其實已經完成的基本的建立過程了,最後我們拿到的實際是ObservableObserveOn
的例項,下面開始訂閱
流程。
- 第一步:
subscribeWith方法
,傳入的observer
是實現了ResourceObserver介面
的HomeActivity例項
,通過subscribeActual
發起訂閱,內部實際呼叫的是source.subscribe
方法,由於source
持有的是上面傳入的ObservableMap例項
,因此這一步驟實際呼叫的是,ObservableMap例項
中的subscribe
方法,傳入的引數就是ObserveOnObserver例項(構造引數主要是實現了ResourceObserver的例項即:HomeActivity)
。
- 第二步:進入
ObservableMap例項
的subscribe
方法中,通過subscribeActual
發起訂閱,實際呼叫的是source.subscribe
方法,傳入的是MapObserver例項(構造引數為之前傳遞的ObserveOnObserver例項)
,由於source
持有的是ObservableSubscribeOn的例項
,因此最終呼叫的其實是ObservableSubscribeOn例項
中的subscribe
方法
- 第三步:進入
ObservableSubscribeOn例項
的subscribe
方法中,通過subscribeActual
發起訂閱,完成MapObserver例項對SubscribeOnObserver的訂閱
,接著由NewThreadScheduler執行緒排程器
完成對應的任務(該任務的執行是線上程中執行的),SubscribeTask實現了Runnable介面
,最終會回撥run
方法,執行source.subscribe
方法,這裡的source
持有的就是最開始的ObservableCreate例項
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//這裡的s就是上個步驟的MapObserver例項
s.onSubscribe(parent);
//這裡的scheduler就是我們最開始指定的Schedulers.newThread 即NewThreadScheduler執行緒排程器
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
- 第四步:進入
ObservableCreate例項
的subscribe
方法中,通過subscribeActual
發起訂閱,這裡的source
持有的是HomeActivity例項
,直接呼叫subscribe
方法,傳入引數是構建的最頂層的發射器CreateEmitter例項
- 第五步:上述的幾個過程實際已經完成了訂閱的過程,最後經過層層傳遞,持有的最頂層的是
CreateEmitter例項
,即我們最終的被觀察者
發射過程
上述的過程已經完成了訂閱過程,在最後訂閱完成之後,最終會通過source.subscribe
方法,其實就是呼叫HomeActivity例項的subscribe方法
,完成元素髮射
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 3; i++) {
emitter.onNext(i);
}
emitter.onComplete();
Log.d(TAG, "subscribe " + Thread.currentThread().getName());
}
我們在最頂層的被觀察者
裡通過ObservableEmitter例項
的onNext
方法完成元素的發射
,最終又會通過一層一層的Observer
轉發到最原始的實現了ResourceObserver介面
的觀察者中來
注意:
- 這裡的
被觀察者
裡的所有發射過程實際上都是在NewThreadScheduler執行緒排程器
分配的執行緒裡完成的 - 當
發射的元素
被傳遞到下層的ObservableObserveOn類
中的ObserveOnObserver例項
的onNext方法
,實際執行的是HandlerScheduler.HandlerWorker
的schedule
方法,最終就是通過我們持有的主執行緒的handler
來切換到主執行緒中
小結
整個建立過程
,訂閱過程
,發射過程
看起來山路十八彎
,但是如果你一步一步跟進檢視,會發現整個流程實際上是很清晰的,整個過程起點
和終點
很明確,
而中間產生的一系列Observable
和Observer
你都可以看作是代理類
,用來轉發訂閱
以及最終的元素髮射