1. 程式人生 > >RxAndroid使用文件(New)

RxAndroid使用文件(New)

1 概述

RxJava 一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫.響應式程式設計是一種基於非同步資料流概念的程式設計模式。資料流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合併為一條新的流。

Rx並不是一種新的語言,而是一種普通的Java模式,類似於觀察者模式(Observer Pattern),可以將它看作一個普通的Java類庫。而RxAndroid是RxJava的一個針對Android平臺的擴充套件,主要用於 Android 開發。

1.1 RxJava 有幾個基本概念:

  1. Observable 發射源 (可觀察者,即被觀察者)
  2. Observer 接收源(觀察者)
  3. Subscriber:Subscriber實現了Observer和Subscription介面,所以比Observer多了一個方法unsubscribe( ),用來取消訂閱。
  4. Subject:一個比較特殊的物件,既可充當發射源,也可充當接收源
  5. Subscription :Observable呼叫subscribe( )方法返回的物件,同樣有unsubscribe()方法,可以用來取消訂閱事件;
  6. Action0:RxJava中的一個介面,它只有一個無參call()方法,且無返回值,同樣還有Action1,Action2…Action9等,Action1封裝了含有 1 個參的call()方法,即call(T t),Action2封裝了含有 2 個引數的call方法,即call(T1 t1,T2 t2),以此類推;
  7. Func0:與Action0非常相似,也有call()方法,但是它是有返回值的,同樣也有Func0、Func1…Func9;
  8. subscribe() 訂閱方法,subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有記憶體洩露的風險。
  9. unsubscribe() 取消訂閱方法,在這個方法被呼叫後,Subscriber 將不再接收事件。要在不再使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)呼叫 unsubscribe() 來解除引用關係,以避免記憶體洩露的發生。

Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer。

1.2 RxJava的優點

  1. 建立:Rx可以方便的建立事件流和資料流
  2. 組合:Rx使用查詢式的操作符組合和變換資料流
  3. 監聽:Rx可以訂閱任何可觀察的資料流並執行操作
  4. 函式式風格:對可觀察資料流使用無副作用的輸入輸出函式,避免了程式裡錯綜複雜的狀態
  5. 簡化程式碼:Rx的操作符通通常可以將複雜的難題簡化為很少的幾行程式碼
  6. 非同步錯誤處理:傳統的try/catch沒辦法處理非同步計算,Rx提供了合適的錯誤處理機制
  7. 輕鬆使用併發:Rx的Observables和Schedulers讓開發者可以擺脫底層的執行緒同步和各種併發問題

2 Observable

Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。

一個Observable可以發出零個或者多個事件,直到結束或者出錯。每發出一個事件,就會呼叫它subscribe的Subscriber的onNext方法,最後呼叫Subscriber.onCompleted()完成或者Subscriber.onError()出錯而結束。

下面看看RxJava提供的建立Observable的方法:

2.1 create

新建一個Observables.create方法中傳入了一個 OnSubscribe 物件作為引數,OnSubscribe 會被儲存在返回的 Observable 物件中,它的作用相當於一個計劃表。當 Observable 被訂閱(subscribe)的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照call中設定依次觸發.

public void testCreate(View view) {
   mObservable = Observable.create(new Observable.OnSubscribe<String>() {
       @Override
       public void call(Subscriber<? super String> subscriber) {
           if (!subscriber.isUnsubscribed()) {
               subscriber.onNext("Hello");
               subscriber.onNext("World");
               subscriber.onCompleted();
           }
       }
   });
}

2.1 subscribe()訂閱

由於建立後需要訂閱了才能看到效果,這裡初步看看訂閱方法。subscribe()註冊 SubscriberObservable .

subscribe()方法做了三件事:

  1. 呼叫 Subscriber.onStart()
  2. 呼叫 Observable 中的 OnSubscribe.call(Subscriber)
  3. 將傳入的 Subscriber 作為 Subscription 返回
