1. 程式人生 > >RXJava 2.0 操作符的使用

RXJava 2.0 操作符的使用

public class RxjavaXU {
@Test
public void Flowable() throws Exception {
//背壓使用
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 1000000000; i++) {
                    e.onNext(i);
                }
} //背壓的模式 /** * * MISSING 顧名思義,就是當你上游一股腦的拋下來很多東西的時候,我就只接一部分,其他的我都miss掉。 * ERROR 當下遊無法再繼續接受請求的時候會丟擲異常MissingBackpressureException 上游,你拼命拋吧,我這邊要是拿不下了,我就報警,老子不幹了! * BUFFER 上游不斷的發出onNext請求,直到下游處理完 上游,你拋吧,我就屯著,實在不行了,系統崩潰,咱倆同歸於盡
* DROP 如果下游已經溢位了,會丟棄掉溢位了的onNext請求。 上游,你就拼命拋吧,我這裡就拿那麼多,我處理完了我再去你最新的裡面拿 * LATEST 如果下游已經溢位了,會丟棄掉溢位了的onNext請求,但是會在內部快取一個最新的onNext請求,在下一次請求的時候會把這個最新的先返回出去。 有,任何形式的轉載都請聯絡作者獲得授權並註明出處。 */ }, BackpressureStrategy.BUFFER
).subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { s.request(100); } @Override public void onNext(Integer integer) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(integer); } @Override public void onError(Throwable t) { } @Override public void onComplete() { } }); } //---------轉換操作符-------------------------------------- @Test public void empty() throws Exception { Observable.empty() .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { System.out.println("直接呼叫"); } }); } @Test public void map() throws Exception { Observable.just("111") .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return 1; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer bitmap) { System.out.println(bitmap); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } @Test public void flatMap() throws Exception { Observable.just("getToken", "login") .flatMap(new Function<String, ObservableSource<?>>() { @Override public ObservableSource<?> apply(String s) throws Exception { //執行完下面的以後才往下執行,沒有執行完不會往下執行 return creatRespence(s); } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { System.out.println(o.toString()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } private ObservableSource<?> creatRespence(final String s) { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { Thread.sleep(2000); e.onNext("登入" + s); } }); } @Test public void groupBy() throws Exception { //分書的例子 Observable.just(1, 2, 3, 4, 5) .groupBy(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer > 2 ? "A類書籍" : "B類書籍"; } }) .subscribe(new Consumer<GroupedObservable<String, Integer>>() { @Override public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception { stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { String key = stringIntegerGroupedObservable.getKey(); System.out.println(key + integer); } }); } }); } @Test public void range() throws Exception { //資料累加 Observable.range(1, 10) .scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } @Test public void buffer() throws Exception { //如果你有10000條資料要插入資料庫,你慢慢插,時間太久,可以使用,多少條插入一次 Observable.just(1, 2, 3, 4, 5, 6) .buffer(5)//5條插入一次 .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<Integer> integers) { System.out.println(integers); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } //---------過濾操作符-------------------------------------- @Test public void Filter() throws Exception { Observable.just(1, 2, 3, 4, 5) //過濾操作符 .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { //大於2的才要 if (integer > 2) { return true; } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println(integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } @Test public void take() throws Exception { //每隔一秒鐘 ,時間單位 //這個API要RXAndroid 才有效果 Observable.interval(1, TimeUnit.SECONDS) .take(5)//控制數量,超過5個執行onComplete 完成 .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(Long aLong) { System.out.println(aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("完成"); } }); } @Test public void distinct() throws Exception { //去除重複的元素 Observable.just(1, 2, 2, 4, 5, 3, 4, 5, 6) .distinct()//去重 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer integer) { System.out.println("--->" + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } @Test public void elementAt() throws Exception { Observable.just(1, 2, 2, 4, 5, 3, 4, 5, 6) .elementAt(5)//過濾了重複的之後,只處理第五個 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //本來是1,2,4,5,3,6 System.out.println(integer);//列印3 } }); } //------------條件操作符---------------------- @Test public void All() throws Exception { Observable.just(1, 2, 3, 4, 5) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 2;//裡面的東西全部都大於2,有一個不滿足都false } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { System.out.println(aBoolean); } }); } @Test public void contains() throws Exception { Observable.just(1, 2, 3, 4, 5, 6) .contains(3)//裡面的資料是否有3,是就true // .any(new Predicate<Integer>() { // @Override // public boolean test(Integer integer) throws Exception { // return integer == 3;//與contains一樣的,裡面可以寫邏輯 // } // }) // .isEmpty()//是否有時間 .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { System.out.println(aBoolean); } }); } @Test public void skipWhile() throws Exception { //要在android下執行,才有效果 // 開始 結束 延時多久發 間隔多久執行下一個時間 Observable.intervalRange(0, 5, 1, 100, TimeUnit.MILLISECONDS) .skipWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { return aLong < 2;//少於2,不下發處理,注意:如果是true,不下發處理 } })//調過一段資料 .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println(aLong);//1,2小於上面的條件不處理,結果為:3,4,5 } }); } //-------------合併操作符------------------- @Test public void startWith() throws Exception { Observable.just(1, 2, 3, 4, 5, 6) //先發送他的資料,中間插入資料 .startWith(Observable.just(2, 3, 1, 2, 3, 9)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer); } }) ; } @Test public void collect() throws Exception { //最多加入4個Observable,超過了就不行了 Observable.concat(Observable.just(1, 2, 3) , Observable.just(4, 5, 6)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer); } }); } @Test public void merge() throws Exception { Observable<Integer> just1 = Observable.just(1, 2, 3); Observable<Integer> just2 = Observable.just(4, 5, 6); Observable<Integer> just3 = Observable.just(7, 8, 9); //按照裡面的順序執行 Observable.merge(just1, just2, just3).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer);//123456789 } }); //下面延時,並行執行 需要android 執行 // Flowable.merge(Flowable.intervalRange(1, 5, 1000, 100, TimeUnit.MILLISECONDS), // Flowable.intervalRange(6, 10, 1000, 100, TimeUnit.MILLISECONDS)) // .subscribe(new Consumer<Long>() { // @Override // public void accept(Long o) throws Exception { // System.out.println(o); // } // }); } @Test public void mergeDelayError() throws Exception { //如果中途有發生錯誤,延時執行 Flowable.mergeDelayError( Flowable.create(new FlowableOnSubscribe<Publisher<?>>() { @Override public void subscribe(FlowableEmitter<Publisher<?>> e) throws Exception { e.onError(new NullPointerException());//延遲拋異常 } }, BackpressureStrategy.BUFFER) , Flowable.intervalRange(1, 5, 1000, 100, TimeUnit.MILLISECONDS)) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { System.out.println(o.toString()); } }); } @Test public void zip() throws Exception { Flowable.zip( Flowable.just(1, 2, 3)// , Flowable.just(4, 5, 6) , Flowable.just(4, 3)//如果數量不同 , new Function3<Integer, Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2, Integer integer3) throws Exception { return integer + integer2 + integer3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer);//如果上面的資料長度不同,最後一個不處理 } }); } //-----錯誤操作符--------------- @Test public void onErrorALL() throws Exception { Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { for (int i = 0; i < 10; i++) { if (i == 2) { e.onError(new Throwable("出現錯誤")); } else { e.onNext(i + ""); } } } }, BackpressureStrategy.BUFFER) .onErrorReturn(new Function<Throwable, String>() { //當出現錯誤的時候可以返回一個結果彌補一下,然後立馬停止往下執行 @Override public String apply(Throwable throwable) throws Exception { Log.e("----", "處理錯誤資訊:" + throwable.toString()); //上面發生異常,返回一個彌補的結果 return "彌補錯誤的結果"; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("----", "正確的資料:" + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.e("----", "出現錯誤的資料》》" + throwable.getMessage()); } }); Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { for (int i = 0; i < 10; i++) { if (i == 2) { e.onError(new Throwable("出現錯誤")); } else { e.onNext(i + ""); } } } }, BackpressureStrategy.BUFFER) .onErrorResumeNext(new Function<Throwable, Publisher<? extends String>>() { @Override public Publisher<? extends String> apply(Throwable throwable) throws Exception { Log.e("----", "處理錯誤資訊:" + throwable.toString()); //上面發生異常(使用下面的備用方案) return Flowable.just("從新定義1個發源", "從新定義2個發源", "從新定義3個發源"); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("----", "正確的資料:" + s); } }); Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { for (int i = 0; i <= 5; i++) { if (i == 2) { //注意這裡是Exception e.onError(new Exception("出現錯誤了")); } else { e.onNext(i + ""); } try { Thread.sleep(1000); } catch (Exception ex) { ex.printStackTrace(); } } e.onComplete(); } }).onExceptionResumeNext(new Observable<String>() { @Override protected void subscribeActual(Observer<? super String> observer) { observer.onNext("替換出現錯誤的資訊"); observer.onComplete(); } }).subscribe(new io.reactivex.functions.Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("---", "--》》" + s); } }); Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { for (int i = 0; i <= 9; i++) { if (i == 2) { e.onError(new Exception("出現錯誤了")); } else { e.onNext(i + ""); } try { Thread.sleep(1000); } catch (Exception ex) { ex.printStackTrace(); } } e.onComplete(); } }).subscribeOn(Schedulers.newThread()) // .retry(new Predicate<Throwable>() { // @Override // public boolean test(Throwable throwable) throws Exception { // Log.e("--", "retry錯誤: "+throwable.toString()); //// //返回假就是不讓重新發射資料了,呼叫觀察者的onError就終止了。 //// //返回真就是讓被觀察者重新發射請求 // return false; // } // }) // .retry(new BiPredicate<Integer, Throwable>() { // @Override // public boolean test(Integer integer, Throwable throwable) throws Exception { // //integer錯誤次數 // Log.e("--", "retry錯誤: "+integer); // return true; // } // }) //重連第幾次 .retry(3, new