RxJava學習 - 1. Observable
RxJava學習 - 1. Observable
Observable是一個基於push的,可組合的iterator。
對於一個給定的Observable,它push型別為T的items(叫做emissions),經過一系列運算,最後到達Observer,Observer消費這些items。
How Observables work
Observable是如何把items沿著chain順序地傳給Observer的?在最高級別,Observable傳遞三種類型的事件:
- OnNext():一次一個地把item從源Observable傳給沿途的Observer
- onComplete():完成事件,說明不會再呼叫onNext()
- onError():發生錯誤了,Observer定義了怎麼處理錯誤。除非使用retry()阻止該錯誤,Observable chain終止,不會再有emissions。
這些事件是Observer型別的抽象方法。
Using Observable.create()
可以使用Observable.create()增加一個source Observable。source Observable就是emissions的源頭,Observable chain的起始點。
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter. onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
});
source.subscribe(s -> System.out.println("RECEIVED: " + s));
}
}
Emissions只能順序傳遞,而且一次只能傳一個。Emissions不能並行地或者併發地通過一個Observable。
Observables可以是無限多的,也不是必須呼叫onComplete()。
當Observable.create()塊發生錯誤,通過onError()發射(emit)他們的時候,我們可以捕獲這些錯誤。錯誤被push到chain上,由Observer處理。上面的程式碼不處理異常,但是我們也可以這麼寫:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source = Observable.create(emitter -> {
try {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
} catch (Throwable e) {
emitter.onError(e);
}
});
source.subscribe(s -> System.out.println("RECEIVED: " + s), Throwable::printStackTrace);
}
}
onNext()、onComplete()和onError()不一定非要直接push給最終的Observer。也可以push給chain中的下一步。
下面的程式碼,我們派生出執行map()和filter()運算的新的Observables,他們會在源Observable和最終的Observer之間生效:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source = Observable.create(emitter -> {
try {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
} catch (Throwable e) {
emitter.onError(e);
}
});
Observable<Integer> lengths = source.map(String::length);
Observable<Integer> filtered = lengths.filter(i -> i >= 5);
filtered.subscribe(s -> System.out.println("RECEIVED: " + s));
}
}
上面的程式碼,onNext()把每個item交給map(),在map()內部,它作為一箇中介Observer,八字串轉換成它的length()。然後,呼叫onNext()和filter()傳遞這個整數,lambda條件i -> i >= 5會過濾掉長度小於5的。最後,filter()呼叫onNext()把每個item交給最終的Observer,他們被列印。
請注意,map()會產生一個新的Observable。filter()也會產生一個新的Observable,但是會忽略條件不成立的emissions。因為類似map()和filter()這樣的運算會產生新的Observables(內部使用Observer實現接收emissions),我們能連結所有的Observables,節省掉中間變數:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source = Observable.create(emitter -> {
try {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
} catch (Throwable e) {
emitter.onError(e);
}
});
source.map(String::length)
.filter(i -> i >= 5)
.subscribe(s -> System.out.println("RECEIVED: " + s));
}
}
**警告:**RxJava 2.0,Observables不再支援發射null值。如果你增加一個Observable,嘗試發射一個null值,會立刻得到一個non-null異常。
如果你需要發射null,請使用JDK8或者Google Guava的Optional。
Using Observable.just()
一般不需要使用Observable.create(),我們一般使用streamlined的工廠增加Observables。
前面使用Observable.create()的例子,可以使用Observable.just()完成。我們能最多傳遞10個想要發射的items。它將為每個item呼叫onNext(),
當全部push以後,呼叫onComplete():
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Epsilon");
source.map(String::length).filter(i -> i >= 5)
.subscribe(s -> System.out.println("RECEIVED: " + s));
}
}
也可以使用Observable.fromIterable(),從任何Iterable型別發射items,比如一個List。它也為每個item呼叫onNext(),在iteration完成以後,呼叫onComplete()。
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
public class Launcher {
public static void main(String[] args) {
List<String> items =
Arrays.asList("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
Observable<String> source = Observable.fromIterable(items);
source.map(String::length).filter(i -> i >= 5)
.subscribe(s -> System.out.println("RECEIVED: " + s));
}
}