Rxjava結合操作符—merge、 Join
1、merge
merge可以合併多個發射物
Javadoc: merge(Iterable)
Javadoc: merge(Iterable,int)
Javadoc: merge(Observable[])
Javadoc: merge(Observable,Observable) (接受二到九個Observable)
兩個Obserable合併成一個Observable
Observable<Integer> odds=Observable.just(1,3,5,7);
Observable<Integer> events=Observable.just (2,4,6,8);
Observable.merge(odds,events).subscribe(i->Log.d("TAG","merge->"+i));
執行結果:
merge->1
merge->3
merge->5
merge->7
merge->2
merge->4
merge->6
merge->8
2、mergeWith
這個圖有個叉,表示如果某個階段出錯了,後續的資料就會停止發射了。
Observable<Integer> odds=Observable.just (1,3,5,7);
Observable<Integer> events=Observable.just(2,4,6,8);
odds.mergeWith(events).subscribe(i->Log.d("TAG","merge->"+i));
執行結果同上
3、mergeDelayError
Observable<Integer> odds=Observable.just(1,3,5,7);
Observable<Integer> events=Observable.just(2,4,6,8 );
Observable.mergeDelayError(odds,events).subscribe(i->Log.d("TAG","merge->"+i));
執行結果同上
merge和mergeWith相當於一個是靜態方法,一個是例項方法。merge如果以onError錯誤終止的話,資料也會相應的停止發射。如果想讓它繼續發射資料,然後才報告錯誤,可以使用mergeDelayError
注: 合併後的新Observables發射時,可以認為任何一個Observable發生錯誤,都將會打斷合併。如果想避免這種情況發生,可以呼叫mergeDelayError()方法,表明它能從一個Observable中繼續發射資料即便是其中有一個丟擲了錯誤。當所有的Observables都完成時,mergeDelayError()將會發射onError()。
4、join
任何時候,只要在另一個Observable發射的資料定義的時間視窗內,這個Observable發射了一條資料,就結合兩個Observable發射的資料
Join操作符結合兩個Observable發射的資料,基於時間視窗(你定義的針對每條資料特定的原則)選擇待集合的資料項。你將這些時間視窗實現為一些Observables,它們的生命週期從任何一條Observable發射的每一條資料開始。當這個定義時間視窗的Observable發射了一條資料或者完成時,與這條資料關聯的視窗也會關閉。只要這條資料的視窗是開啟的,它將繼續結合其它Observable發射的任何資料項。你定義一個用於結合資料的函式。
join表示形式:
join(Observable, Func1, Func1, Func2)
join有四個引數分別表示:
Observable:源Observable需要組合的Observable,這裡我們姑且稱之為目標Observable;
Func1:接收從源Observable發射來的資料,並返回一個Observable,這個Observable的宣告週期決定了源Obsrvable發射出來的資料的有效期;
Func1:接收目標Observable發射來的資料,並返回一個Observable,這個Observable的宣告週期決定了目標Obsrvable發射出來的資料的有效期;
Func2:接收從源Observable和目標Observable發射出來的資料,並將這兩個資料組合後返回。
//輸出[0,1,2,3]序列
Observable<Integer> ob =Observable.create(subscriber -> {
for (int i = 0; i < 4; i++) {
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Observable.just("hello")
.join(ob, s -> {
Log.d("TAG",s);
//使Observable延遲3000毫秒執行
return Observable.timer(3000, TimeUnit.MILLISECONDS);
}, integer -> {
Log.d("TAG",integer+"");
//使Observable延遲2000毫秒執行
return Observable.timer(2000, TimeUnit.MILLISECONDS);
//結合上面發射的資料
(s, integer) -> s+integer)
.subscribe(o -> Log.d("TAG",o));
執行結果:
0
hello
hello0
1
hello
hello1
2
hello
hello2
3