public void testSubscribe(View view) {
   if (mObservable != null) {
       mObservable.subscribe(new Subscriber<String>() {
           @Override
           public void onStart() {
           /** start方法不是必須的,call方法之前呼叫。
             * 在subscribe 所發生的執行緒被呼叫,不能指定執行緒
             */
               Log.e("testSubscribe", "onStart" + Thread.currentThread().getId());
               super.onStart();
           }

           @Override
           public void onCompleted() {
               Log.e("testSubscribe", "onCompleted" + Thread.currentThread().getId());
           }

           @Override
           public void onError(Throwable e) {
                e.printStackTrace();
           }

           @Override
           public void onNext(String s) {
               Log.e("testSubscribe", s + Thread.currentThread().getId());
           }
       });
   }
}

2.2 forEach

forEach方法是簡化版的subscribe,無返回值。通過 forEach 可以處理 Observable 每個發射出來的資料。且是非阻塞執行的。forEach一般用於遍歷所有元素,然後處理。

public void forEach(View v) {
    Observable.interval(500, TimeUnit.MILLISECONDS)
            .take(6)
            .forEach(LogUtils::e);
    LogUtils.e("over");
}

非阻塞的,所以先打印出後面的內容。

02-18 00:50:16.921 15095-15095/com.felix.testrxjava E/LogUtils: over
02-18 00:50:17.420 15095-15290/com.felix.testrxjava E/LogUtils:  0
02-18 00:50:17.920 15095-15290/com.felix.testrxjava E/LogUtils:  1
02-18 00:50:18.420 15095-15290/com.felix.testrxjava E/LogUtils:  2
02-18 00:50:18.919 15095-15290/com.felix.testrxjava E/LogUtils:  3
02-18 00:50:19.420 15095-15290/com.felix.testrxjava E/LogUtils:  4
02-18 00:50:19.919 15095-15290/com.felix.testrxjava E/LogUtils:  5

2.3 just(T…)

just()方法可以傳入一到九個引數,它們會按照傳入的引數的順序來發射它們。just()方法也可以接受列表或陣列,它將會發射整個列表。通常,當我們想發射一組已經定義好的值時會用到它。但是如果我們的函式不是時變性的,我們可以用just來建立一個更有組織性和可測性的程式碼庫。

 public void testJust(View view) {
        mObservable = Observable.just("hello", "World");
    }

2.4 from(T[] t) / from(Iterable

Observable.from(Executors.newFixedThreadPool(3).submit(new Callable<String>() {
       @Override
       public String call() throws Exception {
           Thread.sleep(5000);
           return "result";
       }
   }))
      .subscribe(new Action1<String>() {
          @Override
          public void call(String s) {
              Log.e("", s);
          }
      });
}

