RxJava學習 - 4. Other Observable sources
RxJava學習 - 4. Other Observable sources
- Observable.range()
- Observable.interval()
- Observable.future()
- Observable.empty()
- Observable.never()
- Observable.error()
- Observable.defer()
- Observable.fromCallable()
Observable.range()
可以使用Observable.range()發射一定範圍內的整數。從start值開始傳送發射,每次加1,直到一定的count。這些數通過onNext()事件傳遞,跟著一個onComplete()事件:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1,10)
.subscribe(s -> System.out.println("RECEIVED: " + s));
}
}
可以使用Observable.rangeLong()發射比較大的數。
Observable.interval()
Observable.interval()生成一個基於時間的Observable。每經過一個時間間隔它將發射一個連續的long emission(從0開始)。
下面的程式碼,每秒鐘發射一個數:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[]args) {
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(s -> System.out.println(s + " Mississippi"));
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Observable.interval()在一個定時器上執行,所以,需要一個單獨的執行緒做排程。上面的程式,main()方法生成一個Observable,但是不等它完成。它在一個新的執行緒上發射。要讓main()不退出,我們使用sleep()方法讓程式存活了5秒鐘。在程式退出之前,我們的Observable有5秒鐘時間可以發射。當你增加一個生產程式的時候,你一般不會碰到這樣的問題,web服務、Android程式或者JavaFX將保持程式的存活。
Observable.interval()返回的是一個hot還是cold Observable?因為它是事件驅動的(頁數無限的),你可能會說它是hot。
但是,增加一個Observer,等5秒鐘,再加一個Observer。發生了什麼?請看:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> seconds = Observable.interval(1, TimeUnit.SECONDS);
//Observer 1
seconds.subscribe(l -> System.out.println("Observer 1: " + l));
//sleep 5 seconds
sleep(5000);
//Observer 2
seconds.subscribe(l -> System.out.println("Observer 2: " + l));
//sleep 5 seconds
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出是這樣的:
Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 0
Observer 1: 6
Observer 2: 1
Observer 1: 7
Observer 2: 2
Observer 1: 8
Observer 2: 3
Observer 1: 9
Observer 2: 4
5秒鐘過去了,Observer 2來了。注意,它有自己的計時器,從0開始。這兩個observers實際上有他們自己的emissions,每個都是從0開始。
所以,這個Observable實際上是cold。想讓所有的observers使用同一個計時器,可以使用ConnectableObservable:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
ConnectableObservable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS).publish();
//observer 1
seconds.subscribe(l -> System.out.println("Observer 1: " + l));
seconds.connect();
//sleep 5 seconds
sleep(5000);
//observer 2
seconds.subscribe(l -> System.out.println("Observer 2: " + l));
//sleep 5 seconds
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出如下:
Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 5
Observer 1: 6
Observer 2: 6
Observer 1: 7
Observer 2: 7
Observer 1: 8
Observer 2: 8
Observer 1: 9
Observer 2: 9
Observable.future()
RxJava的Observables比Futures更健壯和富有表現力,但是,如果你現在使用的庫仍然生產Futures,可以很容易地把他們轉換成Observables:
import io.reactivex.Observable;
import java.util.concurrent.Future;
public class Launcher {
public static void main(String[] args) {
Future<String> futureValue = ...;
Observable.fromFuture(futureValue)
.map(String::length)
.subscribe(System.out::println);
}
}
Observable.empty()
看上去好像沒什麼用,但是,Observable可以什麼都不發射,然後呼叫onComplete():
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> empty = Observable.empty();
empty.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("Done!"));
}
}
空的observables通常代表空的資料集。有時候,有些運算也能返回空,比如filter()。有時候,你故意使用Observable.empty()。
empty Observable實質上就是RxJava的null。Empty Observables比null優雅,程式會繼續,不會拋NullPointerExceptions。
Observable.never()
Observable.empty()的近親是Observable.never()。唯一的不同是,它不呼叫onComplete(),observers一直在等emissions,卻永遠等不來:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> empty = Observable.never();
empty.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("Done!"));
sleep(5000);
}
}
這個Observable主要用來測試,生產中不經常使用。我們使用sleep(),就像Observable.interval()那樣,因為main執行緒產生Observable以後不會等待。
這個例子裡,我們等了5秒鐘,就是為了證明什麼都沒發射,然後,程式退出。
Observable.error()
你可以增加一個Observable,立刻呼叫onError():
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.error(new Exception("Crash and burn!"))
.subscribe(i -> System.out.println("RECEIVED: " + i),
Throwable::printStackTrace,
() -> System.out.println("Done!"));
}
}
你也可以使用lambda提供異常,這樣給每個Observer提供了單獨的異常例項:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.error(() -> new Exception("Crash and burn!"))
.subscribe(i -> System.out.println("RECEIVED: " + i),
Throwable::printStackTrace,
() -> System.out.println("Done!"));
}
}
Observable.defer()
Observable.defer()是一個強有力的工廠,為每個Observer增加單獨的狀態。當使用一定的Observable工廠的時候,如果你的源是有狀態的,你可能想給每個Observer增加單獨的狀態。你的源Observable可能不知道它的引數已經修改了,傳送過時的emissions。這裡有個簡單的例子,你有一個Observable.range(),有兩個靜態的int屬性:start和count。
如果你訂閱這個Observable,修改count,然後再次訂閱,你會發現,第二個Observer沒看到這個變化:
import io.reactivex.Observable;
public class Launcher {
private static int start = 1;
private static int count = 5;
public static void main(String[] args) {
Observable<Integer> source = Observable.range(start, count);
source.subscribe(i -> System.out.println("Observer 1: " + i));
//modify count
count = 10;
source.subscribe(i -> System.out.println("Observer 2: " + i));
}
}
輸出是這樣的:
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 1
Observer 2: 2
Observer 2: 3
Observer 2: 4
Observer 2: 5
想解決這個問題,你能玩兒每個訂閱者增加一個fresh Observable。使用Observable.defer()就可以實現,它接受一個lambda,
說明怎麼為每個訂閱者增加一個Observable。因為這樣做每次增加一個新的Observable,就可以反映引數的變化:
import io.reactivex.Observable;
public class Launcher {
private static int start = 1;
private static int count = 5;
public static void main(String[] args) {
Observable<Integer> source = Observable.defer(() ->
Observable.range(start,count));
source.subscribe(i -> System.out.println("Observer 1: " + i));
//modify count
count = 10;
source.subscribe(i -> System.out.println("Observer 2: " + i));
}
}
輸出是這樣的:
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 1
Observer 2: 2
Observer 2: 3
Observer 2: 4
Observer 2: 5
Observer 2: 6
Observer 2: 7
Observer 2: 8
Observer 2: 9
Observer 2: 10
如果你的Observable源是原生實現,而且超過一個Observer(比如,重新使用一個Iterator,該迭代器只迭代一次),也可以使用Observable.defer()解決。
Observable.fromCallable()
如果你需要執行一個計算或者動作,然後發射它,你可以使用Observable.just()(或者Single.just()、Maybe.just())。
但是,有時候,我們想晚一點再做,或者是deferred的方式。而且,如果處理過程發生錯誤,我們希望以onError()的方式發到Observable chain,而不是以Java傳統的方式拋異常。例如,如果你的Observable.just()想用1除以0,會拋異常,但是不發給Observer:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.just(1 / 0)
.subscribe(i -> System.out.println("RECEIVED: " + i),
e -> System.out.println("Error Captured: " + e));
}
}
輸出是:
java.lang.ArithmeticException: / by zero
at Launcher.main(Launcher.java:6)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
如果希望響應式地處理錯誤,把錯誤發給Observer處理。上面的例子,可以使用Observable.fromCallable(),它接受一個lambda Supplier,發生錯誤時會發射給Observer:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.fromCallable(() -> 1 / 0)
.subscribe(i -> System.out.println("Received: " + i),
e -> System.out.println("Error Captured: " + e));
}
}
輸出是:
Error Captured: java.lang.ArithmeticException: / by zero