RxJava 2.x 使用最佳實踐
以前寫過 Rxjava 系列教程, 如下所示
上面的這些教程覆蓋了 rxjava 的方方面面,很詳細。只是當時寫的時候是基於 rxjava 1.X 的版本寫的,後來 rxjava 進入了快速迭代的時期,很快就出現了 2.x 版本。根據 Rxjava 官方的GitHub 來看,2.x 相對於 1.x 做了很多改進,刪除了不少的類,同時也增加了一些新的類。基於以上背景,以前的這些文章,就顯得有些不足,為了緊跟 rxjava 的步伐,下面的這篇部落格,就是對 rxjava 的重新認識。
Rxjava、RxAndroid
新增依賴
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.2'
create() :建立
create操作符應該是最常見的操作符了,主要用於產生一個Obserable被觀察者物件,為了方便大家的認知,以後的教程中統一把被觀察者Observable稱為發射器(上游事件),觀察者Observer稱為接收器(下游事件)。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete(); //結束
e.onNext( 4 );
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("zhao" , "onSubscribe: " + d.isDisposed());
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e("zhao", "onNext: " + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("zhao", "onError: ");
}
@Override
public void onComplete() {
Log.e("zhao", "onComplete: ");
}
});
結果是:
E/zhao: onSubscribe: false
E/zhao: onNext: 1
E/zhao: onNext: 2
E/zhao: onNext: 3
E/zhao: onComplete:
需要注意的幾點是:
1)在發射完 3 之後, 呼叫 e.onComplete() 方法,結束 發射資料。4 沒有發射出來。
2) 另外一個值得注意的點是,在RxJava 2.x中,可以看到發射事件方法相比1.x多了一個throws Excetion,意味著我們做一些特定操作再也不用try-catch了。
3) 並且2.x 中有一個Disposable概念,這個東西可以直接呼叫切斷,可以看到,當它的isDisposed()返回為false的時候,接收器能正常接收事件,但當其為true的時候,接收器停止了接收。所以可以通過此引數動態控制接收事件了。
在上面接收資料的時候,我們用了 Observer 物件,需要實現 4 個 方法。這顯得過於累贅,我們可以用 Consumer 物件來代替 Observer 物件,程式碼如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
e.onNext(4);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("zhao", "accept: " + integer);
}
});
效果如下:
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3
需要注意的是:
1)、Consumer 物件完全代替了Observer ,效果是一樣的。Consumer 顧名思義是消費者的意思,是消費資料的物件。Consumer 物件是 Rxjava 2.x 才出現的,老版本沒有。
map 操作符
map基本算是 RxJava 中一個最簡單的操作符了,熟悉 RxJava 1.x 的知道,它的作用是對發射時間傳送的每一個事件應用一個函式,是的每一個事件都按照指定的函式去變化,而在2.x中它的作用幾乎一致。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
// map 操作符,就是轉換輸入、輸出 的型別;本例中輸入是 Integer , 輸出是 String 型別
Log.e("zhao", "apply: " + integer + " 執行緒:" + Thread.currentThread().getName());
return "This is result " + integer;
}
})
.subscribeOn(Schedulers.io()) //在子執行緒發射
.observeOn(AndroidSchedulers.mainThread()) //在主執行緒接收
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e("zhao", "accept: " + s + " 執行緒:" + Thread.currentThread().getName());
}
});
結果是:
E/zhao: apply: 1 執行緒:RxCachedThreadScheduler-1
E/zhao: apply: 2 執行緒:RxCachedThreadScheduler-1
E/zhao: apply: 3 執行緒:RxCachedThreadScheduler-1
E/zhao: accept: This is result 1 執行緒:main
E/zhao: accept: This is result 2 執行緒:main
E/zhao: accept: This is result 3 執行緒:main
flatMap 操作符
FlatMap 是一個很有趣的東西,我堅信你在實際開發中會經常用到。它可以把一個發射器Observable 通過某種方法轉換為多個Observables,然後再把這些分散的Observables裝進一個單一的發射器Observable。但有個需要注意的是,flatMap並不能保證事件的順序,如果需要保證,需要用到我們下面要講的ConcatMap。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
//隨機生成一個時間
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("zhao", "accept: " + s);
}
});
效果如下:
E/zhao: accept: I am value 1
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
E/zhao: accept: I am value 1
E/zhao: accept: I am value 1
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
一切都如我們預期中的有意思,為了區分concatMap(下一個會講),我在程式碼中特意動了一點小手腳,我採用一個隨機數,生成一個時間,然後通過delay(後面會講)操作符,做一個小延時操作,而檢視Log日誌也確認驗證了我們上面的說法,它是無序的。
concatMap 操作符
上面其實就說了,concatMap 與 FlatMap 的唯一區別就是 concatMap 保證了順序,所以,我們就直接把 flatMap 替換為 concatMap 驗證吧。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
})
.concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
//隨機生成一個時間
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("zhao", "accept: " + s);
}
});
效果如下:
E/zhao: accept: I am value 1
E/zhao: accept: I am value 1
E/zhao: accept: I am value 1
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
E/zhao: accept: I am value 2
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
E/zhao: accept: I am value 3
zip 操作符
構建一個 String 發射器 和 Integer 發射器
//建立 String 發射器
private Observable<String> getStringObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("A");
e.onNext("B");
e.onNext("C");
}
});
}
//建立 String 發射器
private Observable<Integer> getIntegerObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onNext(5);
}
});
}
使用 zip 操作符
Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
@Override
public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
return s + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("zhao", "accept: " + s);
}
});
效果如下:
E/zhao: accept: A1
E/zhao: accept: B2
E/zhao: accept: C3
需要注意的是:
1) zip 組合事件的過程就是分別從發射器A和發射器B各取出一個事件來組合,並且一個事件只能被使用一次,組合的順序是嚴格按照事件傳送的順序來進行的,所以上面截圖中,可以看到,1永遠是和A 結合的,2永遠是和B結合的。
2) 最終接收器收到的事件數量是和傳送器傳送事件最少的那個傳送器的傳送事件數目相同,所以如截圖中,5很孤單,沒有人願意和它交往,孤獨終老的單身狗。
interval 操作符
interval操作符是每隔一段時間就產生一個數字,這些數字從0開始,一次遞增1直至無窮大
//方法1
Flowable.interval(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e("zhao", "accept11>: " + aLong);
}
});
//方法2
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("zhao", "accept:22> " + aLong);
}
});
效果如下:
E/zhao: accept11>: 0
E/zhao: accept11>: 1
E/zhao: accept11>: 2
E/zhao: accept11>: 3
E/zhao: accept11>: 4
倒計時
既然 interval 操作符會產生從 0 到無窮大的序列,那麼我們我們會返回來思考一下,如果倒過來想, 就會發現可以用 interval 方法,實現一個倒計時的功能。
建立一個倒計時的 Observable
/**
* 產生一個倒計時的 Observable
* @param time
* @return
*/
public Observable<Long> countdown(final long time) {
return Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, Long>() {
@Override
public Long apply(@NonNull Long aLong) throws Exception {
return time - aLong;
}
}).take( time + 1 );
}
實現倒計時的功能
countdown(4).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("zhao", "accept: 倒計時: " + aLong);
}
});
效果如下:
E/zhao: accept: 倒計時: 4
E/zhao: accept: 倒計時: 3
E/zhao: accept: 倒計時: 2
E/zhao: accept: 倒計時: 1
E/zhao: accept: 倒計時: 0
repeat 操作符:重複的發射資料
repeat 重複地發射資料
- repeat( ) //無限重複
- repeat( int time ) //設定重複的次數
Observable
.just(1, 2)
.repeat( 3 ) //重複3次
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("zhao", "accept: " + integer);
}
});
效果:
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 1
E/zhao: accept: 2
range :發射特定的整數序列
range 發射特定整數序列的 Observable
- range( int start , int end ) //start :開始的值 , end :結束的值
要求: end >= start
Observable
.range( 1 , 5 )
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("zhao", "accept: " + integer);
}
});
效果:
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3
E/zhao: accept: 4
E/zhao: accept: 5
fromArray : 遍歷陣列
Integer[] items = {0, 1, 2, 3, 4, 5};
Observable
.fromArray(items)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("zhao", "accept: " + integer
);
}
});
效果是:
E/zhao: accept: 0
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3
E/zhao: accept: 4
E/zhao: accept: 5
fromIterable : 遍歷集合
List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");
Observable
.fromIterable(list)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("zhao", "accept: " + s);
}
});
效果
E/zhao: accept: a
E/zhao: accept: b
E/zhao: accept: c
toList : 把資料轉換成 List 集合
Observable
.just(1, 2, 3, 4)
.toList()
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.e("zhao", "accept: " + integers);
}
});
效果是
accept: [1, 2, 3, 4]
把陣列轉化成 List 集合
Integer[] items = {0, 1, 2, 3, 4, 5};
Observable
.fromArray( items ) //遍歷陣列
.toList() //把遍歷後的陣列轉化成 List
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.e("zhao", "accept: " + integers);
}
});
效果是:
accept: [0, 1, 2, 3, 4, 5]
delay : 延遲發射資料
Observable
.just(1, 2, 3)
.delay(3, TimeUnit.SECONDS) //延遲3秒鐘,然後在發射資料
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("zhao", "accept: " + integer);
}
});
效果:
E/zhao: accept: 1
E/zhao: accept: 2
E/zhao: accept: 3
背壓 BackPressure
背壓產生的原因: 被觀察者傳送訊息太快以至於它的操作符或者訂閱者不能及時處理相關的訊息。在 Rxjava 1.x 版本很容易就會報錯,使程式發生崩潰。
...
Caused by: rx.exceptions.MissingBackpressureException
...
...
為了解決這個問題,在RxJava2裡,引入了Flowable這個類:Observable不包含 backpressure 處理,而 Flowable 包含。
下面我們來模擬一個觸發背壓的例項 , 發射器每1毫秒發射一個數據,接收器每一秒處理一個數據。資料產生是資料處理的1000 倍。
首先用 RxJava 2.x 版本的 Observable 來實現。
Observable.interval(1, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Thread.sleep(1000);
Log.e("zhao", "onNext: " + aLong);
}
});
經過測試,app 很健壯,沒有發生崩潰,日誌每1秒列印一次。在上面我們說到 2.x 版本中 Observable 不再支援背壓,發神器生成的資料全部快取在記憶體中。
Observable :
不支援 backpressure 處理,不會發生 MissingBackpressureException 異常。
所有沒有處理的資料都快取在記憶體中,等待被訂閱者處理。
壞處是:當產生的資料過快,記憶體中快取的資料越來越多,佔用大量記憶體。
然後用 RxJava 2.x 版本的 Flowable 來實現。
Flowable.interval(1, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Thread.sleep(1000);
Log.e("zhao", "onNext: " + aLong);
}
});
執行起來發生崩潰,崩潰日誌如下:
io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
...
...
Caused by: io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
很明顯發生了 MissingBackpressureException 異常 , 128 代表是 Flowable 最多快取 128 個數據,快取次超過 128 個數據,就會報錯。可喜的是,Rxjava 已經給我們提供瞭解決背壓的策略。
onBackpressureDrop
onBackpressureDrop() :當緩衝區資料滿 128 個時候,再新來的資料就會被丟棄,如果此時有資料被消費了,那麼就會把當前最新產生的資料,放到緩衝區。簡單來說 Drop 就是直接把存不下的事件丟棄。
onBackpressureDrop 測試
Flowable.interval( 1 , TimeUnit.MILLISECONDS)
.onBackpressureDrop() //onBackpressureDrop 一定要放在 interval 後面否則不會生效
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Thread.sleep(1000);
Log.e("zhao", "onNext: " + aLong);
}
});
效果如下:
E/zhao: onNext: 0
E/zhao: onNext: 1
...
E/zhao: onNext: 126
E/zhao: onNext: 127
E/zhao: onNext: 96129
E/zhao: onNext: 96130
E/zhao: onNext: 96131
從日誌上分析來看,發射器發射的 0 ~ 127 總共 128 個數據是連續的,下一個資料就是 96129 , 128 ~ 96128 的資料被丟棄了。
注意事項
1、onBackpressureDrop 一定要放在 interval 後面否則不會生效
onBackpressureLatest
onBackpressureLatest 就是隻保留最新的事件。
onBackpressureBuffer
onBackpressureBuffer:預設情況下快取所有的資料,不會丟棄資料,這個方法可以解決背壓問題,但是它有像 Observable 一樣的缺點,快取資料太多,佔用太多記憶體。
onBackpressureBuffer(int capacity) :設定快取佇列大小,但是如果緩衝資料超過了設定的值,就會報錯,發生崩潰。
onBackpressureBuffer(int capacity) 測試
Flowable.interval( 1 , TimeUnit.MILLISECONDS)
.onBackpressureBuffer( 1000 ) //設定緩衝佇列大小為 1000
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Thread.sleep(1000);
Log.e("zhao", "onNext: " + aLong);
}
});
執行起來後,過了幾秒鐘,發生崩潰,日誌如下:
io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
···
Caused by: io.reactivex.exceptions.MissingBackpressureException: Buffer is full
通過日誌可以看出,緩衝區已經滿了。
注意事項
1、onBackpressureBuffer 一定要放在 interval 後面否則不會生效
參考資料
個人微訊號:zhaoyanjun125
, 歡迎關注