2.5 defer(Func0

 public void testDefer(View view) {
   count = 1;
   //just方法
   Observable<Integer> justObservable = Observable.just(count);
   count = 2;
   //just的訂閱
   justObservable.subscribe(new Action1<Integer>() {
       @Override
       public void call(Integer integer) {
           Log.e("testDefer", "just   " + integer);
       }
   });
   count = 3;
   //defer方法
   Observable<Integer> testDefer = Observable.defer(new Func0<Observable<Integer>>() {
       @Override
       public Observable<Integer> call() {
           //注意此處的call方法沒有Subscriber引數
           return Observable.just(count);
       }
   });
   count = 4;
   //defer訂閱
   testDefer.subscribe(new Subscriber<Integer>() {
       @Override
       public void onCompleted() {
           Log.e("testDefer", "onCompleted");
       }

       @Override
       public void onError(Throwable e) {
           Log.e("testDefer", "onError   " + e.getMessage());
       }

       @Override
       public void onNext(Integer i) {
           Log.e("testDefer", "defer " + i);
       }
   });
   count = 5;
   //defer訂閱2
   testDefer.subscribe(new Action1<Integer>() {
       @Override
       public void call(Integer integer) {
           Log.e("testDefer", "Action1 defer " + integer);
       }
   });
}

以上示例將just和defer放在一起做了一個對比。我們看看列印的值。just等操作符是在建立的時候就已經生成了Observable,而defer是在subscribe的時候才建立,而且每次訂閱都會新建立一個,以保證當前使用的是最新的值。

com.felix.testrxjava E/testDefer: just   1
com.felix.testrxjava E/testDefer: defer 4
com.felix.testrxjava E/testDefer: onCompleted
com.felix.testrxjava E/testDefer: Action1 defer 5

2.6 interval/timeInterval

  1. interval建立一個按固定時間間隔發射整數序列的Observable,可用作定時器.interval()有一個三個引數的過載方法,可以傳入Scheduler排程器,預設使用的是Schedulers.computation()。

    public void testInterval(View view) {
        //每隔2s傳送一次
       final Subscription subscription = Observable.interval(2, TimeUnit.SECONDS)
               .subscribe(new Action1<Long>() {
                   @Override
                   public void call(Long aLong) {
                       Log.e("testInterval", "  " + aLong);
                   }
               });
    
        //延遲15s取消訂閱
       new Handler().postDelayed(new Runnable() {
           @Override
           public void run() {
               subscription.unsubscribe();
           }
       }, 15 * 1000);
    }
  2. timeInterval將原始Observable轉換為另一個Obserervable,後者發射一個標誌替換前者的資料項,這個標誌表示前者的兩個連續發射物之間流逝的時間長度。新的Observable的第一個發射物表示的是在觀察者訂閱原始Observable到原始Observable發射它的第一項資料之間流逝的時間長度。不存在與原始Observable發射最後一項資料和發射onCompleted通知之間時長對應的發射物。timeInterval預設在immediate排程器上執行,你可以通過傳引數修改。

    public void timeInterval(View view) {
        Observable.create(subscriber -> {
            for (int i = 0; i < 5; i++) {
                SystemClock.sleep(i * 1000);
                subscriber.onNext("aaaa " + i);
            }
            subscriber.onCompleted();
        }).timeInterval().cast(TimeInterval.class)
                .subscribe(x -> Log.e("timeInterval", x.getIntervalInMilliseconds() + "++" + x.getValue()));
    }

    看看輸出,getIntervalInMilliseconds,返回的間隔的毫秒值,getValue返回的是上一個Observable丟擲來的值。

    02-16 22:47:03.748 21569-21569/com.felix.testrxjava E/timeInterval: 0++aaaa 0
    02-16 22:47:04.749 21569-21569/com.felix.testrxjava E/timeInterval: 1001++aaaa 1
    02-16 22:47:06.749 21569-21569/com.felix.testrxjava E/timeInterval: 2000++aaaa 2
    02-16 22:47:09.750 21569-21569/com.felix.testrxjava E/timeInterval: 3001++aaaa 3
    02-16 22:47:13.750 21569-21569/com.felix.testrxjava E/timeInterval: 4000++aaaa 4

2.7 range

建立一個發射特定整數序列的Observable:第一個引數為起始值;第二個為傳送的個數,如果為0則不傳送,負數則拋異常,大於int型別的最大值也會拋異常。

/**
     * 發射從1開始的10個數字
     *
     * @param view
     */
public void testRange(View view) {
   Observable.range(1, 10)
      .subscribe(new Action1<Integer>() {
          @Override
          public void call(Integer integer) {
              Log.e("testRange", " " + integer);
          }
      });
}

2.8 timer

建立一個Observable,它在一個給定的延遲後發射一個特殊的值(一般是0),等同於Android中Handler的postDelay()。

timer()有一個三個引數的過載方法,可以傳入Scheduler排程器,預設使用的是Schedulers.computation()。

public void testTimer(View view) {
   Observable.timer(2, TimeUnit.SECONDS)
      .subscribe(new Action1<Long>() {
          @Override
          public void call(Long aLong) {
              Log.e("testRange", " " + aLong);
          }
      });
}

2.9 repeat

建立重複發射特定的資料或資料序列的Observable

2.10 start

它接受一個函式作為引數,呼叫這個函式獲取一個值,然後返回一個會發射這個值給後續觀察者的Observable。Start操作符的多種RxJava實現都屬於可選的rxjava-async模組。

注意:這個函式只會被執行一次,即使多個觀察者訂閱這個返回的Observable。

2.11 empty

不呼叫onNext(),直接呼叫onComplete(),這裡onStart方法也會呼叫。

 public void testEmpty(View view) {
   mObservable = Observable.empty();
}

2.12 never

建立一個不發射資料並且也永遠不會結束的Observable。只有onStart方法也會被呼叫。

 public void testNever(View view) {
   mObservable = Observable.never();
   mObservable.subscribe(new Subscriber<String>() {

       @Override
       public void onStart() {
           Log.e("testNever", "onStart");
       }

       @Override
       public void onCompleted() {
           Log.e("testNever", "onCompleted");
       }

       @Override
       public void onError(Throwable e) {
           e.printStackTrace();
       }

       @Override
       public void onNext(String s) {
           Log.e("testNever", "onNext  " + s);
       }
   });
}

2.13 error

建立一個不發射資料並且以錯誤結束的Observable。 只會回撥onStart和onError方法。

 public void testError(View view) {
   mObservable = Observable.error(new Exception("error test"));
}

2.14 using

using操作符讓你可以指示Observable建立一個只在它的生命週期記憶體在的資源,當Observable終止時這個資源會被自動釋放。

using操作符接受三個引數:

  1. Func0,一個使用者建立一次性資源的工廠函式
  2. Func1,一個用於建立Observable的工廠函式
  3. Action1,一個用於釋放資源的函式

當一個觀察者訂閱using返回的Observable時,using將會使用Observable工廠函式建立觀察者要觀察的Observable,同時使用資源工廠函式建立一個你想要建立的資源。當觀察者取消訂閱這個Observable時,或者當觀察者終止時(無論是正常終止還是因錯誤而終止),using使用第三個函式釋放它建立的資源。

我們來看一個例子

public void using(View view) {
    Observable.using(
            () -> new TestUsingThread(),
            t -> Observable.timer(10, TimeUnit.SECONDS),
            resource -> resource.stopThread()
    ).subscribe(
            LogUtils::e,
            e -> e.printStackTrace(),
            () -> LogUtils.e("completed")
    );
}

class TestUsingThread extends Thread {
    private volatile boolean flag = true;
    int index = 0;
    public void stopThread() {
        this.flag = false;
    }
    public TestUsingThread() {
        this.start();
    }
    @Override
    public void run() {
        while (flag) {
            Log.e("=====", index++ + "");
            SystemClock.sleep(index * 1000);
        }
    }
}

測試執行緒一直執行,知道時間達到10s,呼叫關閉方法關閉執行緒,這裡看到執行緒停止運行了。用來關閉資源很好

02-18 00:39:46.087 29094-29417/com.felix.testrxjava E/=====: 0
02-18 00:39:47.088 29094-29417/com.felix.testrxjava E/=====: 1
02-18 00:39:49.088 29094-29417/com.felix.testrxjava E/=====: 2
02-18 00:39:52.088 29094-29417/com.felix.testrxjava E/=====: 3
02-18 00:39:56.090 29094-29417/com.felix.testrxjava E/=====: 4
02-18 00:39:56.103 29094-29419/com.felix.testrxjava E/LogUtils:  0
02-18 00:39:56.104 29094-29419/com.felix.testrxjava E/LogUtils: completed

3 Single

Single類似於Observable,不同的是,它總是隻發射一個值,或者一個錯誤通知,而不是發射一系列的值。訂閱Single只需要如下兩個方法, 需要注意Single是沒有onStart方法.

onSuccess - Single發射單個的值到這個方法.
onError - 如果無法發射需要的值,Single發射一個Throwable物件到這個方法

3.1 建立

Single的建立方式和Observable基本類似,Observable支援的方法,Single基本都支援。

使用示例如下:

 public void testSingle(View view) {
   Single<String> single = Single.create(new Single.OnSubscribe<String>() {
       @Override
       public void call(SingleSubscriber<? super String> singleSubscriber) {
           if (!singleSubscriber.isUnsubscribed()) {
               if (SystemClock.currentThreadTimeMillis() % 2 == 0) {
                   singleSubscriber.onSuccess("Hello");
                   singleSubscriber.onSuccess("World");
               } else {
                   singleSubscriber.onError(new Exception("sth error"));
               }
           }
       }
   });

   single.subscribe(new SingleSubscriber<String>() {

       @Override
       public void onSuccess(String value) {
           Log.e("###", value);
       }

       @Override
       public void onError(Throwable error) {
           error.printStackTrace();
       }
   });
}

3.2 concatWith: Single轉化為Observable

4 Observer

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。

Observer 是一個介面,包含了三個方法

  1. onNext() 普通事件的回撥。

  2. onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。

  3. onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。

在一個正確執行的事件序列中, onCompleted() 和 onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。

一般不會直接使用Observer介面,而是使用實現了其介面的抽象類Subscriber,Observer用法如下:

public void testObserver(View view) {
   Observable.just(12)
           .subscribe(new Observer<Integer>() {
               @Override
               public void onCompleted() {
                   Log.e(TAG, "" + "onCompleted");
               }

               @Override
               public void onError(Throwable e) {
                   e.printStackTrace();
               }

               @Override
               public void onNext(Integer integer) {
                   Log.e(TAG, "" + integer);
               }
           });
}

5 Subscriber

Subscriber 是一個實現了 Observer 和 Subscription 兩個介面的抽象類。 Subscriber 對 Observer 介面進行了一些擴充套件,但他們的基本使用方式是完全一樣的:

Subscriber與Observer相比

  1. Subscriber多了一個onStart方法
  2. Subscriber多了一個unsubscribe()方法,用於取消訂閱
  3. Subscriber多了一個isUnsubscribed()方法,用於判斷訂閱狀態

6 Subject

Subject 是一個神奇的物件,它可以是一個Observable同時也可以是一個Observer。Subject是一個抽象類,繼承了Observable的同時也實現了Observer介面。
常用的Subject實現類有以下幾個:

6.1 PublishSubject

PublishSubject只會把在訂閱發生的時間點之後來自原始Observable的資料發射給觀察者。需要注意的是,PublishSubject可能會一建立完成就立刻開始發射資料(除非你可以阻止它發生),因此這裡有一個風險:在PublishSubject被建立後到有觀察者訂閱它之前這個時間段內,一個或多個數據可能會丟失。

public void publishSubject(View view) {
   //create方法是無參的,需要通過後續的onNext,onComplete,onError等方法傳送資料
   PublishSubject<String> subject = PublishSubject.create();
   subject.onNext("1.  Hello World");
   subject.subscribe(new Subscriber<String>() {
       @Override
       public void onCompleted() {
           Log.e("PublishSubject", "onCompleted");
       }

       @Override
       public void onError(Throwable e) {
           Log.e("PublishSubject", "onError    " + e.getMessage());

       }

       @Override
       public void onNext(String s) {
           Log.e("PublishSubject", "onNext " + s);
       }
   });
   subject.onNext("2.This is Felix");
   subject.onCompleted();
}

以上demo列印結果如下,只有訂閱以後的訊息推送才會接收到,這裡需要注意訂閱之前的subject.onCompleted/subject.onError回撥,也會在訂閱之後收到,標誌此Observable結束

com.felix.testrxjava E/PublishSubject: onNext 2.This is Felix
com.felix.testrxjava E/PublishSubject: onCompleted

PublishSubject一般用於建立連線Observables並且同時可被觀測的實體。比如為公共資源建立獨立、抽象或更易觀測的點這種場景。例如如下場景,我們需要監測一個內部請求的結果,不管成功失敗。

//建立一個PublishSubject用於接收最終的結果
final PublishSubject<Boolean> publishSubject = PublishSubject.create();
publishSubject.subscribe(new Action1<Boolean>() {
     @Override
     public void call(Boolean aBoolean) {
         //最終的結果在這裡接收監聽並處理
         Log.e("PublishSubject", "subscribe-- " + aBoolean);
     }
});
    //建立一個私有的Observable,只有內部的變數才能訪問到
Observable.create(new Observable.OnSubscribe<Integer>() {
     @Override
     public void call(Subscriber<? super Integer> subscriber) {
         for (int i = 0; i < 10; i++) {
             subscriber.onNext(i);
         }
         subscriber.onCompleted();
     }
}).doOnCompleted(new Action0() {
     ////當Observable結束時要會呼叫這裡
     @Override
     public void call() {
         publishSubject.onNext(true);
     }
}).subscribe();//空的訂閱表示不關注中間過程

以上執行的結果,就達到了只監控最終結果的目的。

com.felix.testrxjava E/PublishSubject: subscribe-- true

6.2 BehaviorSubject

當觀察者訂閱BehaviorSubject時,它開始發射原始Observable最近發射的資料(如果此時還沒有收到任何資料,它會發射一個預設值),然後繼續發射訂閱後的資料流。

public void behaviorSubject(View view) {
   Integer integer = 12;
   BehaviorSubject<Integer> behaviorSubject =BehaviorSubject.create(integer);       
   behaviorSubject.subscribe(new Action1<Integer>() {
       @Override
       public void call(Integer integer) {
           Log.e("BehaviorSubject1", "1.  " + integer);
       }
   });
   behaviorSubject.onNext(11);
   behaviorSubject.onNext(23);
   behaviorSubject.onNext(58);
   behaviorSubject.subscribe(new Action1<Integer>() {
       @Override
       public void call(Integer integer) {
           Log.e("BehaviorSubject2", "2.  " + integer);
       }
   });
}

以上例子,BehaviorSubject.create(),不傳引數的時候,是不會發送預設值的。輸出如下:

com.felix.testrxjava E/BehaviorSubject1: 1.  11
com.felix.testrxjava E/BehaviorSubject1: 1.  23
com.felix.testrxjava E/BehaviorSubject1: 1.  58
com.felix.testrxjava E/BehaviorSubject2: 2.  58

帶一個引數的時候,其內部會預設認為此引數是預設值的,也就會發送預設值即使這個引數為null。我們看看原始碼

//無參方法,預設第一個引數是null,第二個引數是false
 public static <T> BehaviorSubject<T> create() {
     return create(null, false);
}

//帶有預設值的方法,預設第二個引數是true
public static <T> BehaviorSubject<T> create(T defaultValue) {
        return create(defaultValue, true);
}

private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
   final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
    if (hasDefault){
          state.setLatest(NotificationLite.instance().next(defaultValue));
   }
   state.onAdded = new Action1<SubjectObserver<T>>() {
       @Override
       public void call(SubjectObserver<T> o) {
           o.emitFirst(state.getLatest(), state.nl);
       }    
   };
   state.onTerminated = state.onAdded;
   return new BehaviorSubject<T>(state, state); 
}

