RxJava(一) create操作符的用法和原始碼分析
RxJava系列文章目錄導讀:
1 create操作符的基本使用
顧名思義,Create操作符是用來建立一個Observable的。下面來看一個簡單的程式碼段:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//Emit Data
}
})
create方法接收一個引數Observable.OnSubscribe
/**
* Invoked when Observable.subscribe is called.
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
Observable.OnSubscribe
說白了就是一個繼承了Action1介面的介面:
public interface Action1<T > extends Action {
void call(T t);
}
/**
* All Action interfaces extend from this.
* <p>
* Marker interface to allow instanceof checks.
*/
public interface Action extends Function {
}
/**
* All Func and Action interfaces extend from this.
* <p>
* Marker interface to allow instanceof checks.
*/
public interface Function {
}
它們的繼承關係如下:
Observable.OnSubscribe <- Action1 <- Action <- Function
create()方法也就是個工廠方法:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
通過OnSubscribe的原始碼的註釋
Invoked when Observable.subscribe is called.
意思是 當Observable被訂閱(subscribe)
OnSubscribe介面的call方法會被執行。
知道如何建立(create)Observable, 接下來我們看下如何訂閱它:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
printLog(tvLogs, "Emit Data:", i + "");
subscriber.onNext("" + i);
}
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//showToast(s);
printLog(tvLogs, "Consume Data:", s);
}
});
當呼叫了subscribe方法 Observable.OnSubscribe的call方法就會被執行,在Observable.OnSubscribe的call方法中迴圈了呼叫了5次subscriber.onNext,在subscribe的Action1回撥就會接受5次回撥。
Emit Data:'0' , Thread Name:RxCachedThreadScheduler-1
Emit Data:'1' , Thread Name:RxCachedThreadScheduler-1
Emit Data:'2' , Thread Name:RxCachedThreadScheduler-1
Emit Data:'3' , Thread Name:RxCachedThreadScheduler-1
Emit Data:'4' , Thread Name:RxCachedThreadScheduler-1
Consume Data:'0' , Thread Name:main
Consume Data:'1' , Thread Name:main
Consume Data:'2' , Thread Name:main
Consume Data:'3' , Thread Name:main
Consume Data:'4' , Thread Name:main
從輸出的日誌可以看到,我們還列印了Thread Name執行緒的名稱,我們可以控制傳送資料、消費資料所在的執行緒。
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
subscribeOn
設定Observable的call方法所在的執行緒 【生產資料】
observeOn
設定subscribe的call方法所在的執行緒【消費資料】
2 從原始碼角度分析create()和subscribe()如何協同工作的
從上面的分析我們知道,create方法就是一個簡單的工廠方法:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
直接new一個Observable 接收的引數由hook.onCreate方法返回(該方法也很簡單):
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
總結下來一句話:create操作符建立Observable,Observable通過構造方法 儲存了我們傳進來的
OnSubscribe
說白了就是Action1
.
下面來看看Observable的subscribe方法的原始碼:
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
return subscribe(new Subscriber<T>() {
@Override
public final void onCompleted() {
// do nothing
}
@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
});
}
從原始碼可以看出subscribe方法並沒有直接呼叫傳進來引數的方法(沒有直接呼叫onNext.call())。
而是通過subscribe(Subscriber)
方法, subscribe(Subscriber)方法又是呼叫了Observable的私有靜態方法:private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable)
。下面是該方法的原始碼片段:
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//remove some code ....
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
我們看關鍵部分就可以了:hook.onSubscribeStart
(observable, observable.onSubscribe).call
(subscriber);
看看hook.onSubscribeStart原始碼:
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass-thru by default
return onSubscribe;
}
很簡單,直接返回傳遞過來的引數(onSubscribe)。 這個OnSubscribe就是我們Observable.create(OnSubscribe)傳遞進去的OnSubscribe,然後呼叫OnSubscribe的call。
所以上面的程式碼可以簡化為(便於理解):observable.onSubscribe.call(subscriber).
至此,驗證了那句話,只有當Observable被訂閱OnSubscribe的call(subscriber)方法才會被執行。
我們知道了OnSubscribe的call(subscriber)執行的時機,那麼是如何把生產的資料傳遞了Observable.subscribe方法的回撥的呢?
我們通過Observable.subscribe原始碼得知,傳遞進來的回撥(Action1),是通過Subscriber來執行Action1的回撥,Subscriber又是Observable.create()引數的回撥。
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//Emit Data
}
})
所以Subscriber是Observable.OnSubscribe的回撥和Observable.subscribe(Action1..)的Action1之間通訊的橋樑。
Subscriber有三個方法:
- onCompleted();
- void onError(Throwable e);
- void onNext(T t);
既然Subscriber是Observable.create(params)引數的回撥和Observable.subscribe()引數回撥的通訊橋樑,Subscriber有三個方法,那麼Observable.subscribe肯定也有三個與之對應回撥,通過原始碼知道Observable.subscribe有很多過載方法:
- public final Subscription subscribe(final Action1
總結:Subscriber是Observable.create(Observable.OnSubscribe)引數回撥和Observable.subscribe(Action1,[Action1,Action0])引數回撥的通訊橋樑.
需要你注意的是:如果呼叫了void onError(Throwable e)方法,那麼onNext和onCompleted都不會執行。
下面用程式碼來表示他們之間的關係:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
printLog(tvLogs, "Emit Data:", i + "");
subscriber.onNext("" + i);//對應subscribe方法的第一個引數
if (condition) {
subscriber.onError(Throwable);//對應subscribe方法的第二個引數
}
}
subscriber.onCompleted(); //對應subscribe方法的第三個引數
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//showToast(s);
printLog(tvLogs, "Consume Data:", s);
//onNext
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
//onError
}
}, new Action0() {
@Override
public void call() {
//onCompleted
}
});
總結:
1. 只有當Observable被訂閱OnSubscribe的call(subscriber)方法才會被執行
2. onCompleted方法裡會把Subscription取消訂閱(unsubscribe)
3. 如果呼叫了void onError(Throwable e)方法,那麼onNext和onCompleted都不會執行。會在onError呼叫之前,把Subscription取消註冊。
4. 整個事件流不管是正常結束(onComplete)還是出現了異常(onError),Subscription都會被取消註冊(unsubscribe)。
但是,由於我們可能執行一些耗時操作,介面又被關閉了,所以還需要把subscription取消註冊
5. Subscriber是Observable.create(Observable.OnSubscribe)引數回撥和Observable.subscribe(Action1,[Action1,Action0])引數回撥的通訊橋樑.