rxJava的使用--Observable的建立及原始碼分析(一)
最近新開的專案需要用到rxJava.在網上找了一下資料,感覺資料好少,有一些資料雖然有例子,但例子都好複雜,對一個新手來說操作是挺麻煩的.因此,本人根據自己的理解寫了一些例子還有原始碼分析,給記憶力不好的自己,留作複習用,也希望能幫助大家.文章可能寫得有點慢,請大家見諒.
本文的順序是根據ReactiveX文件中文翻譯來寫的,該文件已經把一些概念性的東西講得很清楚了,在這裡我就不再講述.如果有需要的話大家可以去看一下.
Observable的建立
1, create,程式碼如下:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello,rxJava");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("---- onCompleted ----" );
}
@Override
public void onError(Throwable e) {
System.out.println("---- onError ----");
}
@Override
public void onNext(String s) {
System.out.println("---- onNext ----:" + s);
}
});
執行結果如下:
---- onNext ----:hello,rxJava
---- onCompleted ----
通過該方建立Observable的話需要手動的去呼叫subscriber裡面的方法如:onNext,onCompleted.onError一般是程式出現錯誤的時候,rxjava主動呼叫的,因此,正常情況下,不需要去呼叫.
create方法做了什麼呢,下面我們去看一下原始碼,首先看到create方法的內部
原始碼分析:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
看到hook.onCreate(f)的onCreate內部
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
直接返回了我們之前建立的OnSubscribe的例項物件.接著建立Observable物件並返回.
下面看subscribe方法的內部:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
這裡的this就是之前返回的Observable物件,繼續看subscribe方法的內部:
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
... 省略部分
// subscriber每次都會首先呼叫onStart方法
subscriber.onStart();
// 這裡把subscriber封裝成一個安全的物件
// 在這裡就不分析SafeSubscriber裡面的程式碼了
// 因為裡面的程式碼無非是通過try{}catch{}來保證整個流程正常執行
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
// onSubscribeStart返回之前建立的observable.onSubscribe物件,並且呼叫call
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
... 省略部分
// 合理就是如丟擲異常之後的處理了
try {
subscriber.onError(hook.onSubscribeError(e));
}catch (Throwable e2) {
... 省略部分
}
// 從這也可以知道,如果有異常丟擲的話,rxjava會為我們取消訂閱
return Subscriptions.unsubscribed();
}
}
這裡需要注意的是subscriber = new SafeSubscriber(subscriber);這一句程式碼,我就簡單的說一下這樣做的原因:主要是為了防止使用者建立的subscriber丟擲異常導致整個流程不能正常執行.具體理由大家可以點選該連結,這裡有詳細的討論.
2, Defer,程式碼如下:
Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable call() {
// 這裡要返回一個Observable的例項物件,在這裡用create的方法建立
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
// 這裡還是create的用法
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
subscriber.onCompleted();
}
});
}
// 然後這裡是訂閱者,跟create一樣
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("---- onCompleted ----");
}
@Override
public void onError(Throwable e) {
System.out.println("---- onError ----");
}
@Override
public void onNext(String o) {
System.out.println("---- onNext ----" + o);
}
});
---- onNext ----hello world
---- onCompleted ----
該方法從用法還是結果來看基本和create一樣,並且還多了new Func0 <Observable>(){}這麼一個步驟. 看起來比create還麻煩.它內部的實現與create有什麼異同呢.下面我們看一下defer這個方法的內部實現.
原始碼分析:
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) {
return create(new OnSubscribeDefer<T>(observableFactory));
}
看到這個方法之前我們new Func0物件變成了一個observable的建立工廠observableFactory.接著看OnSubscribeDefer這個類的內部,知道這個OnSubscribeDefer,實現的是OnSubscribe介面:
public final class OnSubscribeDefer<T> implements OnSubscribe<T>
該類把我們之前建立的Func0物件儲存起來
public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {
this.observableFactory = observableFactory;
}
這樣也可以看出defer方法返回的是OnSubscribeDefer的物件.因此在subscribe呼叫的是該例項的call方法,因此去看該類的的call方法:
@Override
public void call(final Subscriber<? super T> s) {
Observable<? extends T> o;
try {
// 這裡呼叫了 Func0物件的call方法,返回了我們用過create建立的
// Observable例項
o = observableFactory.call();
} catch (Throwable t) {
Exceptions.throwOrReport(t, s);
return;
}
// 然後再這裡呼叫create裡面的方法call
// 這裡的unsafeSubscribe從原始碼可知
// 沒有把Subscriber物件包裝為SafeSubscriber
o.unsafeSubscribe(Subscribers.wrap(s));
}
繼續去看unsafeSubscribe的原始碼:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
... 忽略部分
subscriber.onStart();
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
... 忽略部分
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
... 忽略部分
}
return Subscriptions.unsubscribed();
}
}
該方法跟我們之前分析的subscribe基本是一樣的,除了沒有把subscriber物件包裝成SafeSubscriber物件之外.
defer的整個流程可以總結為:呼叫defer的時候並沒有直接建立Observable的例項物件,而是在呼叫subscribe時候才通過observableFactory來建立.至於它的用處,等我們講到just的時候才回過頭來看.
3, empty,程式碼如下:
Observable.empty().subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.println("---- onCompleted ----");
}
@Override
public void onError(Throwable e) {
System.out.println("---- onError ----");
}
@Override
public void onNext(Object o) {
System.out.println("---- onNext ----");
}
});
結果如下:
---- onCompleted ----
從執行結果看,通empty建立的Observable值呼叫onCompleted方法,不呼叫onNext:
原始碼分析:
empty方法內部:
public static <T> Observable<T> empty() {
return EmptyObservableHolder.instance();
}
跟進EmptyObservableHolder.instance(),發現EmptyObservableHolder是一個列舉類:
public enum EmptyObservableHolder implements OnSubscribe<Object> {
INSTANCE
...忽略部分
}
而instance方法內容如下:
public static <T> Observable<T> instance() {
return (Observable<T>)EMPTY;
}
該方法返回了列舉類裡面的INSTANCE.
因此subscribe方法裡面呼叫的是該類的call方法了,看到該列舉類裡面的call方法:
@Override
public void call(Subscriber<? super Object> child) {
child.onCompleted();
}
從這裡可以知道為什麼empty建立的Observable物件只調用了onCompleted方法的原因了;
4, Never,程式碼如下:
Observable.never().subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.println("---- onCompleted ----");
}
@Override
public void onError(Throwable e) {
System.out.println("---- onError ----");
}
@Override
public void onNext(Object o) {
System.out.println("---- onNext ----");
}
});
執行該程式碼片段時,什麼輸出都沒有.
原始碼分析如下:
跟進never方法看到:
public static <T> Observable<T> never() {
return NeverObservableHolder.instance();
}
根據之前empty分析可以判斷出NeverObservableHolder也是一個列舉類,該類裡面的call方法程式碼如下:
@Override
public void call(Subscriber<? super Object> child) {
}
從這裡看到該call方法執行內容為空,這就是never什麼都不執行的原因了.
5, error,程式碼如下:
Observable.error(new Exception("hello I'm error")).subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.println("---- onCompleted ----");
}
@Override
public void onError(Throwable e) {
System.out.println("---- onError ----" + e.getMessage());
}
@Override
public void onNext(Object o) {
System.out.println("---- onNext ----");
}
});
執行結果如下:
---- onError ----hello I'm error
這裡的異常資訊就是通過error建立Observable時傳入異常的資訊.
原始碼分析如下:
error方法內部:
public static <T> Observable<T> error(Throwable exception) {
return create(new OnSubscribeThrow<T>(exception));
}
跟進OnSubscribeThrow,看到該類的call方法
@Override
public void call(Subscriber<? super T> observer) {
observer.onError(exception);
}
可以看到該類的call方法直接呼叫onError方法並且傳入我們之前的建立的Exception.
好了第一部分就先分析到這裡了,如果大家有什麼不明白的,或者認為我講得不合理的都可以給我留言.