7章 RxJava高階用法(一)
CSDN學院課程地址
- RxJava2從入門到精通-初級篇:https://edu.csdn.net/course/detail/10036
- RxJava2從入門到精通-中級篇:https://edu.csdn.net/course/detail/10037
- RxJava2從入門到精通-進階篇:https://edu.csdn.net/course/detail/10038
- RxJava2從入門到精通-原始碼分析篇:https://edu.csdn.net/course/detail/10138
7. RxJava高階用法(一)
7.1 自定義Operator
自定義Operator屬於RxJava的高階用法,可以自己自定義一些適用於常見應用場景的操作符。實現自定義Operator很簡單,只需要實現RxJava提供的ObservableOperator
lift
操作符將自定義操作符應用到我們的程式中。下面我們使用自定義Operator,該操作符的作用是將List
集合轉換成String
型別的輸出
1、實現ObservableOperator,建立自定義Operator
public class CustomOperator implements ObservableOperator<String, List<String>> { @Override public Observer<? super List<String>> apply(final Observer<? super String> observer) throws Exception { return new Observer<List<String>>() { @Override public void onSubscribe(Disposable d) { observer.onSubscribe(d); } @Override public void onNext(List<String> strings) { observer.onNext(strings.toString()); } @Override public void onError(Throwable e) { observer.onError(e); } @Override public void onComplete() { observer.onComplete(); } }; } }
2、使用lift操作符新增自定義Operator
public class Main { public static void main(String[] args) { //建立被觀察者 Observable.create(new ObservableOnSubscribe<List<String>>() { @Override //預設在主執行緒裡執行該方法 public void subscribe(@NonNull ObservableEmitter<List<String>> e) throws Exception { ArrayList<String> list = new ArrayList<>(); list.add("1"); list.add("2"); list.add("3"); list.add("4"); e.onNext(list); e.onComplete(); } }) .lift(new CustomOperator()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { System.out.println("onNext=" + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("onComplete"); } }); } }
3、輸出結果
onNext=[1, 2, 3, 4]
onComplete
7.2 自定義Transformer
自定義Transformer表示一個批量操作符的變換器,如果你在很多Observable中使用相同的一系列操作符,比如每次都要使用到map
+take
+doOnNext
等操作,那麼就可以定義一個通用的Transformer物件,裡面可以將需要重複用到的操作符打包成Transformer物件,使用compose操作符將Transformer物件應用到我們的Observable上即可
實現自定義Transformer很簡單,只需要實現RxJava提供的ObservableTransformer
介面,實現對應的功能即可,同時,使用compose
操作符將自定義Transformer應用到我們的程式中。下面我們使用自定義Transformer,該Transformer的作用是將發射的資料從Integer
轉換成String
,並取2個數據項,同時在發射的時候監聽發射事件,進行輸出的列印
1、實現ObservableTransformer,建立自定義Transformer
public class CustomTransformer implements ObservableTransformer<Integer, String> {
@Override
public ObservableSource<String> apply(io.reactivex.Observable<Integer> upstream) {
return upstream.take(2).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "序號:" + integer + "發射成功";
}
}).doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s + ",準備發射");
}
});
}
}
2、使用compose操作符新增自定義Transformer
public class Main {
public static void main(String[] args) {
//建立被觀察者
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
})
.compose(new CustomTransformer())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
3、輸出結果
序號:1發射成功,準備發射
序號:1發射成功
序號:2發射成功,準備發射
序號:2發射成功
在安卓開發中,通常我們也會自定義Transformer來實現我們常用的執行緒切場景,具體如下
public static <T> ObservableTransformer<T, T> schedulersTransformer() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
public static <T> FlowableTransformer<T, T> schedulersTransformerForFlowable() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
7.3 自定義Plugin
自定義Plugin表示自定義外掛,自定義外掛可以在RxJavaPlugins中提供的介面中去插入自己的一段程式碼操作,類似於面向切面程式設計,或者理解成Android的Hook。如果你需要在所有的訂閱事件中去插入一段統一的操作,或者是監聽所有訂閱事件發生異常時的回撥,都可以使用自定義外掛。在實際應用中,目前並未發現有什麼作用
實現自定義Plugin只需要呼叫RxJavaPlugins提供的set方法即可,下面我們通過例子輸出Observable和Observer的地址資訊,來驗證每次訂閱的時候,回撥自定義Plugin的方法中,外掛物件和源物件是否為同一個物件
1、通過設定ObservableSubscribe,每次對Observable操作的時候回撥
public class Main {
public static void main(String[] args) {
RxJavaPlugins.setOnObservableAssembly(new CustomObservableAssembly());//任意操作符都有回撥
RxJavaPlugins.setOnObservableSubscribe(new CustomObservableSubscribe());//每次subscribe時候有回撥
Observable observable = getObservable();
Observer<Integer> observer = getObserver();
System.out.println("main observable.toString:" + observable.toString());
System.out.println("main observer.toString:" + observer.toString());
observable.subscribe(observer);
}
public static Observable getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(5);
emitter.onNext(2);
emitter.onNext(3);
}
});
}
public static Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext=" + integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
};
}
}
2、CustomObservableAssembly
public class CustomObservableAssembly implements Function<Observable, Observable> {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
observable.take(2);
return observable;
}
}
3、CustomObservableSubscribe
public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
@Override
public Observer apply(Observable observable, Observer observer) throws Exception {
System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
return observer;
}
}
4、輸出結果
地址相同說明是同個物件,自定義外掛Hook成功
CustomObservableAssembly observable.toString:[email protected]a7ca
main observable.toString:[email protected]a7ca
main observer.toString:[email protected]
CustomObservableSubscribe observable.toString:[email protected]a7ca,observer.toString:[email protected]
onNext=5
onNext=2
onNext=3
補充:
可以通過設定ErrorHandler,發生異常時會回撥
RxJavaPlugins.setErrorHandler();
可以通過設定SchedulerHandler來Hook到對應的schedule
RxJavaPlugins.setIoSchedulerHandler();
RxJavaPlugins.setNewThreadSchedulerHandler();
RxJavaPlugins.setComputationSchedulerHandler();
RxJavaPlugins.setSingleSchedulerHandler();
錯誤演示:
由於CustomObservableAssembly是在任意操作符操作的時候都會回撥,所以在回撥裡面是不可以對observable再進行操作符的操作,否則回撥裡面observable的操作符還是會回撥CustomObservableAssembly自身,導致死迴圈,發生StackOverflowError
public class CustomObservableAssembly implements Function<Observable, Observable> {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
observable.take(2);
return observable;
}
}
由於CustomObservableSubscribe是在subscribe之後進行的回撥,如果在回撥裡面對observable進行操作符的操作,這個時候是不會生效的,因為在subscribe之後onNext的函式是不會再處理後面新添的操作符,原理與原始碼有關
public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
@Override
public Observer apply(Observable observable, Observer observer) throws Exception {
System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
observable.take(2);
return observer;
}
}