1. 程式人生 > >RxJava學習 - 2. The Observer interface

RxJava學習 - 2. The Observer interface

RxJava學習 - 2. The Observer interface

onNext()、onComplete()和onError()方法在Observer裡定義,Observer是一個abstract interface,用來傳遞這些事件。
後面再將onSubscribe()方法,先看這三個方法:

package io.reactivex;

import io.reactivex.
disposables.Disposable; public interface Observer<T> { void onSubscribe(Disposable d); void onNext(T value); void onError(Throwable e); void onComplete(); }

Observers和源Observables是相關的。在一個context裡,source Observable是你的Observable chain的起點。我們前面的例子,Observable.create()方法或者Observable.just()方法返回的Observable是源Observable。對於filter(),從map()返回的Observable是源。它不知道源頭在哪裡,它只知道從哪兒接收的emissions。
通過一個運算返回的Observable的內部有一個Observer,它接收、轉換、中繼emissions給下游的Observer。它不知道下一個Observer是另一個運算,還是最終的Observer。當我們談論這些Observer的時候,我們經常談論在Observable chain結尾的最終的Observer。但是,每一個運算,例如map()和filter(),內部也實現了Observer。

下來,我們聚焦Observer的subscribe()方法。

Implementing and subscribing to an Observer

當你呼叫一個Observable的subscribe()方法,一個Observer通過實現這些方法,被用來消費這些三個事件。
我們能實現一個Observer,傳遞它的例項,傳給subscribe()方法。先不要關注onSubscribe(),我們後面再講:

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.
Disposable; public class Launcher { public static void main(String[] args) { Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon"); Observer<Integer> myObserver = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { //沒事做,忽視他 } @Override public void onNext(Integer value) { System.out.println("RECEIVED: " + value); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("Done!"); } }; source.map(String::length).filter(i -> i >= 5) .subscribe(myObserver); } }

我們增加了一個Observer,它接收整數長度emissions。在Observable chain的結尾,我們的Observer接收emissions,在終點,我們消費了這些emissions。通過消費,他們達到處理過程的終點,他們被寫進資料庫,文字檔案,伺服器響應,在UI展示,或者只是在控制檯列印。
我們預先宣告一個Observer,在Observable chain的終點,把它傳給subscribe()方法。
它的onNext()方法接收每個整數長度emission,列印它。這個簡單的例子不會產生錯誤,但是如果Observable chain裡發生了錯誤,將被傳給我們的
onError()實現,列印Throwable的堆疊跟蹤。最後,當源不再有更多的emissions,它將呼叫onComplete(),在控制檯列印Done!。

Shorthand Observers with lambdas

上面實現Observer的程式碼冗長而又繁瑣。幸好,subscribe()方法是過載的,可以接受三個事件的lambda引數。前面的例子,我們可以這樣寫:

    Consumer<Integer> onNext = i -> System.out.println("RECEIVED: " + i);
    Action onComplete = () -> System.out.println("Done!");
    Consumer<Throwable> onError = Throwable::printStackTrace;

可以把這三個lambda作為引數傳給subscribe()方法:

import io.reactivex.Observable;
public class Launcher {
    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta",
                        "Epsilon");
        source.map(String::length).filter(i -> i >= 5)
                .subscribe(i -> System.out.println("RECEIVED: " + i),
                        Throwable::printStackTrace,
                        () -> System.out.println("Done!"));        
    }
}

注意,subscribe()還有其他的過載。你可以忽略onComplete(),只實現onNext()和onError()。這樣將不會執行onComplete(),有時候你確實不需要它:

import io.reactivex.Observable;
public class Launcher {
    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta",
                        "Epsilon");
        source.map(String::length).filter(i -> i >= 5)
                .subscribe(i -> System.out.println("RECEIVED: " + i),
                        Throwable::printStackTrace);        
    }
}

你甚至可以忽略onError(),只實現onNext():

import io.reactivex.Observable;
public class Launcher {
    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta",
                        "Epsilon");
        source.map(String::length).filter(i -> i >= 5)
                .subscribe(i -> System.out.println("RECEIVED: " + i));        
    }
}

錯誤可能在Observable chain的任何地方發生,會被傳播給onError()處理,然後終止Observable,不再有更多的emissions。如果你不指定onError的動作,錯誤就不會被處理。
大多數的subscribe()過載變種返回一個Disposable,我們都沒做處理。Disposable允許我們斷開到Observable的連線,這樣可以早一些終止emissions,
這會無限的或者長時間執行的Observables至關重要。我們後面討論disposables。