6.3 ReplaySubject

ReplaySubject會快取它所訂閱的所有資料,向任意一個訂閱它的觀察者重發。

public void replaySubject(View view) {
   ReplaySubject<Integer> subject = ReplaySubject.create();
   for (int i = 0; i < 3; i++) {
       subject.onNext(i);
   }
   subject.subscribe(new Subscriber<Integer>() {
       @Override
       public void onCompleted() {
           Log.e("replaySubject", "onCompleted");
       }

       @Override
       public void onError(Throwable e) {
           Log.e("replaySubject", "onError " + e.getMessage());
       }

       @Override
       public void onNext(Integer integer) {
           Log.e("replaySubject", "onError " + integer);
       }
   });
   for (int i = 0; i < 5; i++) {
       subject.onNext(i);
   }
   subject.subscribe(new Action1<Integer>() {
       @Override
       public void call(Integer integer) {
           Log.e("replaySubject2", "onError2   " + integer);
       }
   });
}

以上執行的結果將都打印出所有迴圈的資料,由於沒有執行取消訂閱unsubscribe方法。

com.felix.testrxjava E/replaySubject: onError 0
com.felix.testrxjava E/replaySubject: onError 1
com.felix.testrxjava E/replaySubject: onError 2
com.felix.testrxjava E/replaySubject: onError 0
com.felix.testrxjava E/replaySubject: onError 1
com.felix.testrxjava E/replaySubject: onError 2
com.felix.testrxjava E/replaySubject: onError 3
com.felix.testrxjava E/replaySubject: onError 4
com.felix.testrxjava E/replaySubject: onError2 0
com.felix.testrxjava E/replaySubject: onError2 1
com.felix.testrxjava E/replaySubject: onError2 2
com.felix.testrxjava E/replaySubject: onError2 0
com.felix.testrxjava E/replaySubject: onError2 1
com.felix.testrxjava E/replaySubject: onError2 2
com.felix.testrxjava E/replaySubject: onError2 3
com.felix.testrxjava E/replaySubject: onError2 4

