1. 程式人生 > >RxJava學習 - 3. Cold versus hot Observables

RxJava學習 - 3. Cold versus hot Observables

RxJava學習 - 3. Cold versus hot Observables

在Observable和Observer的關係中,有一個微妙的行為,依賴於Observable是如何實現的。Observables的一個主要特徵是cold和hot,當有多個Observers時,這決定了Observables的行為。
首先,我們看看cold Observables。

Cold Observables

Cold Observables很像一個音樂CD,可以被每個聽眾重新播放,每個人可以在任何時刻聽這些樂曲。
同樣的,cold Observables可以為每個Observer,replay它的emissions,確保所有的Observers拿到全部資料。大多數資料驅動的Observables是cold,包括Observable.just()和Observable.fromIterable()工廠。
下面的例子,我們有兩個Observers訂閱了一個Observable。Observable首先發射所有的emissions給第一個Observer,然後呼叫onComplete()。然後,它再次發射所有的emissions給第二個Observer,然後呼叫onComplete()。通過兩個分開的流,他們都接收到相同的資料。這是cold Observable的典型行為:

import io.reactivex.Observable;
public class Launcher {
    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
        //first observer
        source.subscribe(s -> System.out.println(
"Observer 1 Received: " + s)); //second observer source.subscribe(s -> System.out.println("Observer 2 Received: " + s)); } }

甚至在第二個Observer轉換了emissions,它仍然獲得一個自己的emissions流。使用map()和filter()仍然能產生cold的Observables:

import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) { Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon"); //first observer source.subscribe(s -> System.out.println("Observer 1 Received: " + s)); //second observer source.map(String::length).filter(i -> i >= 5) .subscribe(s -> System.out.println("Observer 2 Received: " + s)); } }

它的輸出是

Observer 1 Received: Alpha
Observer 1 Received: Beta
Observer 1 Received: Gamma
Observer 1 Received: Delta
Observer 1 Received: Epsilon
Observer 2 Received: 5
Observer 2 Received: 5
Observer 2 Received: 5
Observer 2 Received: 7

這裡有更實際的例子:Dave Moten’s RxJava-JDBC
允許你增加cold Observables實現SQL查詢。如果你想查詢一個SQLite資料庫,可以在你的=工程裡包含SQLite JDBC驅動和RxJava-JDBC庫。
然後可以反應式地查詢資料庫表,像下面這樣:

import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;
import java.sql.Connection;

public class Launcher {
    public static void main(String[] args) {
        Connection conn =
            new ConnectionProviderFromUrl("jdbc:sqlite:/home/thomas/rexon_metals.db").get();
        Database db = Database.from(conn);
        Observable<String> customerNames =
            db.select("SELECT NAME FROM CUSTOMER").getAs(String.class);
        customerNames.subscribe(s -> System.out.println(s));
    }
}

SQL-driven Observable是cold。很多Observables從有限資料來源(比如資料庫、文字檔案或者JSON)發射資料,他們都是cold。RxJava-JDBC為每個Observer執行查詢。這意味著如果資料在兩個訂閱發生了變化,第二個Observer可以拿到與第一個不同的emissions。cold Observables會為每個Observer,重新生成emissions。

Hot Observables

hot Observable更像是一個廣播電臺。在同一個時刻,它向所有的Observers廣播相同的emissions。如果一個Observer訂閱了一個hot Observable,接收相同的emissions,然後來了另一個Observer,第二個Observer會錯過這些emissions。就像廣播電臺,如果你開啟晚了,你就聽不到那首歌。
hot Observables通常代表事件而不是有限資料集。事件可以攜帶資料,但是,他們是時間敏感的元件,後來的observers會錯過先前的資料。
例如,一個JavaFX或者一個Android UI事件能表示成一個hot Observable。在JavaFX裡,你能使用一個ToggleButton物件的selectedProperty()方法
增加一個Observable。然後把布林emissions轉換成字串,表示該按鈕的狀態(UP或者DOWN),使用一個Observer在Label裡顯示:

import io.reactivex.Observable;
import javafx.application.Application;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.scene.Scene;
import javafx.scene.control.Label;
import javafx.scene.control.ToggleButton;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;

public class MyJavaFxApp extends Application {
    @Override
    public void start(Stage stage) throws Exception {
        ToggleButton toggleButton = new ToggleButton("TOGGLE ME");
        Label label = new Label();
        Observable<Boolean> selectedStates = valuesOf(toggleButton.selectedProperty());
        selectedStates.map(selected -> selected ? "DOWN" : "UP")
            .subscribe(label::setText);
        VBox vBox = new VBox(toggleButton, label);
        stage.setScene(new Scene(vBox));
        stage.show();
    }
    
    private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable) {
        return Observable.create(observableEmitter -> {
            //emit initial state
            observableEmitter.onNext(fxObservable.getValue());
            //emit value changes uses a listener
            final ChangeListener<T> listener = (observableValue, prev, current) -> observableEmitter.onNext(current);
            fxObservable.addListener(listener);
        });
    }
}

JavaFX ObservableValue與RxJava Observable無關。它是JavaFX的,但是,使用valuesOf()工廠方法,很容易轉成Observable。
每次點選ToggleButton,Observable會根據狀態發射相應的true或者false。
JavaFX和Android的UI事件主要的hot Observables,你也能使用hot Observables反映伺服器請求。如果你增加一個Observable
為某Twitter主題發射推文,這也是一個hot Observable。hot Observable不一定非要是無限的,只要是向所有的Observers共享emissions,
不replay錯過的emissions的都是hot。

ConnectableObservable

一種有用的hot Observable是ConnectableObservable。它可以是任何Observable(包括cold),讓它變hot,這樣所有的emissions都只給Observers一次。想做這種轉變,你只需要簡單地在任何Observable上呼叫publish(),就會產生一個ConnectableObservable。但是,subscribing不能開始emissions。你需要呼叫它的connect()方法啟動emissions的發射。這樣,你可以預先設定你的Observers:

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
public class Launcher {
    public static void main(String[] args) {
        ConnectableObservable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon").publish();
        //Set up observer 1
        source.subscribe(s -> System.out.println("Observer 1: " + s));
        //Set up observer 2
        source.map(String::length)
                .subscribe(i -> System.out.println("Observer 2: " + i));
        //Fire!
        source.connect();        
    }
}

它的輸出是

Observer 1: Alpha
Observer 2: 5
Observer 1: Beta
Observer 2: 4
Observer 1: Gamma
Observer 2: 5
Observer 1: Delta
Observer 2: 5
Observer 1: Epsilon
Observer 2: 7

注意,一個Observer收到字串,另一個就收到長度,這兩個是交錯進行的。訂閱都是預先設定好的,然後呼叫connect()開始發射。每個emission同時送給每個Observer。使用ConnectableObservable,強制每個emission同時發給所有的Observers,這叫做多播(multicasting),以後詳細講。
ConnectableObservable是有用的,防止資料被replay給每個Observer。
如果你覺得重播太昂貴了,你可能想這樣做,每個emission只發射一次。甚至有多個下游Observers的時候,你也可以簡單地強制上游使用一個流例項。
多個Observers通常會導致上游有多個流例項,但是,使用publish()返回ConnectableObservable來合併所有的上游形成一個流。
請記住,ConnectableObservable是hot。