RxJava2.0(二)五種被觀察者
五種被觀察者為Observable,Flowable,Single,Completable,Maybe。 五種被觀察者可通過toObservable,toFlowable,toSingle,toCompletable,toMaybe相互轉換
Observable
一簡介 1.Observable即被觀察者,決定什麼時候觸發事件以及觸發怎樣的事件。 2.Oberver即觀察者,他可以在不同的執行緒中執行任務,極大的簡化了併發操作,因為他建立了一個處於待命狀態的觀察者,可以在某一時刻響應Observable的通知,而不會造成阻塞。 3.ObservableEmitter資料發射器,發射Observable的onNext,onError,onComplete,onSubscribe方法。 4.subscribe() 訂閱Observable的四個方法,只有呼叫此方法才會開始發射資料。其有4個構造方法:
subscribe(onNext())
subscribe(onNext(),onError())
subscribe(onNext(),onError(),onComplete())
subscribe(onNext(),onError(),onComplete(),onSubscribe())
寫個過載:
//建立被觀察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter< String> emitter) {
//發射資料
emitter.onNext("Hello World");
}
//訂閱給觀察者,接收資料
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, "accept: onNext = " + s);
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
//錯誤
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "accept: onError = " + throwable.getMessage());
}
//執行完成
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: onComplete");
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "accept: onSubscribe");
}
});
5.若所有觀察者取消訂閱,則資料流停止,若重新訂閱,重新開始資料流;若部分取消訂閱,不會停止資料流,仍然繼續發射資料,當再次訂閱,不會重新開始資料流,只會收到當前發射資料。 二,HotObservable和ColdObservable 1.HotObservable:無論有無觀察者訂閱,事件始終會發生,與訂閱者們成一對多關係,共享資訊。適用於某些事件不確定何時發生和不確定發射的元素數量。 2. ColdObservable:只有訂閱者訂閱了,才開始發射資料。和訂閱者們成一 一對應關係,各自訊息是重新完整發送,彼此獨立互不干擾。 3. Observable的just,create,range,fromXX等操作符建立的是ColdObservable。 4. 相互轉換:publish 操作符 ColdObservable====>HotObservable; 呼叫connect()後才真正執行轉換。
//建立被觀察者HotObservable
ConnectableObservable<String> connectableObservable =
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
//發射資料
emitter.onNext("Hello World");
}
//開啟一個新執行緒,將ColdObservable轉換為HotObservable
}).observeOn(Schedulers.newThread()).publish();
//執行轉化
connectableObservable.connect();
//訂閱給觀察者,接收資料
connectableObservable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, "accept: onNext = " + s);
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
- 相互轉換:refCount 操作符HotObservable====>ColdObservable
//建立被觀察者HotObservable
ConnectableObservable<String> connectableObservable =
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
//發射資料
emitter.onNext("Hello World");
}
//開啟一個新執行緒,將ColdObservable轉換為HotObservable
}).observeOn(Schedulers.newThread()).publish();
//執行轉化
connectableObservable.connect();
//再次轉化為ColdObservable
Observable observable = connectableObservable.refCount();
//訂閱給觀察者,接收資料
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, "accept: onNext = " + s);
Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
}
});
Flowable
Flowable可以看成是Observable的實現,只是它支援背壓,其所有操作付強制支援背壓。 Observable: ·一般處理不超過1000條資料,幾乎不會造成記憶體溢位。 ·不會背壓 ·處理同步流 Flowable: ·處理超過10KB的資料元素 ·檔案讀取與分析 ·讀取資料庫 ·處理網路I/O流 ·建立一個響應式的非阻塞介面
Single
只有onSuccess可onError事件,只能用onSuccess發射一個數據或一個錯誤通知,之後再發射資料也不會做任何處理,直接忽略。
Completable
只有onComplete和onError事件,不發射資料,沒有map,flatMap操作符。常常結合andThen操作符使用。 andThen:接下來執行。
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
try {
TimeUnit.SECONDS.sleep(1);
//完成
emitter.onComplete();
} catch (InterruptedException e) {
//錯誤
emitter.onError(e);
}
}
//Completable已執行完成,接下來執行:新建Observable發射1-10這幾個資料
}).andThen(Observable.range(1, 10))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer.toString());
}
});
Maybe
沒有onNext方法,同樣需要onSuccess發射資料,且只能發射0或1個數據,多發也不再處理。