Android:RxJava的使用
說明:
RxJava用於非同步執行任務,跟建立子執行緒執行任務無本質區別,優點在於讓程式碼看起來整潔優雅些,並不能減少程式碼量
一、加入jar包依賴(app下的build.gradle):
dependencies {
...
compile 'io.reactivex.rxjava2:rxjava:2.+'
compile 'io.reactivex.rxjava2:rxandroid:2.+'
}
二、建立Observable(傳送資料):
1.通過create方法建立(需在subscribe方法中手動呼叫各個監聽方法,<>可以為任意型別)
Observable<T> observable = Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(ObservableEmitter<T> e) throws Exception {
/*
需要手動呼叫各個監聽方法
*/
e.onNext(T);
e.onComplete();
}
});
2.通過just方法建立(自動按順序呼叫各個監聽方法,<>
Observable<T> observable = Observable.just(T);
3.通過fromIterable方法建立(根據傳入的列表多次呼叫onNext方法,<>可以為任意型別)
List<T> list = new ArrayList<>();
list.add(T); //onNext方法會呼叫size次
Observable<T> observable = Observable.fromIterable(list);
4.通過defer方法建立(延時呼叫,<>可以為任意型別)
Observable<T> observable = Observable.defer(new Callable<ObservableSource<? extends T>>() { @Override public ObservableSource<? extends T> call() throws Exception { return Observable.just(T); } });
5.通過interval方法建立(定時執行onNext方法,只能<Long>型別,onNext實參從0開始,每次+1)
Observable<Long> observable = Observable.interval(5, TimeUnit.SECONDS); //定時5秒執行onNext,第1個引數為數值,第2個引數為單位,這裡為秒
6.通過range方法建立(執行指定次數的onNext方法,只能<Integer>型別,onNext實參從開始值開始,每次+1)
Observable<Integer> observable = Observable.range(開始值, 次數);
7.通過rangeLong方法建立(執行指定次數的onNext方法,只能<Long>型別,onNext實參從開始值開始,每次+1)
Observable<Long> observable = Observable.rangeLong(開始值, 次數);
8.通過timer方法建立(延時指定時間執行一次onNext方法,只能<Long>型別,onNext實參為0)
Observable<Long> observable = Observable.timer(5, TimeUnit.SECONDS); //延時5秒執行一次onNext,第1個引數為延時時間,第2個引數為單位,這裡為秒
9.通過repeat方法建立(重複執行onNext方法,<>可以為任意型別)
Observable<T> observable = Observable.just(T).repeat();
三、建立監聽(監聽,接收資料):
1.Observer方式(執行訂閱事件後,無錯誤時,依次執行onSubscribe、onNext、onComplete):
Observer<T> observer = new Observer<T>() {
/**
* 順序:1
**/
@Override
public void onSubscribe(Disposable d) {
}
/**
* 順序:2
**/
@Override
public void onNext(T arg) {
}
/**
* 錯誤回撥
**/
@Override
public void onError(Throwable e) {
}
/**
* 順序:3
**/
@Override
public void onComplete() {
}
};
2.Consumer方式(執行訂閱事件後,accept方法會被執行):
Consumer consumer = new Consumer<T>() {
@Override
public void accept(T arg) throws Exception {
}
};
四、執行訂閱事件(執行傳送資料動作):
1.執行事件:
observable.subscribe(observer);
2.取消事件:
Observable<T> observable = ...;
(1)獲得Disposable物件有2種方式:
方式1:
Disposable disposable = observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
方式2:
Disposable disposable = null;
Observer<T> observer = new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
...
}
(2)取消事件
disposable.dispose();
五、操作符的使用:
1.map方法(型別轉換,將原資料型別轉換為目標型別):
(1)建立Observable時呼叫map轉換資料:
//A和B是2個不同的類
Observable<B> observable = Observable.just(new A()).map(new Function<A, B>() {
@Override
public B apply(A a) throws Exception {
//資料轉換邏輯
B b = ...;
return b;
}
});
(2)在監聽中接收的是轉換後的資料型別:
Observer<B> observer = new Observer<B>() {
...
@Override
public void onNext(B b) {
}
};
observable.subscribe(observer);
2.flatMap方法(與fromIterable配合使用,將列表轉換為單個例項,呼叫size次onNext方法):
List<T> list = ...;
Observable<T> observable = Observable.just(list).flatMap(new Function<List<T>, ObservableSource<T>>() {
@Override
public ObservableSource<T> apply(List<T> list) throws Exception {
return Observable.fromIterable(list); //onNext方法會呼叫size次,每次收到的實參都是T的單個物件
}
});
Observer<B> observer = ...;
observable.subscribe(observer);
3.filter方法(過濾器,test中判斷條件,返回true才執行onNext):
Observable<T> observable = ...;
observable.filter(new Predicate<T>() {
@Override
public boolean test(T t) throws Exception {
if (條件) {
return true; //返回true執行下一步,呼叫onNext
}
return false; //返回false不會執行onNext
}
});
Observer<B> observer = ...;
observable.subscribe(observer);
4.take方法(指定onNext方法執行次數):
Observable<T> observable = ...;
observable.take(1).subscribe(...);
5.doOnNext方法(會在onNext方法之前執行)
Observable<T> observable = ...;
observable.doOnNext(new Consumer<T>() {
@Override
public void accept(T t) throws Exception { //此方法會在onNext之前執行
}
}).subscribe(...);
六、執行緒排程:
1.說明:
subscribeOn:設定Observable中任務的執行緒是哪種方式
observeOn:設定Observer中任務的執行在哪個執行緒
Schedulers.newThread():啟動一個新的執行緒
Schedulers.io():內部使用了無上限的執行緒池
Schedulers.computation():預設排程器,內部使用了固定的執行緒池
Schedulers.single():單執行緒
Schedulers.trampoline():按順序執行佇列中的任務
2.使用:
Observable<Integer> observable = ...;
observable.subscribeOn(Schedulers.io()) //控制Observable中subscribe方法在子執行緒中執行
.observeOn(AndroidSchedulers.mainThread()) //控制Observer中的onNext、onError、onComplete方法在主執行緒(UI執行緒)執行
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer arg) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
七、Backpressure策略:
1.Backpressure有以下幾種模式(預設只能存128個事件的快取池):
ERROR:快取池溢位時,丟擲MissingBackpressureException異常
BUFFER:設定更大的快取池
DROP:超上限時丟棄掉
LATEST:同DROP,區別是最後一個事件能收到
2.使用:
(1)使用Flowable替換Observable
Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> fe) throws Exception {
fe.onNext(t);
fe.onComplete();
}
}, BackpressureStrategy.ERROR); //此引數設定Backpressure策略
(2)使用Subscriber替換Observer,並在onSubscribe中呼叫Subscription.request()申請事件數量
Subscriber<T> subscriber = new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE); //表示向生產者申請可以消費的事件數量
}
...
};
(3)執行訂閱事件
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);