1. 程式人生 > >Rxjava結合操作符—merge、 Join

Rxjava結合操作符—merge、 Join

1、merge

merge可以合併多個發射物
Javadoc: merge(Iterable)
Javadoc: merge(Iterable,int)
Javadoc: merge(Observable[])
Javadoc: merge(Observable,Observable) (接受二到九個Observable)

這裡寫圖片描述

兩個Obserable合併成一個Observable

    Observable<Integer> odds=Observable.just(1,3,5,7);
    Observable<Integer> events=Observable.just
(2,4,6,8); Observable.merge(odds,events).subscribe(i->Log.d("TAG","merge->"+i));

執行結果:

merge->1
merge->3
merge->5
merge->7
merge->2
merge->4
merge->6
merge->8

2、mergeWith

這裡寫圖片描述

這個圖有個叉,表示如果某個階段出錯了,後續的資料就會停止發射了。

    Observable<Integer> odds=Observable.just
(1,3,5,7); Observable<Integer> events=Observable.just(2,4,6,8); odds.mergeWith(events).subscribe(i->Log.d("TAG","merge->"+i));

執行結果同上

3、mergeDelayError

這裡寫圖片描述

    Observable<Integer> odds=Observable.just(1,3,5,7);
    Observable<Integer> events=Observable.just(2,4,6,8
); Observable.mergeDelayError(odds,events).subscribe(i->Log.d("TAG","merge->"+i));

執行結果同上

      merge和mergeWith相當於一個是靜態方法,一個是例項方法。merge如果以onError錯誤終止的話,資料也會相應的停止發射。如果想讓它繼續發射資料,然後才報告錯誤,可以使用mergeDelayError

注: 合併後的新Observables發射時,可以認為任何一個Observable發生錯誤,都將會打斷合併。如果想避免這種情況發生,可以呼叫mergeDelayError()方法,表明它能從一個Observable中繼續發射資料即便是其中有一個丟擲了錯誤。當所有的Observables都完成時,mergeDelayError()將會發射onError()。

4、join

任何時候,只要在另一個Observable發射的資料定義的時間視窗內,這個Observable發射了一條資料,就結合兩個Observable發射的資料
這裡寫圖片描述

Join操作符結合兩個Observable發射的資料,基於時間視窗(你定義的針對每條資料特定的原則)選擇待集合的資料項。你將這些時間視窗實現為一些Observables,它們的生命週期從任何一條Observable發射的每一條資料開始。當這個定義時間視窗的Observable發射了一條資料或者完成時,與這條資料關聯的視窗也會關閉。只要這條資料的視窗是開啟的,它將繼續結合其它Observable發射的任何資料項。你定義一個用於結合資料的函式。

join表示形式:
join(Observable, Func1, Func1, Func2)

join有四個引數分別表示:
Observable:源Observable需要組合的Observable,這裡我們姑且稱之為目標Observable;
Func1:接收從源Observable發射來的資料,並返回一個Observable,這個Observable的宣告週期決定了源Obsrvable發射出來的資料的有效期;
Func1:接收目標Observable發射來的資料,並返回一個Observable,這個Observable的宣告週期決定了目標Obsrvable發射出來的資料的有效期;
Func2:接收從源Observable和目標Observable發射出來的資料,並將這兩個資料組合後返回。


//輸出[0,1,2,3]序列
  Observable<Integer> ob =Observable.create(subscriber -> {
      for (int i = 0; i < 4; i++) {
        subscriber.onNext(i);
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    });

    Observable.just("hello")
        .join(ob, s -> {
          Log.d("TAG",s);
          //使Observable延遲3000毫秒執行
          return Observable.timer(3000, TimeUnit.MILLISECONDS);
        }, integer -> {
          Log.d("TAG",integer+"");
           //使Observable延遲2000毫秒執行
          return Observable.timer(2000, TimeUnit.MILLISECONDS);
          //結合上面發射的資料
        (s, integer) -> s+integer)
        .subscribe(o -> Log.d("TAG",o));

執行結果:

0
hello
hello0
1
hello
hello1
2
hello
hello2
3