RxJava開發精要6
上一章中,我們學到如何轉換可觀測序列。我們也看到了map()
,scan()
,groupBY()
,以及更多有用的函式的實際例子,它們幫助我們操作Observable來建立我們想要的Observable。
本章中,我們將研究組合函式並學習如何同時處理多個Observables來建立我們想要的Observable。
Merge
在非同步的世界經常會建立這樣的場景,我們有多個來源但是隻想有一個結果:多輸入,單輸出。RxJava的merge()
方法將幫助你把兩個甚至更多的Observables合併到他們發射的資料裡。下圖給出了把兩個序列合併在一個最終發射的Observable。
正如你看到的那樣,發射的資料被交叉合併到一個Observable裡面。注意如果你同步的合併Observable,它們將連線在一起並且不會交叉。
像通常一樣,我們用我們的App和已安裝的App列表來建立了一個“真實世界”的例子。我們還需要第二個Observable。我們可以建立一個單獨的應用列表然後逆序。當然沒有實際的意義,只是為了這個例子。第二個列表,我們的loadList()
函式像下面這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
List reversedApps = Lists.reverse(apps);
Observable<AppInfo> observableApps =Observable.from(apps);
Observable<AppInfo> observableReversedApps =Observable.from(reversedApps);
Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);
mergedObserbable.subscribe(new Observer<AppInfo>(){
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "One of the two Observable threw an error!" , Toast.LENGTH_SHORT).show();
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(AppInfoappInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
}
});
}
我們建立了Observable和observableApps資料以及新的observableReversedApps逆序列表。使用Observable.merge()
,我們可以建立新的ObservableMergedObservable
在單個可觀測序列中發射源Observables發出的所有資料。
正如你能看到的,每個方法簽名都是一樣的,因此我們的觀察者無需在意任何不同就可以複用程式碼。結果如下:
注意錯誤時的toast訊息,你可以認為每個Observable丟擲的錯誤將會打斷合併。如果你需要避免這種情況,RxJava提供了mergeDelayError()
,它能從一個Observable中繼續發射資料即便是其中有一個丟擲了錯誤。當所有的Observables都完成時,mergeDelayError()
將會發射onError()
,如下圖所示:
ZIP
我們在處理多源時可能會帶來這樣一種場景:多從個Observables接收資料,處理它們,然後將它們合併成一個新的可觀測序列來使用。RxJava有一個特殊的方法可以完成:zip()
合併兩個或者多個Observables發射出的資料項,根據指定的函式Func*
變換它們,併發射一個新值。下圖展示了zip()
方法如何處理髮射的“numbers”和“letters”然後將它們合併一個新的資料項:
對於“真實世界”的例子來說,我們將使用已安裝的應用列表和一個新的動態的Observable來讓例子變得有點有趣味。
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
tictoc
Observable變數使用interval()
函式每秒生成一個Long型別的資料:簡單且高效,正如之前所說的,我們需要一個Func
物件。因為它需要傳兩個引數,所以是Func2
:
private AppInfo updateTitle(AppInfoappInfo, Long time) {
appInfo.setName(time + " " + appInfo.getName());
return appInfo;
}
現在我們的loadList()
函式變成這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> observableApp = Observable.from(apps);
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
Observable.zip(observableApp, tictoc,
(AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
正如你看到的那樣,zip()
函式有三個引數:兩個Observables和一個Func2
。
仔細一看會發現observeOn()
函式。它將在下一章中講解:現在我們可以小試一下。
結果如下:
Join
前面兩個方法,zip()
和merge()
方法作用在發射資料的範疇內,在決定如何操作值之前有些場景我們需要考慮時間的。RxJava的join()
函式基於時間視窗將兩個Observables發射的資料結合在一起。
為了正確的理解上一張圖,我們解釋下join()
需要的引數:
- 第二個Observable和源Observable結合。
Func1
引數:在指定的由時間視窗定義時間間隔內,源Observable發射的資料和從第二個Observable發射的資料相互配合返回的Observable。Func1
引數:在指定的由時間視窗定義時間間隔內,第二個Observable發射的資料和從源Observable發射的資料相互配合返回的Observable。Func2
引數:定義已發射的資料如何與新發射的資料項相結合。-
如下練習的例子,我們可以修改loadList()
函式像下面這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> appsSequence =
Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(position -> {
return apps.get(position.intValue());
});
Observable<Long> tictoc = Observable.interval(1000,TimeUnit.MILLISECONDS);
appsSequence.join(
tictoc,
appInfo -> Observable.timer(2,TimeUnit.SECONDS),
time -> Observable.timer(0, TimeUnit.SECONDS),
this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.take(10)
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
我們有一個新的物件appsSequence
,它是一個每秒從我們已安裝的app列表發射app資料的可觀測序列。tictoc
這個Observable資料每秒只發射一個新的Long
型整數。為了合併它們,我們需要指定兩個Func1
變數:
appInfo -> Observable.timer(2, TimeUnit.SECONDS)
time -> Observable.timer(0, TimeUnit.SECONDS)
上面描述了兩個時間視窗。下面一行描述我們如何使用Func2
將兩個發射的資料結合在一起。
this::updateTitle
結果如下:
它看起來有點亂,但是注意app的名字和我們指定的時間視窗,我們可以看到:一旦第二個資料發射了我們就會將它與源資料結合,但我們用同一個源資料有2秒鐘。這就是為什麼標題重複數字累加的原因。
值得一提的是,為了簡單起見,也有一個join()
操作符作用於字串然後簡單的和發射的字串連線成最終的字串。
combineLatest
RxJava的combineLatest()
函式有點像zip()
函式的特殊形式。正如我們已經學習的,zip()
作用於最近未打包的兩個Observables。相反,combineLatest()
作用於最近發射的資料項:如果Observable1
發射了A並且Observable2
發射了B和C,combineLatest()
將會分組處理AB和AC,如下圖所示:
combineLatest()
函式接受二到九個Observable作為引數,如果有需要的話或者單個Observables列表作為引數。
從之前的例子中把loadList()
函數借用過來,我們可以修改一下來用於combineLatest()
實現“真實世界”這個例子:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(position ->apps.get(position.intValue()));
Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
Observable.combineLatest(appsSequence, tictoc,
this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
這我們使用了兩個Observables:一個是每秒鐘從我們已安裝的應用列表發射一個App資料,第二個是每隔1.5秒發射一個Long
型整數。我們將他們結合起來並執行updateTitle()
函式,結果如下:
正如你看到的,由於不同的時間間隔,AppInfo
物件如我們所預料的那樣有時候會重複。
And,Then和When
在將來還有一些zip()
滿足不了的場景。如複雜的架構,或者是僅僅為了個人愛好,你可以使用And/Then/When解決方案。它們在RxJava的joins包下,使用Pattern和Plan作為中介,將發射的資料集合併到一起。
我們的loadList()
函式將會被修改從這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> observableApp = Observable.from(apps);
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
Pattern2<AppInfo, Long> pattern = JoinObservable.from(observableApp).and(tictoc);
Plan0<AppInfo> plan = pattern.then(this::updateTitle);
JoinObservable
.when(plan)
.toObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo); mRecyclerView.smoothScrollToPosition(position);
}
});
}
和通常一樣,我們有兩個發射的序列,observableApp
,發射我們安裝的應用列表資料,tictoc
每秒發射一個Long
型整數。現在我們用and()
連線源Observable和第二個Observable。
JoinObservable.from(observableApp).and(tictoc);
這裡建立一個pattern
物件,使用這個物件我們可以建立一個Plan
物件:”我們有兩個發射資料的Observables,then()
是做什麼的?”
pattern.then(this::updateTitle);
現在我們有了一個Plan
物件並且當plan發生時我們可以決定接下來發生的事情。
.when(plan).toObservable()
這時候,我們可以訂閱新的Observable,正如我們總是做的那樣。
Switch
有這樣一個複雜的場景就是在一個subscribe-unsubscribe
的序列裡我們能夠從一個Observable自動取消訂閱來訂閱一個新的Observable。
RxJava的switch()
,正如定義的,將一個發射多個Observables的Observable轉換成另一個單獨的Observable,後者發射那些Observables最近發射的資料項。
給出一個發射多個Observables序列的源Observable,switch()
訂閱到源Observable然後開始發射由第一個發射的Observable發射的一樣的資料。當源Observable發射一個新的Observable時,switch()
立即取消訂閱前一個發射資料的Observable(因此打斷了從它那裡發射的資料流)然後訂閱一個新的Observable,並開始發射它的資料。
StartWith
我們已經學到如何連線多個Observables並追加指定的值到一個發射序列裡。RxJava的startWith()
是concat()
的對應部分。正如concat()
向發射資料的Observable追加資料那樣,在Observable開始發射他們的資料之前, startWith()
通過傳遞一個引數來先發射一個數據序列。
總結
這章中,我們學習瞭如何將兩個或者更多個Observable結合來建立一個新的可觀測序列。我們將能夠merge
Observable,join
Observables ,zip
Observables 並在幾種情況下把他們結合在一起。
下一章,我們將介紹排程器,它將很容易的幫助我們建立主執行緒以及提高我們應用程式的效能。我們也將學習如何正確的執行長任務或者I/O任務來獲得更好的效能。