RxJava2.X 原始碼分析 一
本文內容大致如下:
- 初步瞭解RxJava2.X的使用流程 ;
- 探索Observable傳送資料的流程 ;
- 明白Observer是如何接收資料的 ;
- 解析Observable與Observer的勾搭(如何關聯)過程 ;
- 探索RxJava執行緒切換的奧祕 ;
- 瞭解RxJava操作符的實現原理。
探索RxJava2分發訂閱流程
從Demo到原理
//1、觀察者建立一個Observer
Observer observer = new Observer() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe" );
}
@Override
public void onNext(@NonNull String s) {
Log.d(TAG, "onNext data is :" + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError data is :" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world" );
e.onComplete();
}
});
observable.subscribe(observer);
結果輸出:
onSubscribe
onNext data is :hello
onNext data is :world
onComplete
可以看到,Observer的onSubscribe是最先被呼叫的,這個回撥會有什麼用呢?我們後面會講到。
由於整個流程是從create開始的,我們就從源頭開始分析。create方法返回的是一個observable物件,也就是被觀察的物件。create方法需要傳入一個ObservableOnSubscribe來建立,我們看下ObservableOnSubscribe是什麼:
/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
該介面會接收一個ObservableEmitter的一個物件,然後通過該物件我們可以傳送訊息也可以安全地取消訊息,我們繼續看ObservableEmitter這個介面類。
public interface ObservableEmitter<T> extends Emitter<T> {
/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param d the disposable, null is allowed
*/
void setDisposable(@Nullable Disposable d);
/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellable(@Nullable Cancellable c);
/**
* Returns true if the downstream disposed the sequence or the
* emitter was terminated via {@link #onError(Throwable)}, {@link #onComplete} or a
* successful {@link #tryOnError(Throwable)}.
* <p>This method is thread-safe.
* @return true if the downstream disposed the sequence or the emitter was terminated
*/
boolean isDisposed();
/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized ObservableEmitter
*/
@NonNull
ObservableEmitter<T> serialize();
/**
* Attempts to emit the specified {@code Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
* <p>
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
* if the error could not be delivered.
* @param t the throwable error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
* events
* @since 2.1.1 - experimental
*/
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
ObservableEmitter是對Emitter的擴充套件,而擴充套件的方法證實RxJava2.0之後引入的,提供了可中途取消等新能力,我們繼續看Emitter。
/**
* Base interface for emitting signals in a push-fashion in various generator-like source
* operators (create, generate).
*
* @param <T> the value type emitted
*/
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
裡面的三個方法使用過rx的應該非常眼熟了。看到這裡,我們只是瞭解了傳遞引數的資料結構,瞭解到的資訊還是比較少的。我們繼續看下create內部做了什麼操作。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
RxJavaPlugins或許你會很陌生,其實我也很陌生,不過沒關係,我覺得後面會經常遇到RxJavaPlugins,熟悉它是必然的;
可以看到我們傳入ObservableOnSubscribe被用來建立ObservableCreate,其實ObservableCreate就是Observable的一個實現類哦。
思路梳理
OK,到這裡我們先梳理一下思路:
1、Observable通過呼叫create建立一個Observable
2、呼叫create時需要傳入一個ObservableOnSubscribe型別的例項引數
3、最終傳入的ObservableOnSubscribe型別的例項引數作為ObservableCreate建構函式的引數傳入,一個Observable就此誕生了
ObservableCreate又是個什麼東東呢?我們分步來,先看ObservableCreate的兩個方法。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}
source:Observable.createc傳入的ObservableOnSubscribe例項;
subscribeActual回撥方法,它在呼叫Observable.subscribe時被呼叫,即與觀察者或則訂閱者發生聯絡時觸發。subscribeActual也是實現我們主要邏輯的地方,我們來仔細分析下subscribeActual方法:
1. 首先subscribeActual傳入的引數為Observer型別,也就是我們subscribe時傳入的觀察者,到底是不是呢?後面會分析到。
2. 傳入的Observer會被包裝成一個CreateEmitter,CreateEmitter繼承了AtomicReference提供了原子級的控制能力。RxJava2.0提供的新特性與之息息相關哦,這個我們先給它來個關鍵標籤,後面再詳細分析。
3. 觀察者(observer)呼叫自己的onSubscribe(parent);將包裝後的observer傳入。這個也是RxJava2.0的變化,真正的訂閱在source.subscribe(parent);這句程式碼被執行後開始,而在此之前先呼叫了onSubscribe方法來提供RxJava2.0後引入的新能力(如中斷能力)。從這裡我們也就知道了為何觀察者的onSubscribe最先被呼叫了。(被訂閱者說:我也很無辜,他自己呼叫了自己,我也控制不了╮(╯_╰)╭)
4. 被訂閱者或者說被觀察者(source)呼叫subscribe訂閱方法與觀察者發生聯絡。這裡進行了異常捕獲,如果subscribe丟擲了未被捕獲的異常,則呼叫 parent.onError(ex);
5. 在執行subscribe時也就對應了我們demo中的
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
Ok,看來subscribeActual這個回撥確實很重要,前面我們也說了subscribeActual回撥方法在Observable.subscribe被呼叫時執行的,真的像我說的一樣麼?萬一我看走眼了
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
OK,程式碼不多,可以看到RxJavaPlugins.onSubscribe(this, observer);,我們RxJava2.0中的Hook能力就是來自這裡了。然後繼續看下面subscribeActual(observer);被呼叫了。
思路梳理
- 傳入的ObservableOnSubscribe最終被用來建立成ObservableCreate
- ObservableCreate持有我們的被觀察者物件以及訂閱時所觸發的回撥subscribeActual
- 在subscribeActual實現了我們的主要邏輯,包括observer.onSubscribe(parent);,source.subscribe(parent);,parent.onError(ex);的呼叫
- 在Observable的subscribe被呼叫時開始執行事件分發流程。
探索RxJava2神祕的隨意取消訂閱流程的原理
前面初步分析了RxJava從建立到執行的流程。
接著我們將探索RxJava2.x提供給我們的Disposable能力的來源。
先看一個demo。
//1、觀察者建立一個Observer
Observer observer = new Observer() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
disposable = d;
}
@Override
public void onNext(@NonNull String s) {
Log.d(TAG, "onNext data is :" + s);
if (s.equals("hello")) {
//執行了hello之後終止
disposable.dispose();
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError data is :" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
Log.i(TAG, "傳送 hello");
e.onNext("world");
Log.i(TAG, "傳送 world");
e.onComplete();
Log.i(TAG, "呼叫 onComplete");
}
});
observable.subscribe(observer);
輸出結果如下:
onSubscribe
onNext data is :hello
傳送 hello
傳送 world
呼叫 onComplete
在傳送玩hello之後,成功終止了後面的Reactive流。從結果我們還發現,後面的Reactive流被終止了,也就是訂閱者或者觀察者收不到後面的資訊了,但是生產者或者說被訂閱者、被觀察者的程式碼還是會繼續執行的。
Ok,我們從哪開始入手呢?我們發現,在我們執行了 disposable.dispose();後,觸發了該事件,我們通過原始碼看下 disposable.dispose();到底做了什麼。
/**
* Represents a disposable resource.
*/
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
此時我們要回憶一下前面的一段程式碼
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
- 我們之前分析到在執行source.subscribe(parent);觸發資料分發事件之前先執行了observer.onSubscribe(parent);這句程式碼,所傳入的parent也就對應了我們的Disposable
- parent是CreateEmitter型別的,但是CreateEmitter是實現了Disposable介面的一個類。而parent又是我們的observer的一個包裝後的物件。
- OK,分析到這裡我們來整理下前面的環節,根據Demo來解釋下:首先在執行下面程式碼之前
e.onNext("hello");
Log.i(TAG, "傳送 hello");
e.onNext("world");
Log.i(TAG, "傳送 world");
e.onComplete();
Log.i(TAG, "呼叫 onComplete");
- 先執行了observer.onSubscribe(parent);,我們在demo中也是通過傳入的parent呼叫其dispose方法來終止Reactive流,而執行分發hello等資料的e也是我們的parent,也就是他們都是同一個物件。而執行e.onNext(“hello”);的e物件也是observer的一個包裝後的ObservableEmitter型別的物件。
總結:Observer自己來控制了Reactive流狀態。
Ok,此時如果我說關鍵點應該在ObservableEmitter這個類上面,你覺得可能性有多少呢?( ̄∇ ̄)
關鍵點就是CreateEmitter parent = new CreateEmitter(observer);這個包裝的過程,我們來看下其原始碼。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
CreateEmitter是ObservableCreate類的靜態內部類。CreateEmitter實現了ObservableEmitter, Disposable介面類,所以需實現其方法。這裡其實是使用了裝飾者模式,其魅力所在一會就會看到了。
我們可以看到在ObservableEmitter內部通過Observer< ? super T> observer儲存了我們的observer,這樣有什麼用呢?看Demo,我們在呼叫e.onNext(“hello”);時,呼叫的時ObservableEmitter物件的onNext方法,然後ObservableEmitter物件的onNext方法內部再通過observer呼叫onNext方法,但是從原始碼我們可以發現,其並不是簡單的呼叫哦。
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
1、先判斷傳入的資料是否為null
2、判斷isDisposed(),如果isDisposed()返回false則不執行onNext。
isDisposed()什麼時候會返回false呢?按照demo,也就是我們呼叫了disposable.dispose();後,disposable前面分析了就是CreateEmitter parent,我們看CreateEmitter.dispose()
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
裡面呼叫了DisposableHelper.dispose(this);,我們看isDisposed()
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
RxJava的onComplete();與onError(t);只有一個會被執行的祕密原來是它?
再看另外兩個方法的呼叫
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
其內部也基本做了同樣的操作,先判斷!isDisposed()後再決定是否執行。
但是再這裡還有一點哦,我們應該知道onComplete();和onError(t)只有一個會發生,其實現原理也是通過isDisposed這個方法哦,我們可以看到,不關是先執行onComplete();還是先執行onError(t),最終都會呼叫dispose();,而呼叫了dispose();後,isDisposed()為false,也就不會再執行另外一個了。而且如果人為先呼叫onComplete再呼叫onError,onComplete不會被觸發,而且會丟擲NullPointerException異常。
小結:
此時我們的目的基本達到了,我們知道了Reactive流是如何被終止的以及RxJava的onComplete();與onError(t);只有一個會被執行的原因。
我們雖然知道了原因,但是秉著刨根問底的態度,抵擋不住內心的好奇,我還是決定挖一挖DisposableHelper這個類,當然如果不想了解DisposableHelper的話,看到這裡也就可以了;
Ok,前面分析到,程式碼裡呼叫了DisposableHelper類的靜態方法,我們看下其呼叫的兩個靜態方法分別做了什麼?
public enum DisposableHelper implements Disposable {
DISPOSED;
public static boolean isDisposed(Disposable d) {
// 判斷上次記錄的終點標識的是否是 當前執行的Observer,如果是返回true
return d == DISPOSED;
}
....
public static boolean dispose(AtomicReference field) {
//1、current為我們當前的observer的Disposable的值,第一次呼叫時current是null
Disposable current = field.get();
//2、終止標識
Disposable d = DISPOSED;
//3、兩次不相同,說明observer未呼叫過dispose,
if (current != d) {
//4、將終止標識的值設定給當前的observer的Disposable,並返回設定前的observer的Disposable的值,此時如果呼叫isDisposed(Disposable d)返回的就是ture了
current = field.getAndSet(d);
if (current != d) {
//第一次呼叫時會走到這裡,此時current==null,返回true,
//current不為null時說明當前的observer呼叫了多次dispose(),而如果多次呼叫了Disposable的值還不是DISPOSED,說明之前設定失敗,所以再次呼叫dispose();
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
....
}
1、DISPOSED:作為是否要終止的列舉型別的標識
2、isDisposed:判斷上次記錄的終點標識的是否是 當前執行的Observer,如果是返回true
3、dispose:採用了原子性引用類AtomicReference,目的是防止多執行緒操作出現的錯誤。
總結:
- 我們瞭解了RxJava的隨意終止Reactive流的能力的來源;
- 過程中也明白了RxJava的onComplete();與onError(t);只有一個會被執行的祕密。
- 實現該能力的主要方式還是利用了裝飾者模式
- 從中體會了設計模式的魅力所在,當然我們還接觸了AtomicReference這個類,在平時估計很少接觸到。
探索RxJava2之訂閱執行緒切換原理
本次我們將探索RxJava2.x執行緒切換的實現原理。
先看一個demo。
//1、觀察者建立一個Observer
Observer observer = new Observer() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
Log.d(TAG,"work thread is"+Thread.currentThread().getName());
disposable = d;
}
@Override
public void onNext(@NonNull String s) {
Log.d(TAG, "onNext data is :" + s);
Log.d(TAG,"work thread is"+Thread.currentThread().getName());
if (s.equals("hello")) {
//執行了hello之後終止
disposable.dispose();
disposable.dispose();
}
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.dispose();
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError data is :" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
Log.d(TAG,"work thread is"+Thread.currentThread().getName());
e.onNext("hello");
Log.i(TAG, "傳送 hello");
e.onNext("world");
Log.i(TAG, "傳送 world");
e.onComplete();
Log.i(TAG, "呼叫 onComplete");
}
});
版本1:不存線上程切換
observable.subscribe(observer);
輸出結果:
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: onSubscribe
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: onNext data is :hello
07-13 14:58:13.173 818-865/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:58:13.173 818-865/? I/RxJavaDemo2: 傳送 hello
07-13 14:58:13.173 818-865/? I/RxJavaDemo2: 傳送 world
07-13 14:58:13.173 818-865/? I/RxJavaDemo2: 呼叫 onComplete
07-13 14:58:13.173 818-865/? I/TestRunner: finished: testRx(com.guanaj.rxdemo.RxJavaDemo2)
版本2:切換執行緒(切換操作是如此的瀟灑)
observable
.subscribeOn(Schedulers.io())//切換到io執行緒
.observeOn(AndroidSchedulers.mainThread())//切換到主執行緒
.subscribe(observer);
輸出結果:
07-13 14:43:56.699 29727-29749/? D/RxJavaDemo2: onSubscribe
07-13 14:43:56.699 29727-29749/? D/RxJavaDemo2: work thread isInstr: android.support.test.runner.AndroidJUnitRunner
07-13 14:43:56.699 29727-29749/? I/TestRunner: finished: testRx(com.guanaj.rxdemo.RxJavaDemo2)
07-13 14:43:56.699 29727-29753/? D/RxJavaDemo2: work thread isRxCachedThreadScheduler-1
07-13 14:43:56.699 29727-29753/? I/RxJavaDemo2: 傳送 hello
07-13 14:43:56.699 29727-29753/? I/RxJavaDemo2: 傳送 world
07-13 14:43:56.699 29727-29753/? I/RxJavaDemo2: 呼叫 onComplete
07-13 14:43:56.699 29727-29727/? D/RxJavaDemo2: onNext data is :hello
07-13 14:43:56.699 29727-29727/? D/RxJavaDemo2: work thread ismain
結果分析(因為我用的是@RunWith(AndroidJUnit4.class)執行程式碼,所以在工作執行緒是AndroidJUnitRunner):
現在我們現象,後面根據現象分析原因。
沒執行緒切換的版本:
無論在哪裡呼叫subscribe,都在當前執行緒執行。
存在版本切換的版本:
1、被觀察者的onSubscribe在呼叫subscribe的執行緒中執行,
2、被觀察者的subscribe在RxJava2的RxCachedThreadScheduler-1中執行。
3、onNext工作在主執行緒
OK,現象看完了,我們開始看本質吧!但是,從哪入手呢?還是老辦法,哪裡觸發的行為就哪裡下手( ̄∇ ̄)
我們先來探索切換Observable工作執行緒的subscribeOn方法入手。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
看到了RxJavaPlugins.onAssembly,前面分析過,為hook服務,現在看成是返回傳入的Obserable即可。這裡的this為我們的observable,scheduler就是我們傳入的Schedulers.io();我們繼續看ObservableSubscribeOn;
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {....}
其繼承AbstractObservableWithUpstream
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
AbstractObservableWithUpstream繼承自Observable,其作用是通過source欄位儲存上游的Observable,因為Observable是ObservableSource介面的實現類,所以我們可以認為Observable和ObservableSource在本文中是相等的:,
也就是說ObservableSubscribeOn是對Observble進行了一次wrapper操作
我們繼續回來看ObservableSubscribeOn的原始碼
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
//1、執行緒排程器
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//2、儲存上游的obserble
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//以下為關鍵部分
//3、對我們下游的observer進行一次wrapper
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//4、同樣,先自己呼叫自己的onSubscribe
s.onSubscribe(parent);
//5、(高能量聚集了)將排程的執行緒的Disposable賦值給當前的Disposable。scheduler可以看成是某個執行緒上的排程器。new SubscribeTask(parent)工作在其指定的執行緒裡面。SubscribeTask是一個Runnable,也就是說排程器觸發Runnable的run()執行,
//***是不是恍然大悟,那麼run()裡面的程式碼就是執行在scheduler的執行緒上了。這樣也就實現了執行緒的切換了。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference implements Observer<T>, Disposable {....}
...
}
我們開看下SubscribeTask
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
1、parent就是我們包裝後的observe,其內部儲存了下游的observer
2、source即通過ObservableSubscribeOnwrapper後儲存我們上游的observable
所以run裡面的source.subscribe(parent);即為wrapper的observer訂閱了上游的observable,觸發了上游observable的subscribeActual,開始執行資料的分發
上層obserable -》wrapper產生的observer -》真實的observser
思路梳理(重要)
Ok,分析到這裡思路基本清晰了
1、在執行subscribeOn時,在Observable和observer中插入了一個(wrapper)obserabler和(wrapper)Observer
原observable->【(Wrapper)observable||(Wrapper)Observer】->(原)Observer
2、observable.subscribe觸發->(Wrapper)Observable.subscribeActual()內部呼叫->parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));,->scheduler在指定執行緒呼叫(完成執行緒切換)->SubscribeTask.run,run內部呼叫->原Observable.subscribe((Wrapper)Observer)觸發->(原)Observable.subscribeActual()開始資料分發
此時分發給的是(Wrapper)Observer,那應該是(Wrapper)Observer分發給了(原)Observer,我們看下是不是。
OK,(Wrapper)Observer對(原)Observer進行了wrapper,我們看下原始碼:
static final class SubscribeOnObserver<T> extends AtomicReference implements Observer<T>, Disposable {
//6、儲存下游的observer
final Observer<? super T> actual;
//7、儲存下游observer的Disposable,下游的Disposable交由其管理
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observersuper T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
//8、onSubscribe()方法在observer呼叫subscribe時觸發,Observer傳入自己的Disposable,賦值給this.s,交由當前的包裝的Observer管理。同樣是裝飾者模式的魅力所在。
DisposableHelper.setOnce(this.s, s);
}
//當前Observer可以理解為下游observer和上游obserable的一箇中間observer。
//9、這裡直接呼叫下游observer的對應方法。
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
//10、取消訂閱時,要同時取消下游的observer和當前的observer,因為上游obserable分發訂閱資料判斷是否需要派發時判斷的是與之最近的observer。
//上層obserable-》wrapper產生的observer》真實的observser
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//11、subscribeActual()中被呼叫,目的是將Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
確實是(Wrapper)Observer分發給了(原)Observer。
到這裡的時候,整個流程基本OK了,但是,我們在5和11處說了,排程Worker也會加入Disposable進行管理,我還是要一探究竟( ̄∇ ̄)。
有了對SubscribeOnObserver分析的鋪墊,我們現在可以分析第5處parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));的程式碼了,我們先看scheduler.scheduleDirect()這句
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
//12、以毫秒為單位,無延遲排程
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
其返回一個Disposable,我們看下這個Disposable是否真的是排程的執行緒的。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//13、Worker實現了Disposable的一個排程工作者類
final Worker w = createWorker();
//14、hook,排除hook干擾,可以理解為decoratedRun==run
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//15、DisposeTask同樣是實現了Disposable的Task
DisposeTask task = new DisposeTask(decoratedRun, w);
//16、開始執行
w.schedule(task, delay, unit);
//17、確實是返回了管理run的worker
return task;
}
現在重點轉移到DisposeTask,我們把run給了DisposeTask,然後worker排程task開始執行run
那麼我們可以猜測w.schedule(task, delay, unit)執行後應該是排程了task的某個方法,然後task開始執行Runnable的run
是不是真的呢?我們來看下new DisposeTask(decoratedRun, w)做了什麼
static final class DisposeTask implements Runnable, Disposable {
//18、我們外部傳入的runnable
final Runnable decoratedRun;
//19、排程工作者
final Worker w;
//20、當前執行緒
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
//21、獲取執decoratedRun的執行緒
runner = Thread.currentThread();
try {
//22、OK,高能來了。decoratedRun的run被執行
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
//在DisposeTask被取消時,告訴Worker取消,因為DisposeTask是Worker進行管理的
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}