RxJava過濾操作符filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst, buffer
阿新 • • 發佈:2018-11-21
原 https://blog.csdn.net/qq_36523667/article/details/78761470 Android函式響應式程式設計——必學的RxJava過濾操作符filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst 2017年12月09日 21:40:00 閱讀數:370 之前採用的都是分開的寫法,現在想想還是寫在一起好。 1.filter:例子中就是過濾大於2的 rx.Observable.just(1,2,3,4).filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 2; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", integer + ""); } }); 輸出: 12-09 05:28:52.499 15381-15381/com.hdu.a15058124.homework3 I/xbh: 3 12-09 05:28:52.499 15381-15381/com.hdu.a15058124.homework3 I/xbh: 4 2.elementAt:例子中是取得第3個 rx.Observable.just(1,2,3,4).elementAt(2).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", integer + ""); } }); 3.distinct:例子中是把重複的只輸出1次 rx.Observable.just(1,2,2,3,4,1,1,1,1).distinct().subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", integer + ""); } }); 輸出: 12-09 05:34:56.823 21013-21013/com.hdu.a15058124.homework3 I/xbh: 1 12-09 05:34:56.823 21013-21013/com.hdu.a15058124.homework3 I/xbh: 2 12-09 05:34:56.823 21013-21013/com.hdu.a15058124.homework3 I/xbh: 3 12-09 05:34:56.824 21013-21013/com.hdu.a15058124.homework3 I/xbh: 4 4.skip、take:例子中skip就是從第3個開始取包括第三個,take就是取第3個之前的不包括第3個 rx.Observable.just(1,2,3,4,5,6,7,8).skip(2).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", integer + ""); } }); 輸出: 12-09 05:36:47.594 22769-22769/com.hdu.a15058124.homework3 I/xbh: 3 12-09 05:36:47.594 22769-22769/com.hdu.a15058124.homework3 I/xbh: 4 12-09 05:36:47.594 22769-22769/com.hdu.a15058124.homework3 I/xbh: 5 12-09 05:36:47.594 22769-22769/com.hdu.a15058124.homework3 I/xbh: 6 12-09 05:36:47.594 22769-22769/com.hdu.a15058124.homework3 I/xbh: 7 12-09 05:36:47.594 22769-22769/com.hdu.a15058124.homework3 I/xbh: 8 rx.Observable.just(1,2,3,4,5,6,7,8).take(2).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", integer + ""); } }); 輸出: 12-09 05:39:09.274 24985-24985/? I/xbh: 1 12-09 05:39:09.274 24985-24985/? I/xbh: 2 5.ignoreElements:忽略1,2,3,4,即忽略他的資料元素(也可以理解為onNext事件),另外的都保留。 rx.Observable.just(1,2,3,4).ignoreElements().subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { Log.i("xbh", "任務佇列全部完成"); } @Override public void onError(Throwable e) { Log.i("xbh", "出錯了"); } @Override public void onNext(Integer s) { Log.i("xbh", "觸發的事件:" + s); } @Override public void onStart() { Log.i("xbh", "開始"); } }); 輸出: 12-09 05:47:30.009 32525-32525/? I/xbh: 開始 12-09 05:47:30.009 32525-32525/? I/xbh: 任務佇列全部完成 6.throttleFirst:觀察者每過一段時間向被觀察者取得一串onNext,但是他只要第一個,另外的全部忽略。 這個例子中第一次當i=0的時候,本來應該執行這個觀察者的接受到資料的邏輯了,但是200ms還沒過,所以當然不執行。 接下來for迴圈休息了100ms,100ms後,有兩個onNext了,一個裡面的值是0,一個是1。但是現在是100ms還是沒到200ms。 for迴圈又休息了100ms,終於到了200ms,但是0,1他只取了第一個,那就是0。 接下來以此類推。 rx.Observable.create(new rx.Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 10; i ++) { subscriber.onNext(i); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } subscriber.onCompleted(); } }).throttleFirst(200, TimeUnit.MILLISECONDS).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", integer + ""); } }); 輸出: 12-09 06:22:36.428 406-406/com.hdu.a15058124.homework3 I/xbh: 0 12-09 06:22:36.629 406-406/com.hdu.a15058124.homework3 I/xbh: 2 12-09 06:22:36.831 406-406/com.hdu.a15058124.homework3 I/xbh: 4 12-09 06:22:37.032 406-406/com.hdu.a15058124.homework3 I/xbh: 6 12-09 06:22:37.234 406-406/com.hdu.a15058124.homework3 I/xbh: 8 7.throttleWithTimeout:這個就是和上面有一點點區別。都是200ms執行一次方法,但是這裡的200ms是根據你給他資料的時間開始計時的,如果200ms內你又給了他一個數據,那麼之前的資料就會被拋棄,並且重新開始計時。 這個例子中:第一次i=0。0%3=0。所以for迴圈會睡眠300ms,也就是說在200ms內沒有第二個數進來破壞這一切,所以0成功輸出。 但是i=1的時候,for迴圈只會睡眠100ms了,那麼1被扔進來後,100ms內又扔了個2進來,所以1被拋棄了,2還被留在這裡,又重新開始計時。 也就是說,如果你每個都只睡100ms,那麼一個數都不會被輸出來。 rx.Observable.create(new rx.Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 10; i ++) { subscriber.onNext(i); int sleep = 100; if (i % 3 == 0) { sleep = 300; } try { Thread.sleep(sleep); } catch (InterruptedException e) { e.printStackTrace(); } } subscriber.onCompleted(); } }).throttleWithTimeout(200, TimeUnit.MILLISECONDS).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("xbh", integer + ""); } }); 輸出: 12-09 06:18:20.260 28966-29004/? I/xbh: 0 12-09 06:18:20.761 28966-29004/com.hdu.a15058124.homework3 I/xbh: 3 12-09 06:18:21.265 28966-29004/com.hdu.a15058124.homework3 I/xbh: 6 12-09 06:18:21.768 28966-29004/com.hdu.a15058124.homework3 I/xbh: 9 每個都睡100ms的輸出: 12-09 06:26:08.757 3982-3982/com.hdu.a15058124.homework3 I/xbh: 9 buffer操作符的功能: 1:能一次性集齊多個結果到列表中,訂閱後自動清空相應結果,直到完全清除 2: 也可以週期性的集齊多個結果到列表中,訂閱後自動清空相應結果,直到完全清除 buffer 例子1: 一次訂閱2個 //一次訂閱2個 Observable.range(1,5).buffer(2).subscribe(new Observer<List<Integer>>() { @Override public void onCompleted() { LogUtils.d("-----------------onCompleted:"); } @Override public void onError(Throwable e) { LogUtils.d("----------------->onError:"); } @Override public void onNext(List<Integer> strings) { LogUtils.d("----------------->onNext:" + strings); } }); 顯示結果: 02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2] 02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: ----------------->onNext:[3, 4] 02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: ----------------->onNext:[5] 02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: -----------------onCompleted: 例子2:一次全部訂閱 //一次全部訂閱 Observable.range(1,5).buffer(5).subscribe(new Observer<List<Integer>>() { @Override public void onCompleted() { LogUtils.d("-----------------onCompleted:"); } @Override public void onError(Throwable e) { LogUtils.d("----------------->onError:"); } @Override public void onNext(List<Integer> strings) { LogUtils.d("----------------->onNext:" + strings); } }); 結果: 02-20 15:54:56.423 21917-21917/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2, 3, 4, 5] 02-20 15:54:56.423 21917-21917/com.rxandroid.test1 D/----->: -----------------onCompleted: 例子3:每次剔除一個 //每次剔除1個 Observable.range(1, 5).buffer(5, 1).subscribe(new Observer<List<Integer>>() { @Override public void onCompleted() { LogUtils.d("-----------------onCompleted:"); } @Override public void onError(Throwable e) { LogUtils.d("----------------->onError:"); } @Override public void onNext(List<Integer> strings) { LogUtils.d("----------------->onNext:" + strings); } }); 02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2, 3, 4, 5] 02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[2, 3, 4, 5] 02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[3, 4, 5] 02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[4, 5] 02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[5] 02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: -----------------onCompleted: 注意當skip==count的時候,框架認為同一操作,一次就清除了count個元素 Observable.range(1, 5).buffer(5, 5).subscribe(new Observer<List<Integer>>() { @Override public void onCompleted() { LogUtils.d("-----------------onCompleted:"); } @Override public void onError(Throwable e) { LogUtils.d("----------------->onError:"); } @Override public void onNext(List<Integer> strings) { LogUtils.d("----------------->onNext:" + strings); } }); 02-20 16:09:24.343 14991-14991/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2, 3, 4, 5] 02-20 16:09:24.343 14991-14991/com.rxandroid.test1 D/----->: -----------------onCompleted: 例子4:週期性訂閱多個結果 Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (subscriber.isUnsubscribed()) return; while (true) { subscriber.onNext("訊息" + SystemClock.elapsedRealtime()); SystemClock.sleep(2000);//每隔2s傳送訊息 } } }).subscribeOn(Schedulers.io()). buffer(3, TimeUnit.SECONDS).//每隔3秒 取出訊息 subscribe(new Observer<List<String>>() { @Override public void onCompleted() { LogUtils.d("-----------------onCompleted:"); } @Override public void onError(Throwable e) { LogUtils.d("----------------->onError:"); } @Override public void onNext(List<String> strings) { LogUtils.d("----------------->onNext:" + strings); } }); 02-20 16:55:33.283 17087-18151/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370507667, 訊息370509668] 02-20 16:55:36.323 17087-18151/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370511668] 02-20 16:55:39.303 17087-18151/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370513669, 訊息370515669] 02-20 16:55:54.883 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370529168, 訊息370531172] 02-20 16:55:57.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370533184] 02-20 16:56:00.883 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370535184, 訊息370537184] 02-20 16:56:03.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370539184] 02-20 16:56:06.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370541185, 訊息370543204] 02-20 16:56:09.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370545204] 02-20 16:56:12.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370547204, 訊息370549204] 02-20 16:56:15.863 23122-23316/com.rxandroid.test1 D/----->: ----------------->onNext:[訊息370551204]