如果你把ReplaySubject當作一個觀察者使用,注意不要從多個執行緒中呼叫它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)呼叫,這會違反Observable協議,給Subject的結果增加了不確定性。

6.4 AsyncSubject

一個AsyncSubject只在原始Observable完成後,發射來自原始Observable的最後一個值。(如果原始Observable沒有發射任何值,AsyncObject也不發射任何值)它會把這最後一個值發射給任何後續的觀察者。

public void asyncSubject(View view) {
   AsyncSubject<Integer> subject = AsyncSubject.create();
   subject.subscribe(new Action1<Integer>() {
       @Override
       public void call(Integer integer) {
           Log.e("asyncSubject", "1. " + integer);
       }
   });
   for (int i = 0; i < 3; i++) {
       subject.onNext(i);
   }
   //onCompleted方法呼叫後才會觸發訂閱的回撥,否則無回撥。
   subject.onCompleted();
   subject.subscribe(new Action1<Integer>() {
       @Override
       public void call(Integer integer) {
           Log.e("asyncSubject2", "2. " + integer);
       }
   });
}

輸出結果如下

com.felix.testrxjava E/asyncSubject: 1. 2
com.felix.testrxjava E/asyncSubject2: 2. 2

說明

  1. onCompleted方法呼叫後才會觸發訂閱的回撥,否則無回撥。
  2. 無論何時訂閱,都會回撥,而且都只會回撥最終結果。

