1. 程式人生 > >RxJava開發精要6

RxJava開發精要6

上一章中,我們學到如何轉換可觀測序列。我們也看到了map(),scan(),groupBY(),以及更多有用的函式的實際例子,它們幫助我們操作Observable來建立我們想要的Observable。

本章中,我們將研究組合函式並學習如何同時處理多個Observables來建立我們想要的Observable。

Merge

在非同步的世界經常會建立這樣的場景,我們有多個來源但是隻想有一個結果:多輸入,單輸出。RxJava的merge()方法將幫助你把兩個甚至更多的Observables合併到他們發射的資料裡。下圖給出了把兩個序列合併在一個最終發射的Observable。

正如你看到的那樣,發射的資料被交叉合併到一個Observable裡面。注意如果你同步的合併Observable,它們將連線在一起並且不會交叉。

像通常一樣,我們用我們的App和已安裝的App列表來建立了一個“真實世界”的例子。我們還需要第二個Observable。我們可以建立一個單獨的應用列表然後逆序。當然沒有實際的意義,只是為了這個例子。第二個列表,我們的loadList()函式像下面這樣:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    List reversedApps = Lists.reverse(apps);
    Observable<AppInfo> observableApps =Observable.from(apps);
    Observable<AppInfo> observableReversedApps =Observable.from(reversedApps);
    Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);

    mergedObserbable.subscribe(new
Observer<AppInfo>(){ @Override public void onCompleted() { mSwipeRefreshLayout.setRefreshing(false); Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show(); } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), "One of the two Observable threw an error!"
, Toast.LENGTH_SHORT).show(); mSwipeRefreshLayout.setRefreshing(false); } @Override public void onNext(AppInfoappInfo) { mAddedApps.add(appInfo); mAdapter.addApplication(mAddedApps.size() - 1, appInfo); } }); }

我們建立了Observable和observableApps資料以及新的observableReversedApps逆序列表。使用Observable.merge(),我們可以建立新的ObservableMergedObservable在單個可觀測序列中發射源Observables發出的所有資料。

正如你能看到的,每個方法簽名都是一樣的,因此我們的觀察者無需在意任何不同就可以複用程式碼。結果如下:

注意錯誤時的toast訊息,你可以認為每個Observable丟擲的錯誤將會打斷合併。如果你需要避免這種情況,RxJava提供了mergeDelayError(),它能從一個Observable中繼續發射資料即便是其中有一個丟擲了錯誤。當所有的Observables都完成時,mergeDelayError()將會發射onError(),如下圖所示:

ZIP

我們在處理多源時可能會帶來這樣一種場景:多從個Observables接收資料,處理它們,然後將它們合併成一個新的可觀測序列來使用。RxJava有一個特殊的方法可以完成:zip()合併兩個或者多個Observables發射出的資料項,根據指定的函式Func*變換它們,併發射一個新值。下圖展示了zip()方法如何處理髮射的“numbers”和“letters”然後將它們合併一個新的資料項:

對於“真實世界”的例子來說,我們將使用已安裝的應用列表和一個新的動態的Observable來讓例子變得有點有趣味。

Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

tictocObservable變數使用interval()函式每秒生成一個Long型別的資料:簡單且高效,正如之前所說的,我們需要一個Func物件。因為它需要傳兩個引數,所以是Func2:

private AppInfo updateTitle(AppInfoappInfo, Long time) {
    appInfo.setName(time + " " + appInfo.getName());
    return appInfo;
}

現在我們的loadList()函式變成這樣:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> observableApp = Observable.from(apps);

    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

    Observable.zip(observableApp, tictoc,
    (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }

        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            } 
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        }
    });
}

正如你看到的那樣,zip()函式有三個引數:兩個Observables和一個Func2

仔細一看會發現observeOn()函式。它將在下一章中講解:現在我們可以小試一下。

結果如下:

Join

前面兩個方法,zip()merge()方法作用在發射資料的範疇內,在決定如何操作值之前有些場景我們需要考慮時間的。RxJava的join()函式基於時間視窗將兩個Observables發射的資料結合在一起。

為了正確的理解上一張圖,我們解釋下join()需要的引數:

  • 第二個Observable和源Observable結合。
  • Func1引數:在指定的由時間視窗定義時間間隔內,源Observable發射的資料和從第二個Observable發射的資料相互配合返回的Observable。
  • Func1引數:在指定的由時間視窗定義時間間隔內,第二個Observable發射的資料和從源Observable發射的資料相互配合返回的Observable。
  • Func2引數:定義已發射的資料如何與新發射的資料項相結合。

  • 如下練習的例子,我們可以修改loadList()函式像下面這樣:
private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);

    Observable<AppInfo> appsSequence =
    Observable.interval(1000, TimeUnit.MILLISECONDS)
                .map(position -> {
                    return apps.get(position.intValue());
                });

    Observable<Long> tictoc = Observable.interval(1000,TimeUnit.MILLISECONDS);

    appsSequence.join(
        tictoc, 
        appInfo -> Observable.timer(2,TimeUnit.SECONDS),
        time -> Observable.timer(0, TimeUnit.SECONDS),
        this::updateTitle)
        .observeOn(AndroidSchedulers.mainThread())
        .take(10)
        .subscribe(new Observer<AppInfo>() {
            @Override
            public void onCompleted() {
                Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
            }

            @Override
            public void onError(Throwable e) {
                mSwipeRefreshLayout.setRefreshing(false); 
                Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(AppInfoappInfo) {
                if (mSwipeRefreshLayout.isRefreshing()) {
                    mSwipeRefreshLayout.setRefreshing(false);
                } 
                mAddedApps.add(appInfo);
                int position = mAddedApps.size() - 1;
                mAdapter.addApplication(position, appInfo);
                mRecyclerView.smoothScrollToPosition(position);
            } 
        });
}

