RxAndroid使用文件(New)
1 概述
RxJava 一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫.響應式程式設計是一種基於非同步資料流概念的程式設計模式。資料流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合併為一條新的流。
Rx並不是一種新的語言,而是一種普通的Java模式,類似於觀察者模式(Observer Pattern),可以將它看作一個普通的Java類庫。而RxAndroid是RxJava的一個針對Android平臺的擴充套件,主要用於 Android 開發。
1.1 RxJava 有幾個基本概念:
- Observable 發射源 (可觀察者,即被觀察者)
- Observer 接收源(觀察者)
- Subscriber:Subscriber實現了Observer和Subscription介面,所以比Observer多了一個方法unsubscribe( ),用來取消訂閱。
- Subject:一個比較特殊的物件,既可充當發射源,也可充當接收源
- Subscription :Observable呼叫subscribe( )方法返回的物件,同樣有unsubscribe()方法,可以用來取消訂閱事件;
- Action0:RxJava中的一個介面,它只有一個無參call()方法,且無返回值,同樣還有Action1,Action2…Action9等,Action1封裝了含有 1 個參的call()方法,即call(T t),Action2封裝了含有 2 個引數的call方法,即call(T1 t1,T2 t2),以此類推;
- Func0:與Action0非常相似,也有call()方法,但是它是有返回值的,同樣也有Func0、Func1…Func9;
- subscribe() 訂閱方法,subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有記憶體洩露的風險。
- unsubscribe() 取消訂閱方法,在這個方法被呼叫後,Subscriber 將不再接收事件。要在不再使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)呼叫 unsubscribe() 來解除引用關係,以避免記憶體洩露的發生。
Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer。
1.2 RxJava的優點
- 建立:Rx可以方便的建立事件流和資料流
- 組合:Rx使用查詢式的操作符組合和變換資料流
- 監聽:Rx可以訂閱任何可觀察的資料流並執行操作
- 函式式風格:對可觀察資料流使用無副作用的輸入輸出函式,避免了程式裡錯綜複雜的狀態
- 簡化程式碼:Rx的操作符通通常可以將複雜的難題簡化為很少的幾行程式碼
- 非同步錯誤處理:傳統的try/catch沒辦法處理非同步計算,Rx提供了合適的錯誤處理機制
- 輕鬆使用併發:Rx的Observables和Schedulers讓開發者可以擺脫底層的執行緒同步和各種併發問題
2 Observable
Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。
一個Observable可以發出零個或者多個事件,直到結束或者出錯。每發出一個事件,就會呼叫它subscribe的Subscriber的onNext方法,最後呼叫Subscriber.onCompleted()完成或者Subscriber.onError()出錯而結束。
下面看看RxJava提供的建立Observable的方法:
2.1 create
新建一個Observables.create方法中傳入了一個 OnSubscribe 物件作為引數,OnSubscribe 會被儲存在返回的 Observable 物件中,它的作用相當於一個計劃表。當 Observable 被訂閱(subscribe)的時候,OnSubscribe 的 call() 方法會自動被呼叫,事件序列就會依照call中設定依次觸發.
public void testCreate(View view) {
mObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
}
}
});
}
2.1 subscribe()訂閱
由於建立後需要訂閱了才能看到效果,這裡初步看看訂閱方法。subscribe()註冊 Subscriber
到 Observable
.
subscribe()方法做了三件事:
- 呼叫 Subscriber.onStart()
- 呼叫 Observable 中的 OnSubscribe.call(Subscriber)
- 將傳入的 Subscriber 作為 Subscription 返回
public void testSubscribe(View view) {
if (mObservable != null) {
mObservable.subscribe(new Subscriber<String>() {
@Override
public void onStart() {
/** start方法不是必須的,call方法之前呼叫。
* 在subscribe 所發生的執行緒被呼叫,不能指定執行緒
*/
Log.e("testSubscribe", "onStart" + Thread.currentThread().getId());
super.onStart();
}
@Override
public void onCompleted() {
Log.e("testSubscribe", "onCompleted" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
Log.e("testSubscribe", s + Thread.currentThread().getId());
}
});
}
}
2.2 forEach
forEach方法是簡化版的subscribe,無返回值。通過 forEach 可以處理 Observable 每個發射出來的資料。且是非阻塞執行的。forEach一般用於遍歷所有元素,然後處理。
public void forEach(View v) {
Observable.interval(500, TimeUnit.MILLISECONDS)
.take(6)
.forEach(LogUtils::e);
LogUtils.e("over");
}
非阻塞的,所以先打印出後面的內容。
02-18 00:50:16.921 15095-15095/com.felix.testrxjava E/LogUtils: over
02-18 00:50:17.420 15095-15290/com.felix.testrxjava E/LogUtils: 0
02-18 00:50:17.920 15095-15290/com.felix.testrxjava E/LogUtils: 1
02-18 00:50:18.420 15095-15290/com.felix.testrxjava E/LogUtils: 2
02-18 00:50:18.919 15095-15290/com.felix.testrxjava E/LogUtils: 3
02-18 00:50:19.420 15095-15290/com.felix.testrxjava E/LogUtils: 4
02-18 00:50:19.919 15095-15290/com.felix.testrxjava E/LogUtils: 5
2.3 just(T…)
just()方法可以傳入一到九個引數,它們會按照傳入的引數的順序來發射它們。just()方法也可以接受列表或陣列,它將會發射整個列表。通常,當我們想發射一組已經定義好的值時會用到它。但是如果我們的函式不是時變性的,我們可以用just來建立一個更有組織性和可測性的程式碼庫。
public void testJust(View view) {
mObservable = Observable.just("hello", "World");
}
2.4 from(T[] t) / from(Iterable
Observable.from(Executors.newFixedThreadPool(3).submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return "result";
}
}))
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("", s);
}
});
}
2.5 defer(Func0
public void testDefer(View view) {
count = 1;
//just方法
Observable<Integer> justObservable = Observable.just(count);
count = 2;
//just的訂閱
justObservable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("testDefer", "just " + integer);
}
});
count = 3;
//defer方法
Observable<Integer> testDefer = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
//注意此處的call方法沒有Subscriber引數
return Observable.just(count);
}
});
count = 4;
//defer訂閱
testDefer.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e("testDefer", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e("testDefer", "onError " + e.getMessage());
}
@Override
public void onNext(Integer i) {
Log.e("testDefer", "defer " + i);
}
});
count = 5;
//defer訂閱2
testDefer.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("testDefer", "Action1 defer " + integer);
}
});
}
以上示例將just和defer放在一起做了一個對比。我們看看列印的值。just等操作符是在建立的時候就已經生成了Observable,而defer是在subscribe的時候才建立,而且每次訂閱都會新建立一個,以保證當前使用的是最新的值。
com.felix.testrxjava E/testDefer: just 1
com.felix.testrxjava E/testDefer: defer 4
com.felix.testrxjava E/testDefer: onCompleted
com.felix.testrxjava E/testDefer: Action1 defer 5
2.6 interval/timeInterval
interval建立一個按固定時間間隔發射整數序列的Observable,可用作定時器.interval()有一個三個引數的過載方法,可以傳入Scheduler排程器,預設使用的是Schedulers.computation()。
public void testInterval(View view) { //每隔2s傳送一次 final Subscription subscription = Observable.interval(2, TimeUnit.SECONDS) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.e("testInterval", " " + aLong); } }); //延遲15s取消訂閱 new Handler().postDelayed(new Runnable() { @Override public void run() { subscription.unsubscribe(); } }, 15 * 1000); }
timeInterval將原始Observable轉換為另一個Obserervable,後者發射一個標誌替換前者的資料項,這個標誌表示前者的兩個連續發射物之間流逝的時間長度。新的Observable的第一個發射物表示的是在觀察者訂閱原始Observable到原始Observable發射它的第一項資料之間流逝的時間長度。不存在與原始Observable發射最後一項資料和發射onCompleted通知之間時長對應的發射物。timeInterval預設在immediate排程器上執行,你可以通過傳引數修改。
public void timeInterval(View view) { Observable.create(subscriber -> { for (int i = 0; i < 5; i++) { SystemClock.sleep(i * 1000); subscriber.onNext("aaaa " + i); } subscriber.onCompleted(); }).timeInterval().cast(TimeInterval.class) .subscribe(x -> Log.e("timeInterval", x.getIntervalInMilliseconds() + "++" + x.getValue())); }
看看輸出,getIntervalInMilliseconds,返回的間隔的毫秒值,getValue返回的是上一個Observable丟擲來的值。
02-16 22:47:03.748 21569-21569/com.felix.testrxjava E/timeInterval: 0++aaaa 0 02-16 22:47:04.749 21569-21569/com.felix.testrxjava E/timeInterval: 1001++aaaa 1 02-16 22:47:06.749 21569-21569/com.felix.testrxjava E/timeInterval: 2000++aaaa 2 02-16 22:47:09.750 21569-21569/com.felix.testrxjava E/timeInterval: 3001++aaaa 3 02-16 22:47:13.750 21569-21569/com.felix.testrxjava E/timeInterval: 4000++aaaa 4
2.7 range
建立一個發射特定整數序列的Observable:第一個引數為起始值;第二個為傳送的個數,如果為0則不傳送,負數則拋異常,大於int型別的最大值也會拋異常。
/**
* 發射從1開始的10個數字
*
* @param view
*/
public void testRange(View view) {
Observable.range(1, 10)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("testRange", " " + integer);
}
});
}
2.8 timer
建立一個Observable,它在一個給定的延遲後發射一個特殊的值(一般是0),等同於Android中Handler的postDelay()。
timer()有一個三個引數的過載方法,可以傳入Scheduler排程器,預設使用的是Schedulers.computation()。
public void testTimer(View view) {
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e("testRange", " " + aLong);
}
});
}
2.9 repeat
建立重複發射特定的資料或資料序列的Observable
2.10 start
它接受一個函式作為引數,呼叫這個函式獲取一個值,然後返回一個會發射這個值給後續觀察者的Observable。Start操作符的多種RxJava實現都屬於可選的rxjava-async模組。
注意:這個函式只會被執行一次,即使多個觀察者訂閱這個返回的Observable。
2.11 empty
不呼叫onNext(),直接呼叫onComplete(),這裡onStart方法也會呼叫。
public void testEmpty(View view) {
mObservable = Observable.empty();
}
2.12 never
建立一個不發射資料並且也永遠不會結束的Observable。只有onStart方法也會被呼叫。
public void testNever(View view) {
mObservable = Observable.never();
mObservable.subscribe(new Subscriber<String>() {
@Override
public void onStart() {
Log.e("testNever", "onStart");
}
@Override
public void onCompleted() {
Log.e("testNever", "onCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
Log.e("testNever", "onNext " + s);
}
});
}
2.13 error
建立一個不發射資料並且以錯誤結束的Observable。 只會回撥onStart和onError方法。
public void testError(View view) {
mObservable = Observable.error(new Exception("error test"));
}
2.14 using
using操作符讓你可以指示Observable建立一個只在它的生命週期記憶體在的資源,當Observable終止時這個資源會被自動釋放。
using操作符接受三個引數:
- Func0,一個使用者建立一次性資源的工廠函式
- Func1,一個用於建立Observable的工廠函式
- Action1,一個用於釋放資源的函式
當一個觀察者訂閱using返回的Observable時,using將會使用Observable工廠函式建立觀察者要觀察的Observable,同時使用資源工廠函式建立一個你想要建立的資源。當觀察者取消訂閱這個Observable時,或者當觀察者終止時(無論是正常終止還是因錯誤而終止),using使用第三個函式釋放它建立的資源。
我們來看一個例子
public void using(View view) {
Observable.using(
() -> new TestUsingThread(),
t -> Observable.timer(10, TimeUnit.SECONDS),
resource -> resource.stopThread()
).subscribe(
LogUtils::e,
e -> e.printStackTrace(),
() -> LogUtils.e("completed")
);
}
class TestUsingThread extends Thread {
private volatile boolean flag = true;
int index = 0;
public void stopThread() {
this.flag = false;
}
public TestUsingThread() {
this.start();
}
@Override
public void run() {
while (flag) {
Log.e("=====", index++ + "");
SystemClock.sleep(index * 1000);
}
}
}
測試執行緒一直執行,知道時間達到10s,呼叫關閉方法關閉執行緒,這裡看到執行緒停止運行了。用來關閉資源很好
02-18 00:39:46.087 29094-29417/com.felix.testrxjava E/=====: 0
02-18 00:39:47.088 29094-29417/com.felix.testrxjava E/=====: 1
02-18 00:39:49.088 29094-29417/com.felix.testrxjava E/=====: 2
02-18 00:39:52.088 29094-29417/com.felix.testrxjava E/=====: 3
02-18 00:39:56.090 29094-29417/com.felix.testrxjava E/=====: 4
02-18 00:39:56.103 29094-29419/com.felix.testrxjava E/LogUtils: 0
02-18 00:39:56.104 29094-29419/com.felix.testrxjava E/LogUtils: completed
3 Single
Single類似於Observable,不同的是,它總是隻發射一個值,或者一個錯誤通知,而不是發射一系列的值。訂閱Single只需要如下兩個方法, 需要注意Single是沒有onStart
方法.
onSuccess - Single發射單個的值到這個方法.
onError - 如果無法發射需要的值,Single發射一個Throwable物件到這個方法
3.1 建立
Single的建立方式和Observable基本類似,Observable支援的方法,Single基本都支援。
使用示例如下:
public void testSingle(View view) {
Single<String> single = Single.create(new Single.OnSubscribe<String>() {
@Override
public void call(SingleSubscriber<? super String> singleSubscriber) {
if (!singleSubscriber.isUnsubscribed()) {
if (SystemClock.currentThreadTimeMillis() % 2 == 0) {
singleSubscriber.onSuccess("Hello");
singleSubscriber.onSuccess("World");
} else {
singleSubscriber.onError(new Exception("sth error"));
}
}
}
});
single.subscribe(new SingleSubscriber<String>() {
@Override
public void onSuccess(String value) {
Log.e("###", value);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
});
}
3.2 concatWith: Single轉化為Observable
4 Observer
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。
Observer 是一個介面,包含了三個方法
onNext() 普通事件的回撥。
onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。
onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
在一個正確執行的事件序列中, onCompleted() 和 onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。
一般不會直接使用Observer介面,而是使用實現了其介面的抽象類Subscriber,Observer用法如下:
public void testObserver(View view) {
Observable.just(12)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "" + "onCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "" + integer);
}
});
}
5 Subscriber
Subscriber 是一個實現了 Observer 和 Subscription 兩個介面的抽象類。 Subscriber 對 Observer 介面進行了一些擴充套件,但他們的基本使用方式是完全一樣的:
Subscriber與Observer相比
- Subscriber多了一個onStart方法
- Subscriber多了一個unsubscribe()方法,用於取消訂閱
- Subscriber多了一個isUnsubscribed()方法,用於判斷訂閱狀態
6 Subject
Subject 是一個神奇的物件,它可以是一個Observable同時也可以是一個Observer。Subject是一個抽象類,繼承了Observable的同時也實現了Observer介面。
常用的Subject實現類有以下幾個:
6.1 PublishSubject
PublishSubject只會把在訂閱發生的時間點之後來自原始Observable的資料發射給觀察者。需要注意的是,PublishSubject可能會一建立完成就立刻開始發射資料(除非你可以阻止它發生),因此這裡有一個風險:在PublishSubject被建立後到有觀察者訂閱它之前這個時間段內,一個或多個數據可能會丟失。
public void publishSubject(View view) {
//create方法是無參的,需要通過後續的onNext,onComplete,onError等方法傳送資料
PublishSubject<String> subject = PublishSubject.create();
subject.onNext("1. Hello World");
subject.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e("PublishSubject", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e("PublishSubject", "onError " + e.getMessage());
}
@Override
public void onNext(String s) {
Log.e("PublishSubject", "onNext " + s);
}
});
subject.onNext("2.This is Felix");
subject.onCompleted();
}
以上demo列印結果如下,只有訂閱以後的訊息推送才會接收到,這裡需要注意訂閱之前的subject.onCompleted/subject.onError回撥,也會在訂閱之後收到,標誌此Observable結束:
com.felix.testrxjava E/PublishSubject: onNext 2.This is Felix
com.felix.testrxjava E/PublishSubject: onCompleted
PublishSubject一般用於建立連線Observables並且同時可被觀測的實體。比如為公共資源建立獨立、抽象或更易觀測的點這種場景。例如如下場景,我們需要監測一個內部請求的結果,不管成功失敗。
//建立一個PublishSubject用於接收最終的結果
final PublishSubject<Boolean> publishSubject = PublishSubject.create();
publishSubject.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
//最終的結果在這裡接收監聽並處理
Log.e("PublishSubject", "subscribe-- " + aBoolean);
}
});
//建立一個私有的Observable,只有內部的變數才能訪問到
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).doOnCompleted(new Action0() {
////當Observable結束時要會呼叫這裡
@Override
public void call() {
publishSubject.onNext(true);
}
}).subscribe();//空的訂閱表示不關注中間過程
以上執行的結果,就達到了只監控最終結果的目的。
com.felix.testrxjava E/PublishSubject: subscribe-- true
6.2 BehaviorSubject
當觀察者訂閱BehaviorSubject時,它開始發射原始Observable最近發射的資料(如果此時還沒有收到任何資料,它會發射一個預設值),然後繼續發射訂閱後的資料流。
public void behaviorSubject(View view) {
Integer integer = 12;
BehaviorSubject<Integer> behaviorSubject =BehaviorSubject.create(integer);
behaviorSubject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("BehaviorSubject1", "1. " + integer);
}
});
behaviorSubject.onNext(11);
behaviorSubject.onNext(23);
behaviorSubject.onNext(58);
behaviorSubject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("BehaviorSubject2", "2. " + integer);
}
});
}
以上例子,BehaviorSubject.create()
,不傳引數的時候,是不會發送預設值的。輸出如下:
com.felix.testrxjava E/BehaviorSubject1: 1. 11
com.felix.testrxjava E/BehaviorSubject1: 1. 23
com.felix.testrxjava E/BehaviorSubject1: 1. 58
com.felix.testrxjava E/BehaviorSubject2: 2. 58
帶一個引數的時候,其內部會預設認為此引數是預設值的,也就會發送預設值即使這個引數為null。我們看看原始碼
//無參方法,預設第一個引數是null,第二個引數是false
public static <T> BehaviorSubject<T> create() {
return create(null, false);
}
//帶有預設值的方法,預設第二個引數是true
public static <T> BehaviorSubject<T> create(T defaultValue) {
return create(defaultValue, true);
}
private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
if (hasDefault){
state.setLatest(NotificationLite.instance().next(defaultValue));
}
state.onAdded = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
o.emitFirst(state.getLatest(), state.nl);
}
};
state.onTerminated = state.onAdded;
return new BehaviorSubject<T>(state, state);
}
6.3 ReplaySubject
ReplaySubject會快取它所訂閱的所有資料,向任意一個訂閱它的觀察者重發。
public void replaySubject(View view) {
ReplaySubject<Integer> subject = ReplaySubject.create();
for (int i = 0; i < 3; i++) {
subject.onNext(i);
}
subject.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e("replaySubject", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e("replaySubject", "onError " + e.getMessage());
}
@Override
public void onNext(Integer integer) {
Log.e("replaySubject", "onError " + integer);
}
});
for (int i = 0; i < 5; i++) {
subject.onNext(i);
}
subject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("replaySubject2", "onError2 " + integer);
}
});
}
以上執行的結果將都打印出所有迴圈的資料,由於沒有執行取消訂閱unsubscribe方法。
com.felix.testrxjava E/replaySubject: onError 0
com.felix.testrxjava E/replaySubject: onError 1
com.felix.testrxjava E/replaySubject: onError 2
com.felix.testrxjava E/replaySubject: onError 0
com.felix.testrxjava E/replaySubject: onError 1
com.felix.testrxjava E/replaySubject: onError 2
com.felix.testrxjava E/replaySubject: onError 3
com.felix.testrxjava E/replaySubject: onError 4
com.felix.testrxjava E/replaySubject: onError2 0
com.felix.testrxjava E/replaySubject: onError2 1
com.felix.testrxjava E/replaySubject: onError2 2
com.felix.testrxjava E/replaySubject: onError2 0
com.felix.testrxjava E/replaySubject: onError2 1
com.felix.testrxjava E/replaySubject: onError2 2
com.felix.testrxjava E/replaySubject: onError2 3
com.felix.testrxjava E/replaySubject: onError2 4
如果你把ReplaySubject當作一個觀察者使用,注意不要從多個執行緒中呼叫它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)呼叫,這會違反Observable協議,給Subject的結果增加了不確定性。
6.4 AsyncSubject
一個AsyncSubject只在原始Observable完成後,發射來自原始Observable的最後一個值。(如果原始Observable沒有發射任何值,AsyncObject也不發射任何值)它會把這最後一個值發射給任何後續的觀察者。
public void asyncSubject(View view) {
AsyncSubject<Integer> subject = AsyncSubject.create();
subject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("asyncSubject", "1. " + integer);
}
});
for (int i = 0; i < 3; i++) {
subject.onNext(i);
}
//onCompleted方法呼叫後才會觸發訂閱的回撥,否則無回撥。
subject.onCompleted();
subject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("asyncSubject2", "2. " + integer);
}
});
}
輸出結果如下
com.felix.testrxjava E/asyncSubject: 1. 2
com.felix.testrxjava E/asyncSubject2: 2. 2
說明
- onCompleted方法呼叫後才會觸發訂閱的回撥,否則無回撥。
- 無論何時訂閱,都會回撥,而且都只會回撥最終結果。
6.5 SerializedSubject
多個執行緒中呼叫Subject的onNext方法(包括其它的on系列方法),可能導致同時(非順序)呼叫Subscriber,這會違反Observable協議,給Subject的結果增加了不確定性。 要避免此類問題,你可以將 Subject 轉換為一個 SerializedSubject。SerializedSubject保證了同一時刻只有一個執行緒可以呼叫其方法發射資料。
我們看看傳統的方式
public void serializedSubject(View view) {
final PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("serializedSubject", s);
pos++;
}
});
ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
service.submit(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 3; j++) {
SystemClock.sleep(300);
subject.onNext(Thread.currentThread().getName() + " " + pos);
}
}
});
}
service.shutdown();
}
輸出結果如下,我們看到,執行緒1和執行緒2同時訪問了onNext方法,這裡就導致了輸出兩個一樣的結果 3
serializedSubject: pool-1-thread-1 0
serializedSubject: pool-1-thread-2 1
serializedSubject: pool-1-thread-3 2
serializedSubject: pool-1-thread-1 3
serializedSubject: pool-1-thread-2 3
serializedSubject: pool-1-thread-3 5
serializedSubject: pool-1-thread-1 6
serializedSubject: pool-1-thread-2 7
serializedSubject: pool-1-thread-3 8
如果我們加上serializedSubject裝飾一下
public void serializedSubject(View view) {
final PublishSubject<String> subject = PublishSubject.create();
final SerializedSubject serializedSubject = new SerializedSubject(subject);
serializedSubject.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("serializedSubject", s);
pos++;
}
});
ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
service.submit(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 3; j++) {
SystemClock.sleep(300);
serializedSubject.onNext(Thread.currentThread().getName() + " " + pos);
}
}
});
}
service.shutdown();
}
我們再看看輸出,如下:無論重試多少次,都會發現不會重複了。加了SerializedSubject的裝飾,就不會同時又多個執行緒訪問onNext方法了。
serializedSubject: pool-1-thread-1 0
serializedSubject: pool-1-thread-2 1
s