6.5 SerializedSubject

多個執行緒中呼叫Subject的onNext方法(包括其它的on系列方法),可能導致同時(非順序)呼叫Subscriber,這會違反Observable協議,給Subject的結果增加了不確定性。 要避免此類問題,你可以將 Subject 轉換為一個 SerializedSubject。SerializedSubject保證了同一時刻只有一個執行緒可以呼叫其方法發射資料。

我們看看傳統的方式

public void serializedSubject(View view) {
   final PublishSubject<String> subject = PublishSubject.create();
   subject.subscribe(new Action1<String>() {
       @Override
       public void call(String s) {
           Log.e("serializedSubject", s);
           pos++;
       }
   });
   ExecutorService service = Executors.newFixedThreadPool(3);
   for (int i = 0; i < 3; i++) {
       service.submit(new Runnable() {
           @Override
           public void run() {
               for (int j = 0; j < 3; j++) {
                   SystemClock.sleep(300);
            subject.onNext(Thread.currentThread().getName() + "    " + pos);
               }
           }
       });
   }
   service.shutdown();
}

輸出結果如下,我們看到,執行緒1和執行緒2同時訪問了onNext方法,這裡就導致了輸出兩個一樣的結果 3

serializedSubject: pool-1-thread-1    0
serializedSubject: pool-1-thread-2    1
serializedSubject: pool-1-thread-3    2
serializedSubject: pool-1-thread-1    3
serializedSubject: pool-1-thread-2    3
serializedSubject: pool-1-thread-3    5
serializedSubject: pool-1-thread-1    6
serializedSubject: pool-1-thread-2    7
serializedSubject: pool-1-thread-3    8

如果我們加上serializedSubject裝飾一下

public void serializedSubject(View view) {
   final PublishSubject<String> subject = PublishSubject.create();
   final SerializedSubject serializedSubject = new SerializedSubject(subject);
   serializedSubject.subscribe(new Action1<String>() {
       @Override
       public void call(String s) {
           Log.e("serializedSubject", s);
           pos++;
       }
   });
   ExecutorService service = Executors.newFixedThreadPool(3);
   for (int i = 0; i < 3; i++) {
       service.submit(new Runnable() {
           @Override
           public void run() {
               for (int j = 0; j < 3; j++) {
                   SystemClock.sleep(300);
          serializedSubject.onNext(Thread.currentThread().getName() + "    " + pos);
               }
           }
       });
   }
   service.shutdown();
}

我們再看看輸出,如下:無論重試多少次,都會發現不會重複了。加了SerializedSubject的裝飾,就不會同時又多個執行緒訪問onNext方法了。

serializedSubject: pool-1-thread-1    0
serializedSubject: pool-1-thread-2    1
s