我們有一個新的物件appsSequence,它是一個每秒從我們已安裝的app列表發射app資料的可觀測序列。tictoc這個Observable資料每秒只發射一個新的Long型整數。為了合併它們,我們需要指定兩個Func1變數:

appInfo -> Observable.timer(2, TimeUnit.SECONDS)

time -> Observable.timer(0, TimeUnit.SECONDS)

上面描述了兩個時間視窗。下面一行描述我們如何使用Func2將兩個發射的資料結合在一起。

this::updateTitle

結果如下:

它看起來有點亂,但是注意app的名字和我們指定的時間視窗,我們可以看到:一旦第二個資料發射了我們就會將它與源資料結合,但我們用同一個源資料有2秒鐘。這就是為什麼標題重複數字累加的原因。

值得一提的是,為了簡單起見,也有一個join()操作符作用於字串然後簡單的和發射的字串連線成最終的字串。

combineLatest

RxJava的combineLatest()函式有點像zip()函式的特殊形式。正如我們已經學習的,zip()作用於最近未打包的兩個Observables。相反,combineLatest()作用於最近發射的資料項:如果Observable1發射了A並且Observable2發射了B和C,combineLatest()將會分組處理AB和AC,如下圖所示:

combineLatest()函式接受二到九個Observable作為引數,如果有需要的話或者單個Observables列表作為引數。

從之前的例子中把loadList()函數借用過來,我們可以修改一下來用於combineLatest()實現“真實世界”這個例子:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
              .map(position ->apps.get(position.intValue()));
    Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
    Observable.combineLatest(appsSequence, tictoc,
               this::updateTitle)
       .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<AppInfo>() {

        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }

        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            } 
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        } 
    });
}

這我們使用了兩個Observables:一個是每秒鐘從我們已安裝的應用列表發射一個App資料,第二個是每隔1.5秒發射一個Long型整數。我們將他們結合起來並執行updateTitle()函式,結果如下:

正如你看到的,由於不同的時間間隔,AppInfo物件如我們所預料的那樣有時候會重複。

And,Then和When

在將來還有一些zip()滿足不了的場景。如複雜的架構,或者是僅僅為了個人愛好,你可以使用And/Then/When解決方案。它們在RxJava的joins包下,使用Pattern和Plan作為中介,將發射的資料集合併到一起。

我們的loadList()函式將會被修改從這樣:

private void loadList(List<AppInfo> apps) {

    mRecyclerView.setVisibility(View.VISIBLE);

    Observable<AppInfo> observableApp = Observable.from(apps);

    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

    Pattern2<AppInfo, Long> pattern = JoinObservable.from(observableApp).and(tictoc); 

    Plan0<AppInfo> plan = pattern.then(this::updateTitle);

    JoinObservable
        .when(plan)
        .toObservable()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<AppInfo>() {

            @Override
            public void onCompleted() {
                Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
            }

            @Override
            public void onError(Throwable e) {
                mSwipeRefreshLayout.setRefreshing(false); 
                Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(AppInfoappInfo) {
                if (mSwipeRefreshLayout.isRefreshing()) { 
                mSwipeRefreshLayout.setRefreshing(false);
                } 
                mAddedApps.add(appInfo);
                int position = mAddedApps.size() - 1;
                mAdapter.addApplication(position, appInfo); mRecyclerView.smoothScrollToPosition(position);
            } 
        });
}

和通常一樣,我們有兩個發射的序列,observableApp,發射我們安裝的應用列表資料,tictoc每秒發射一個Long型整數。現在我們用and()連線源Observable和第二個Observable。

JoinObservable.from(observableApp).and(tictoc);

這裡建立一個pattern物件,使用這個物件我們可以建立一個Plan物件:”我們有兩個發射資料的Observables,then()是做什麼的?”

pattern.then(this::updateTitle);

現在我們有了一個Plan物件並且當plan發生時我們可以決定接下來發生的事情。

.when(plan).toObservable()

這時候,我們可以訂閱新的Observable,正如我們總是做的那樣。

Switch

有這樣一個複雜的場景就是在一個subscribe-unsubscribe的序列裡我們能夠從一個Observable自動取消訂閱來訂閱一個新的Observable。

RxJava的switch(),正如定義的,將一個發射多個Observables的Observable轉換成另一個單獨的Observable,後者發射那些Observables最近發射的資料項。

給出一個發射多個Observables序列的源Observable,switch()訂閱到源Observable然後開始發射由第一個發射的Observable發射的一樣的資料。當源Observable發射一個新的Observable時,switch()立即取消訂閱前一個發射資料的Observable(因此打斷了從它那裡發射的資料流)然後訂閱一個新的Observable,並開始發射它的資料。

StartWith

我們已經學到如何連線多個Observables並追加指定的值到一個發射序列裡。RxJava的startWith()concat()的對應部分。正如concat()向發射資料的Observable追加資料那樣,在Observable開始發射他們的資料之前, startWith()通過傳遞一個引數來先發射一個數據序列。

總結

這章中,我們學習瞭如何將兩個或者更多個Observable結合來建立一個新的可觀測序列。我們將能夠merge Observable,join Observables ,zip Observables 並在幾種情況下把他們結合在一起。

下一章,我們將介紹排程器,它將很容易的幫助我們建立主執行緒以及提高我們應用程式的效能。我們也將學習如何正確的執行長任務或者I/O任務來獲得更好的效能。