RxJava學習 - 3. Cold versus hot Observables
RxJava學習 - 3. Cold versus hot Observables
在Observable和Observer的關係中,有一個微妙的行為,依賴於Observable是如何實現的。Observables的一個主要特徵是cold和hot,當有多個Observers時,這決定了Observables的行為。
首先,我們看看cold Observables。
Cold Observables
Cold Observables很像一個音樂CD,可以被每個聽眾重新播放,每個人可以在任何時刻聽這些樂曲。
同樣的,cold Observables可以為每個Observer,replay它的emissions,確保所有的Observers拿到全部資料。大多數資料驅動的Observables是cold,包括Observable.just()和Observable.fromIterable()工廠。
下面的例子,我們有兩個Observers訂閱了一個Observable。Observable首先發射所有的emissions給第一個Observer,然後呼叫onComplete()。然後,它再次發射所有的emissions給第二個Observer,然後呼叫onComplete()。通過兩個分開的流,他們都接收到相同的資料。這是cold Observable的典型行為:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
//first observer
source.subscribe(s -> System.out.println( "Observer 1 Received: " + s));
//second observer
source.subscribe(s -> System.out.println("Observer 2 Received: " + s));
}
}
甚至在第二個Observer轉換了emissions,它仍然獲得一個自己的emissions流。使用map()和filter()仍然能產生cold的Observables:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
//first observer
source.subscribe(s -> System.out.println("Observer 1 Received: " + s));
//second observer
source.map(String::length).filter(i -> i >= 5)
.subscribe(s -> System.out.println("Observer 2 Received: " + s));
}
}
它的輸出是
Observer 1 Received: Alpha
Observer 1 Received: Beta
Observer 1 Received: Gamma
Observer 1 Received: Delta
Observer 1 Received: Epsilon
Observer 2 Received: 5
Observer 2 Received: 5
Observer 2 Received: 5
Observer 2 Received: 7
這裡有更實際的例子:Dave Moten’s RxJava-JDBC。
允許你增加cold Observables實現SQL查詢。如果你想查詢一個SQLite資料庫,可以在你的=工程裡包含SQLite JDBC驅動和RxJava-JDBC庫。
然後可以反應式地查詢資料庫表,像下面這樣:
import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;
import java.sql.Connection;
public class Launcher {
public static void main(String[] args) {
Connection conn =
new ConnectionProviderFromUrl("jdbc:sqlite:/home/thomas/rexon_metals.db").get();
Database db = Database.from(conn);
Observable<String> customerNames =
db.select("SELECT NAME FROM CUSTOMER").getAs(String.class);
customerNames.subscribe(s -> System.out.println(s));
}
}
SQL-driven Observable是cold。很多Observables從有限資料來源(比如資料庫、文字檔案或者JSON)發射資料,他們都是cold。RxJava-JDBC為每個Observer執行查詢。這意味著如果資料在兩個訂閱發生了變化,第二個Observer可以拿到與第一個不同的emissions。cold Observables會為每個Observer,重新生成emissions。
Hot Observables
hot Observable更像是一個廣播電臺。在同一個時刻,它向所有的Observers廣播相同的emissions。如果一個Observer訂閱了一個hot Observable,接收相同的emissions,然後來了另一個Observer,第二個Observer會錯過這些emissions。就像廣播電臺,如果你開啟晚了,你就聽不到那首歌。
hot Observables通常代表事件而不是有限資料集。事件可以攜帶資料,但是,他們是時間敏感的元件,後來的observers會錯過先前的資料。
例如,一個JavaFX或者一個Android UI事件能表示成一個hot Observable。在JavaFX裡,你能使用一個ToggleButton物件的selectedProperty()方法
增加一個Observable。然後把布林emissions轉換成字串,表示該按鈕的狀態(UP或者DOWN),使用一個Observer在Label裡顯示:
import io.reactivex.Observable;
import javafx.application.Application;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.scene.Scene;
import javafx.scene.control.Label;
import javafx.scene.control.ToggleButton;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;
public class MyJavaFxApp extends Application {
@Override
public void start(Stage stage) throws Exception {
ToggleButton toggleButton = new ToggleButton("TOGGLE ME");
Label label = new Label();
Observable<Boolean> selectedStates = valuesOf(toggleButton.selectedProperty());
selectedStates.map(selected -> selected ? "DOWN" : "UP")
.subscribe(label::setText);
VBox vBox = new VBox(toggleButton, label);
stage.setScene(new Scene(vBox));
stage.show();
}
private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable) {
return Observable.create(observableEmitter -> {
//emit initial state
observableEmitter.onNext(fxObservable.getValue());
//emit value changes uses a listener
final ChangeListener<T> listener = (observableValue, prev, current) -> observableEmitter.onNext(current);
fxObservable.addListener(listener);
});
}
}
JavaFX ObservableValue與RxJava Observable無關。它是JavaFX的,但是,使用valuesOf()工廠方法,很容易轉成Observable。
每次點選ToggleButton,Observable會根據狀態發射相應的true或者false。
JavaFX和Android的UI事件主要的hot Observables,你也能使用hot Observables反映伺服器請求。如果你增加一個Observable
為某Twitter主題發射推文,這也是一個hot Observable。hot Observable不一定非要是無限的,只要是向所有的Observers共享emissions,
不replay錯過的emissions的都是hot。
ConnectableObservable
一種有用的hot Observable是ConnectableObservable。它可以是任何Observable(包括cold),讓它變hot,這樣所有的emissions都只給Observers一次。想做這種轉變,你只需要簡單地在任何Observable上呼叫publish(),就會產生一個ConnectableObservable。但是,subscribing不能開始emissions。你需要呼叫它的connect()方法啟動emissions的發射。這樣,你可以預先設定你的Observers:
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
public class Launcher {
public static void main(String[] args) {
ConnectableObservable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon").publish();
//Set up observer 1
source.subscribe(s -> System.out.println("Observer 1: " + s));
//Set up observer 2
source.map(String::length)
.subscribe(i -> System.out.println("Observer 2: " + i));
//Fire!
source.connect();
}
}
它的輸出是
Observer 1: Alpha
Observer 2: 5
Observer 1: Beta
Observer 2: 4
Observer 1: Gamma
Observer 2: 5
Observer 1: Delta
Observer 2: 5
Observer 1: Epsilon
Observer 2: 7
注意,一個Observer收到字串,另一個就收到長度,這兩個是交錯進行的。訂閱都是預先設定好的,然後呼叫connect()開始發射。每個emission同時送給每個Observer。使用ConnectableObservable,強制每個emission同時發給所有的Observers,這叫做多播(multicasting),以後詳細講。
ConnectableObservable是有用的,防止資料被replay給每個Observer。
如果你覺得重播太昂貴了,你可能想這樣做,每個emission只發射一次。甚至有多個下游Observers的時候,你也可以簡單地強制上游使用一個流例項。
多個Observers通常會導致上游有多個流例項,但是,使用publish()返回ConnectableObservable來合併所有的上游形成一個流。
請記住,ConnectableObservable是hot。