RxJava2詳細攻略(上)
0.簡介
RxJava其實就是提供一套非同步程式設計的API,這套API是基於觀察者模式的,而且是鏈式呼叫的,所以使用RxJava編寫的程式碼的邏輯會非常簡介。
RxJava有三個基本元素:
1.被觀察者(Observable)
2.觀察者(Observer)
3.訂閱(subscribe)
下面來說說以上三者是如何寫作的:
首先在gradle檔案中新增依賴:
implementation 'io.reactivex.rxjava2:rejava.2.1.4'
implementation 'io.reactivex.rxjava2.2.0.0'
1.建立被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>(){ @Override public void subscribe(ObservableEmitter<Integer> e)throws Exception{ Log.d(TAG, "==currentThread name: " + Thread.currentThread().getName()); e.onNext(1); e.onNext(2); e.onComplete(); } });
2.建立觀察者
Observer observer = new Observer<Integer>(){ @Override public void onSubscribe(Disposable d){ Log.d(TAG, "==onSubscribe"); } @Override public void onNext(Integer integer){ Log.d(TAG, "==onNext" + integer); } @Override public void onError(Throwable e){ Log.d(TAG, "==onError"); } @Override public void onComplete(){ Log.d(TAG, "==onComplete"); } };
3.訂閱
observable.subscribe(observer);
或者使用鏈式呼叫
Observable.create(new ObservableOnSubscribe < Integer > () { @Override public void subscribe(ObservableEmitter < Integer > e) throws Exception { Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName()); e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }) .subscribe(new Observer < Integer > () { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "======================onSubscribe"); } @Override public void onNext(Integer integer) { Log.d(TAG, "======================onNext " + integer); } @Override public void onError(Throwable e) { Log.d(TAG, "======================onError"); } @Override public void onComplete() { Log.d(TAG, "======================onComplete"); } });
被觀察者傳送的事件有以下幾種,總結如下表:
事件種類 | 作用 |
---|---|
onNext() | 傳送該事件時,觀察者會回撥onNext()方法 |
onError() | 傳送該事件時,觀察者會回撥onError()方法,當傳送該事件之後,其他事件不會繼續傳送 |
onComplete() | 傳送該事件時,觀察者會呼叫onComplete()方法,當傳送該事件之後,其他事件將不會發送 |
其實可以將RxJava比喻成一個做果汁的過程,家裡有很多種水果(要傳送的原始資料),你想榨果汁喝,這時候你就要想究竟要喝什麼果汁呢?如果你想喝牛油果雪梨檸檬汁,那你就要把這三種水果混在一起榨汁(使用各種操作符變換你想傳送給觀察者的資料),榨完後,你就可以喝上你想要的果汁了(把處理好的資料傳送給觀察者)。
總結如下圖:
下面就來講解RxJava各種操作的操作符。
1.建立操作符
以下就是講解建立被觀察者的各種操作符
1.1 create()
方法預覽
public static <T> Observable <T> create(ObservableOnSubscribe<T> source)
有什麼用
建立一個被觀察者
怎麼用
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter<String> e)throws Exception{
e.onNext("Hello Observer");
e.onComplete;
}
});
以上的程式碼非常簡單,建立ObservableOnSubscribe並重寫其subscribe方法,就可以通過ObservableEmitter發射器向觀察者傳送事件。
以下建立一個觀察者,來驗證這個被觀察者是否成功建立。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("chan","=============onNext " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d("chan","=============onComplete ");
}
};
observable.subscribe(observer);
列印結果:
05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer
=============onComplete
1.2 just()
方法預覽
public static <T> Observable<T> just(T item)
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
有什麼用
建立一個被觀察者,併發送事件,傳送的事件不可以超過10個以上。
怎麼用
Observable.just(1,2,3)
.subscribe(new Observer <Integer>(){
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=================onComplete ");
}
})
上面的程式碼直接使用鏈式呼叫,程式碼也非常簡單,這裡就不細說了,看看列印結果:
05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete
1.3 From操作符
1.3.1 fromArray()
方法預覽
public static <T> Observable<T> fromArray(T---items)
有什麼用?
這個方法和just()類似,只不過fromArray可以傳入多於10個的變數,並且可以傳入一個數組。
怎麼用
Integer array[] = {1,2,3,4};
Observable.fromArray(array)
.subscribe(new Observer<Integer>(){
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=================onComplete ");
}
});
程式碼和just()基本上一樣,直接看列印結果:
05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onNext 4
=================onComplete
1.3.2 fromCallable()
方法預覽
public static <T> Observable <T> fromCallable(Callable<? extends T> supplier)
有什麼用
這裡的 Callable 是 java.util.concurrent 中的 Callable, Callable和 Runnable的用法基本一致,只是它會返回一個結果值,這個結果值就是傳送給觀察者的。
怎麼用
Observable.fromCallable(new Callable<Integer>(){
@Override
public Integer call() throws Exception{
return 1;
}
})
.subscribe(new Consumer<Integer>(){
@Override
public void accept(Integer integer) throws Exception{
Log.d(TAG, "==accept" + integer);
}
});
列印結果
05-26 13:01:43.009 6890-6890/? D/chan: ================accept 1
1.3.3 fromFuture()
方法預覽
public static <T> Observable<T> fromFuture(Future<? extends T> future)
有什麼用?
引數中的Future是 java.util.concurrent 中的Future,Future的作用是增加了cancel()等方法操作Callable,它可以通過get()方法來獲取Callable返回的值。
怎麼用
FutureTask<String> futureTask = new FutureTask<> (new Callable<String>(){
@Override
public String call() throws Exception{
Log.d(TAG, "CallableDemo is Running");
return "返回結果";
}
});
Observable.fromFuture(futureTask)
.doOnSubscribe(new Consumer<Disposable>(){
@Override
public void accept(Disposable disposable)throws Exception{
futureTask.run();
}
})
.subscribe(new Consumer<String>(){
@Override
public void accept(String s) throws Exception{
Log.d(TAG, "==accept" + s);
}
});
doOnSubscribe()的作用就是隻有訂閱時才會傳送事件,具體會在下面講解。
列印結果
05-26 13:54:00.470 14429-14429/com.example.rxjavademo D/chan: CallableDemo is Running
================accept 返回結果
1.3.4 fromIterable()
方法預覽
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
有什麼用
直接傳送一個List集合資料給觀察者
怎麼用
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
Observable.fromIterable(list)
.subscribe(new Observable<Integer>(){
@Override
public void onSubscribe(Disposable d){
Log.d(TAG, "==onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=================onComplete ");
}
});
列印結果如下:
05-20 16:43:28.874 23965-23965/? D/chan: =================onSubscribe
=================onNext 0
=================onNext 1
=================onNext 2
=================onComplete
1.4 defer()
方法預覽
public static <T> Observable<T> defer(Callable<> extends ObservableSource<? extends T>> supplier)
有什麼用
這個方法的作用是直到被觀察者被訂閱後才會建立被觀察者。
怎麼用
//i要定義為成員變數
Interger i = 100;
Observable<Integer> observable = Observable.deger(new Callable<ObservabelSource<? extends Integer>>){
@Override
public ObservableSource<? extends Integer>call()throws Exception{
return Observable.just(i);
}
});
i = 200;
Observer observer = new Observer<Integer>(){
@Override
public void onSubscribe(Disposable d){
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
i = 300;
observable.subscribe(observer);
列印結果如下:
05-20 20:05:01.443 26622-26622/? D/chan: ================onNext 200
================onNext 300
因為defer()只有觀察者訂閱的時候才會建立新的被觀察者,所以每訂閱一次就會列印一次,並且都是列印i最新的值。
1.5 timer()
方法預覽
public static Observable<Long> timer(long delay, TimeUnit unit)
有什麼用
當到指定時間後就會發送一個OL的值給觀察者。
怎麼用
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>(){
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "===============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
列印結果:
05-20 20:27:48.004 27204-27259/com.example.louder.rxjavademo D/chan: ===============onNext 0
1.6 interval()
方法預覽
public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
有什麼用
每隔一段時間就會發送一個事件,這個事件是從0開始,不斷增1的數字。
怎麼用
Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer<Long>(){
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
列印結果:
05-20 20:48:10.321 28723-28723/com.example.louder.rxjavademo D/chan: ==============onSubscribe
05-20 20:48:14.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 0
05-20 20:48:18.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 1
05-20 20:48:22.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-20 20:48:26.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-20 20:48:30.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-20 20:48:34.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 5
從時間就可以看出每隔4秒就會發出一次數字遞增1的事件。這裡說下interval()第三個方法的initialDelay引數,這個引數的意思是onSubscribe回撥之後,再次回撥onNext的間隔時間。
1.7 intervalRange()
方法預覽
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long>intervalRange(long start, long connt, long initialDelay long period, TimeUnit unit, Scheduler scheduler)
有什麼用
可以指定傳送事件的開始值和數量,其他與interval()的功能一樣。
怎麼用
Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>(){
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
列印結果:
05-21 00:03:01.672 2504-2504/com.example.louder.rxjavademo D/chan: ==============onSubscribe
05-21 00:03:03.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-21 00:03:04.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-21 00:03:05.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-21 00:03:06.673 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 5
05-21 00:03:07.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext
可以看出收到5次onNext事件,並且都是從2開始的。
1.8 range()
方法預覽
public static Observable<Integer> range(final int start, final int count)
有什麼用
同時傳送一定範圍的事件序列
怎麼用
Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Integer aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
列印結果:
05-21 00:09:17.202 2921-2921/? D/chan: ==============onSubscribe
==============onNext 2
==============onNext 3
==============onNext 4
==============onNext 5
==============onNext 6
1.9 rangeLong()
方法預覽
public static Observable<Long> rangeLong(long start, long count)
有什麼用
作用與range()一樣,只是資料型別為Long
怎麼用
用法與range()一樣,這裡不做贅述。
1.10 empty()&never()&error()
方法預覽
public static <T> Observable<T> empty()
public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)
有什麼用
- empty():直接傳送onComplete()事件
- never():不傳送任何事件
- error():傳送onError()事件
怎麼用
Observable.empty()
.subscribe(new Observer<Object>(){
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe");
}
@Override
public void onNext(Object o) {
Log.d(TAG, "==================onNext");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete");
}
});
列印結果:
05-26 14:06:11.881 15798-15798/com.example.rxjavademo D/chan: ==================onSubscribe
==================onComplete
換成never()的列印結果:
05-26 14:12:17.554 16805-16805/com.example.rxjavademo D/chan: ==================onSubscribe
換成onError()的列印結果:
05-26 14:12:58.483 17817-17817/com.example.rxjavademo D/chan: ==================onSubscribe
==================onError java.lang.NullPointerException
2.轉換操作符
2.1 map()
方法預覽
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
有什麼用
map可以將被觀察者傳送的資料型別轉變成其他的型別
怎麼用?
以下程式碼將Integer型別的資料轉換成String
Observable.just(1, 2, 3)
.map(new Function<Integer, String>(){
@Override
public String apply(Integer integer)throws Exception{
return "I'm " + integer;
}
})
.subscribe(new Observer<String>(){
@Override
public void onSubscribe(Disposable d){
Log.e(TAG, "==onSubscribe");
}
@Override
public void onNext(String s){
Log.e(TAG, "==onNext" + s);
}
@Override
public void onError(Throwable e){
}
@Override
public void onComplete(){
}
});
列印結果:
05-21 09:16:03.490 5700-5700/com.example.rxjavademo E/chan: ===================onSubscribe
===================onNext I'm 1
===================onNext I'm 2
===================onNext I'm 3
2.2 flatMap()
方法預覽
public final<R> Observable flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
有什麼用
這個方法可以將事件序列中的元素進行整合加工,返回一個新的被觀察者。
怎麼用?
flatMap()其實與map()類似,但是faltMap()返回的是一個Observable。現在用一個例子來說明flatmap()的用法。
假設有一個Person類,這個類的定義如下:
public class Person{
private String name;
private List<Plan> planList = new ArrayList<>();
public Person(String name, List<Plan> planList){
this.name = name;
this.planList = planList;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<Plan> getPlanList() {
return planList;
}
public void setPlanList(List<Plan> planList) {
this.planList = planList;
}
}
Person類有一個name和planList兩個變數,分別代表的是人名和計劃清單。
Plan類的定義如下:
public class Plan{
private String time;
private String content;
private List<String> actionList = new ArrayList<>();
public Plan(String time, String content){
this.name = name;
this.content = content;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public List<String> getActionList() {
return actionList;
}
public void setActionList(List<String> actionList) {
this.actionList = actionList;
}
}
現在有一個需求就是要將Person集合中的每個元素中的Plan的action打印出來。首先用map()來實現這個需求:
Observable.fromIterable(personList)
.map(new Function <Person, List<Plan>>(){
@Override
public List <Plan> apply(Person person) throws Exception{
return person.getPlanList();
}
})
.subscribe(new Observer<List <Plan>>(){
@Override
public void onSubscribe(Disposable d){
}
@Override
public void onNext(List<Plan> plans){
for(Plan plan : plans){
List<String> planActionList = planActionList();
for(String action: planActionList){
Log.d(TAG, "==action" + action);
}
}
}
@Override
public void onError(Throwable e){
}
@Override
public void onComplete(){
}
});
可以看到onNext()用了巢狀for迴圈來實現,如果程式碼邏輯複雜起來,可能需要多重迴圈才能實現。
現在看下使用flatMap()實現:
Observable.fromIterable(personList)
.flatMap(new Function<Person, ObservableSourfe<Plan>> (){
@Override
public ObservableSource<Plan> apply(Person person){
return Observable.formIterable(person.getPlanList());
}
})
.flatMap(new Function<Plan, ObservableSource<String>>(){
@Override
public ObservableSource<String> apply(Plan plan) throws Exception{
return Observable.fromIterable(plan.getActionList());
}
})
.subscribe(new Observer<String>(){
@Override
public void onSubscribe(Disposable d){
}
@Override
public void onNext(String s){
Log.d(TAG, "==action" + s);
}
@Override
public void onError(Throwable e){
}
@Override
public void onComplete(){
}
});
從程式碼可以看出,只需要兩個flatMap()就可以完成需求,並且程式碼邏輯非常清晰。
2.3 concatMap()
方法預覽
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
有什麼用?
concatMap()和flatMap()基本上是一樣的,只不過concatMap()轉發出來的事件是有序的,而flatMap()是無序的。
怎麼用
Observable.fromIterable(personList)
.flatMap(new Function<Person, ObservableSource<Plan>>(){
@Override
public ObservableSource<Plan> apply(Person person){
if("chan".equals(person.getName())){
return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.MILLISECONDS);
}
return Observable.fromIterable(person.getPlanList());
}
})
.subscribe(new Observer<Plan>(){
@Override
public void onSubscribe(Disposable d){
}
@Override
public void onNext(Plan plan){
Log.d(TAG, "==plan" + plan.getContent());
}
@Override
public void onError(Throwable e){
}
@Override
public void onComplete(){
}
});
為了更好地驗證flatMap是無序的,使用了一個delay()方法來延遲,直接看列印結果:
05-21 13:57:14.031 21616-21616/com.example.rxjavademo D/chan: ==================plan chan 上課
==================plan chan 寫作業
==================plan chan 打籃球
05-21 13:57:14.041 21616-21641/com.example.rxjavademo D/chan: ==================plan Zede 開會
==================plan Zede 寫程式碼
==================plan Zede 寫文章
本來Zede的事件傳送順序是排在chan事件之前,但是經過延遲後,這兩個事件序列傳送順序互換了。
現在來驗證下concatMap()是否是有序的,使用上面同樣的程式碼,只是把flatMap()換成concatMap(),列印結果如下:
05-21 13:58:42.917 21799-21823/com.example.rxjavademo D/chan: ==================plan Zede 開會
==================plan Zede 寫程式碼
==================plan Zede 寫文章
==================plan chan 上課
==================plan chan 寫作業
==================plan chan 打籃球
這就代表concatMap()轉換後傳送的事件序列是有序的了。
2.4 buffer()
方法預覽
public final Observable<List<T>> buffer(int count, int skip)
有什麼用?
從需要傳送的事件當中獲取一定數量的事件,並將這些事件放到緩衝區當中一併發出。
怎麼用?
buffer有兩個引數,一個是count,另一個skip。count緩衝區元素的數量,skip就代表緩衝區滿了之後,傳送下一次事件序列的時候要跳過多少元素。這樣說可能還是有點抽象,直接看程式碼:
Observable.just(1, 2, 3, 4, 5)
.buffer(2, 1)
.subscribe(new Observer<List <Integer>>(){
@Override
public void onSubscribe(Disposable d){
}
@Override
public void onNext(List<Integer> integers){
Log.d(TAG, "==緩衝區大小:" + integers.size());
for(Integer i: integers){
Log.d(TAG, "==元素:" + i);
}
}
@Override
public void onError(Throwable e){
}
@Override
public void onComplete(){
}
});
列印結果:
05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: ================緩衝區大小: 2
================元素: 1
================元素: 2
================緩衝區大小: 2
================元素: 2
================元素: 3
================緩衝區大小: 2
================元素: 3
================元素: 4
================緩衝區大小: 2
================元素: 4
================元素: 5
================緩衝區大小: 1
================元素: 5
從結果可以看出,每次傳送事件,指標都會往後移動一個元素再取值,直到指標移動到沒有元素的時候就會停止取值。
2.5 groupBy()
方法預覽
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
有什麼用
將傳送的資料進行分組,每個分組都會返回一個被觀察者。
怎麼用
Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10)
.groupBy(new Function<Integer, Integer>(){
@Override
public Integer apply(Integer integer) throws Exception{
return integer % 3;
}
})
.subscribe(new Observer<GroupedObservable<Integer, Integer>>(){
@Override
public void onSubscribe(Disposable d){
Log.d(TAG, "==onSubscribe");
}
@Override
public void onNext(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable){
Log.d(TAG, "==onNext");
integerIntegerGroupObservable.subscribe(new Observer<Integer>(){
@Override
public void onSubscribe(Disposable d){
Log.d(TAG, "==GroupedObservable onSubscribe");
}
@Override
public void onNext(Integer integer){
Log.d(TAG, "==GroupedObservable onNext groupName: " + integerIntegerGroupedObservable.getKey() + " value" + integer);
@Override
public void onError(Throwable e){
Log.d(TAG, "==GroupedObservable onError");
}
}
@Override
public void onComplete(){
Log.d(TAG, "==GroupedObservable onComplete");
}
});
}
@Override
public void onError(Throwable e){
Log.d(TAG, "==onError");
}
@Override
public void onComplete(){
Log.d(TAG, "==onComplete");
}
});
在groupBy()方法返回的引數是分組的名字,每返回一個值,那就代表會建立一個組,以上的程式碼就是將1~10的資料分成3組,檢視列印結果:
05-26 14:38:02.062 21451-21451/com.example.rxjavademo D/chan: ====================onSubscribe
05-26 14:38:02.063 21451-21451/com.example.rxjavademo D/chan: ====================onNext
====================GroupedObservable onSubscribe ====================GroupedObservable onNext groupName: 2 value: 5
====================GroupedObservable onNext groupName: 2 value: 2
====================onNext
====================GroupedObservable onSubscribe
====================GroupedObservable onNext groupName: 0 value: 3
05-26 14:38:02.064 21451-21451/com.example.rxjavademo D/chan: ====================onNext
====================GroupedObservable onSubscribe
====================GroupedObservable onNext groupName: 1 value: 4
====================GroupedObservable onNext groupName: 1 value: 1
====================GroupedObservable onNext groupName: 0 value: 6
====================GroupedObservable onNext groupName: 2 value: 8
====================GroupedObservable onNext groupName: 0 value: 9
====================GroupedObservable onNext groupName: 1 value: 7
====================GroupedObservable onNext groupName: 1 value: 10
05-26 14:38:02.065 21451-21451/com.example.rxjavademo D/chan: ====================GroupedObservable onComplete
====================GroupedObservable onComplete
====================GroupedObservable onComplete
====================onComplete
可以看到返回的結果中是有三個組的。
2.6 scan()
方法預覽
public final Observable<T> scan(BiFunction<T, T, T> accumulator)
有什麼用?
將資料以一定的邏輯聚合起來
怎麼用?
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>(){
@Override
public Integer apply(Integer integer, Integer integer2)throws Exception{
Log.d(TAG, "==apply");
Log.d(TAG, "==integer" + integer);
Log.d(TAG, "==integer2" + integer2);
return integer + integer2;
}
})
.subscribe(new Consumer<Integer>(){
@Override
public void accept(Integer integer) throws Exception{
Log.d(TAG, "==accept" + integer);
}
});
列印結果:
05-26 14:45:27.784 22519-22519/com.example.rxjavademo D/chan: ====================accept 1
====================apply
====================integer 1
====================integer2 2
====================accept 3
====================apply
05-26 14:45:27.785 22519-22519/com.example.rxjavademo D/chan: ====================integer 3
====================integer2 3
====================accept 6
====================apply
====================integer 6
====================integer2 4
====================accept 10
====================apply
====================integer 10
====================integer2 5
====================accept 15
2.7 window()
方法預覽
public final Observable<Observable<T>> window(long count)
有什麼用?
傳送指定數量的事件時,就將這些事件分為一組。Window中的count的引數就是代表指定的數量,例如將count指定為2,那麼每發2個數據就會將這兩個資料分成一組。
怎麼用?
Observable.just(1, 2, 3, 4, 5)
.window(2)
.subscribe(new Observer<Observable<Integer>>(){
@Override
public void onSubscribe(Disposable d){
Log.d(TAG, "==onSubscribe");
}
@Override
public void onNext(Observable<Integer> integerObservable){
integerObservable.subscribe(new Observer<Integer>(){
@Override
public void onSubscribe(Disposable d){
Log.d(TAG, "==integerObservable onSubscribe");
}
@Override
public void onNext(Integer integer){
Log.d(TAG, "==integerObservable onNext" + integer);
}
@Override
public void onError(Throwable e){
Log.d(TAG, "==integerObservable onError");
}
@Override
public void onComplete(){
Log.d(TAG, "==integerObservable onComplete");
}
});
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=====================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=====================onComplete ");
}
});
列印結果:
05-26 15:02:20.654 25838-25838/com.example.rxjavademo D/chan: =====================onSubscribe
05-26 15:02:20.655 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onSubscribe
05-26 15:02:20.656 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onNext 1
=====================integerObservable onNext 2
=====================integerObservable onComplete
=====================integerObservable onSubscribe
=====================integerObservable onNext 3
=====================integerObservable onNext 4
=====================integerObservable onComplete
=====================integerObservable onSubscribe
=====================integerObservable onNext 5
=====================integerObservable onComplete
=====================onComplete
從結果可以發現,window()將1~5的事件分成了3組。