1. 程式人生 > >RxJava學習 - 1. Observable

RxJava學習 - 1. Observable

RxJava學習 - 1. Observable

Observable是一個基於push的,可組合的iterator。
對於一個給定的Observable,它push型別為T的items(叫做emissions),經過一系列運算,最後到達Observer,Observer消費這些items。

How Observables work

Observable是如何把items沿著chain順序地傳給Observer的?在最高級別,Observable傳遞三種類型的事件:

  • OnNext():一次一個地把item從源Observable傳給沿途的Observer
  • onComplete():完成事件,說明不會再呼叫onNext()
  • onError():發生錯誤了,Observer定義了怎麼處理錯誤。除非使用retry()阻止該錯誤,Observable chain終止,不會再有emissions。

這些事件是Observer型別的抽象方法。

Using Observable.create()

可以使用Observable.create()增加一個source Observable。source Observable就是emissions的源頭,Observable chain的起始點。

import io.reactivex.Observable;
public class Launcher {
    public static void main(String[] args) {
        Observable<String> source = Observable.create(emitter -> {
            emitter.onNext("Alpha");
            emitter.onNext("Beta");
            emitter.onNext("Gamma");
            emitter.
onNext("Delta"); emitter.onNext("Epsilon"); emitter.onComplete(); }); source.subscribe(s -> System.out.println("RECEIVED: " + s)); } }

Emissions只能順序傳遞,而且一次只能傳一個。Emissions不能並行地或者併發地通過一個Observable。
Observables可以是無限多的,也不是必須呼叫onComplete()。
當Observable.create()塊發生錯誤,通過onError()發射(emit)他們的時候,我們可以捕獲這些錯誤。錯誤被push到chain上,由Observer處理。上面的程式碼不處理異常,但是我們也可以這麼寫:

import io.reactivex.Observable;
public class Launcher {
    public static void main(String[] args) {
        Observable<String> source = Observable.create(emitter -> {
            try {
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
                emitter.onComplete();
            } catch (Throwable e) {
                emitter.onError(e);
            }
        });
        source.subscribe(s -> System.out.println("RECEIVED: " + s), Throwable::printStackTrace);
    }
}

onNext()、onComplete()和onError()不一定非要直接push給最終的Observer。也可以push給chain中的下一步。
下面的程式碼,我們派生出執行map()和filter()運算的新的Observables,他們會在源Observable和最終的Observer之間生效:

import io.reactivex.Observable;
public class Launcher {
    public static void main(String[] args) {
        Observable<String> source = Observable.create(emitter -> {
            try {
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
                emitter.onComplete();
            } catch (Throwable e) {
                emitter.onError(e);
            }
        });
        Observable<Integer> lengths = source.map(String::length);
        Observable<Integer> filtered = lengths.filter(i -> i >= 5);
        filtered.subscribe(s -> System.out.println("RECEIVED: " + s));
    }
}

上面的程式碼,onNext()把每個item交給map(),在map()內部,它作為一箇中介Observer,八字串轉換成它的length()。然後,呼叫onNext()和filter()傳遞這個整數,lambda條件i -> i >= 5會過濾掉長度小於5的。最後,filter()呼叫onNext()把每個item交給最終的Observer,他們被列印。
請注意,map()會產生一個新的Observable。filter()也會產生一個新的Observable,但是會忽略條件不成立的emissions。因為類似map()和filter()這樣的運算會產生新的Observables(內部使用Observer實現接收emissions),我們能連結所有的Observables,節省掉中間變數:

import io.reactivex.Observable;
public class Launcher {
    public static void main(String[] args) {
        Observable<String> source = Observable.create(emitter -> {
            try {
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
                emitter.onComplete();
            } catch (Throwable e) {
                emitter.onError(e);
            }
        });
        source.map(String::length)
                .filter(i -> i >= 5)
                .subscribe(s -> System.out.println("RECEIVED: " + s));
    }
}

**警告:**RxJava 2.0,Observables不再支援發射null值。如果你增加一個Observable,嘗試發射一個null值,會立刻得到一個non-null異常。
如果你需要發射null,請使用JDK8或者Google Guava的Optional。

Using Observable.just()

一般不需要使用Observable.create(),我們一般使用streamlined的工廠增加Observables。
前面使用Observable.create()的例子,可以使用Observable.just()完成。我們能最多傳遞10個想要發射的items。它將為每個item呼叫onNext(),
當全部push以後,呼叫onComplete():

import io.reactivex.Observable;
public class Launcher {
    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta",
                        "Epsilon");
        source.map(String::length).filter(i -> i >= 5)
                .subscribe(s -> System.out.println("RECEIVED: " + s));
    }
}

也可以使用Observable.fromIterable(),從任何Iterable型別發射items,比如一個List。它也為每個item呼叫onNext(),在iteration完成以後,呼叫onComplete()。

import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
public class Launcher {
    public static void main(String[] args) {
        List<String> items =
                Arrays.asList("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
        Observable<String> source = Observable.fromIterable(items);
        source.map(String::length).filter(i -> i >= 5)
                .subscribe(s -> System.out.println("RECEIVED: " + s));
    }
}