1. 程式人生 > >Rxjava 的過濾操作符

Rxjava 的過濾操作符

public class RxFilterActivity extends AppCompatActivity {
    private final static String TAG = RxFilterActivity.class.getSimpleName();
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 根據指定條件過濾事件 有
filter() ofType() skip() // skipLast() distinct() distinctUntilChanged() /** * 作用:過濾特定的事件,根據書寫的判斷條件過濾 */ filter(); /** * 作用:過濾特定的資料型別 */ ofType(); /** * 跳過某個事件 */ skipAndSkipLast(); /** * 過濾事件中重複的事件/連續重複的事件 distinct / distinctUntilChanged */ distinct(); //根據指定事件數量過濾事件 有
take() takeLast() /** * 通過設定指定事件的數量,只發送特定數量的事件 * 指定觀察者最多能接收到的事件數量 */ take(); /** * 指定觀察者只能接收到被觀察者傳送的最後幾個事件 */ takeLast(); //根據指定時間過濾事件 有throttleFirst() throttleLast() sample() //throttleWithTimeout debounce() /** * 在某段時間內,只發送該段時間內第1次事件 / 最後1次事件 */ throttleFirst(); throttleLast();
/** * 在某段時間內,只發送該段時間內最新(最後)1次事件 * simple() throttleLast 相似,此處就不做詳細介紹了 */ /** * 傳送資料事件時,若兩次傳送事件的間隔 < 指定時間,就會丟棄前一次的資料,直到指定 * 時間內都沒有新資料發射時才會傳送後一次的資料 */ throttleWithTimeout(); debounce(); //根據指定事件位置過濾事件 有firstElement() lastElement() elementAt() elementAtOrError() /** * 僅選取第1個元素 / 最後一個元素 */ firstElement(); lastElement(); /** * 指定接收某個元素(通過 索引值 確定) * 注:允許越界,即獲取的位置索引 > 傳送事件序列長度 */ elementAt(); /** * elementAt()的基礎上, * 當出現越界情況(即獲取的位置索引 > 傳送事件序列長度)時,即丟擲異常 */ elementAtOrError(); } private void elementAtOrError() { Log.e(TAG,"-----------------------elementAtOrError-----------------------"); Observable.just(1,2,3,4,5) .elementAtOrError(6) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"這是個玩笑,不會走到這的"); } }); } private void elementAt() { Log.e(TAG,"-----------------------elementAt-----------------------"); Observable.create(getSource()) .elementAt(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"事件通過的value = " + integer); } }); Observable.create(getSource()) .elementAt(12,-10) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"事件通過的value = " + integer); } }); } private void lastElement() { Log.e(TAG,"-----------------------lastElement-----------------------"); Observable.just(1,2,3,4,5) .lastElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG," lastElement 通過的事件value = " + integer); } }); } private void firstElement() { Log.e(TAG,"-----------------------firstElement-----------------------"); Observable.just(1,2,3,4,5,6) .firstElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG," firstElement 通過的事件value = " + integer); } }); } private void debounce() { Log.e(TAG,"-----------------------debounce-----------------------"); Observable.create(getSource()) .debounce(2,TimeUnit.SECONDS,Schedulers.io()) .subscribe(getObserver()); } private void throttleWithTimeout() { Log.e(TAG,"-----------------------throttleWithTimeout-----------------------"); Observable.create(getSource()) //1秒中採用資料 .throttleWithTimeout(1,TimeUnit.SECONDS,Schedulers.io()) .subscribe(getObserver()); } private void throttleLast() { Log.e(TAG,"-----------------------throttleLast-----------------------"); Observable.create(getSource()).observeOn(Schedulers.io()) .throttleLast(1, TimeUnit.SECONDS) .subscribe(getObserver()); } @NonNull private Observer<Integer> getObserver() { return new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"訂閱成功,準備接受事件"); } @Override public void onNext(Integer integer) { Log.e(TAG,"接受到的事件 value = " + integer); } @Override public void onError(Throwable e) { Log.e(TAG,"事件報錯"+e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"事件傳送完成"); } }; } @NonNull private ObservableOnSubscribe<Integer> getSource() { return new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); Thread.sleep(300); e.onNext(2); Thread.sleep(300); e.onNext(3); Thread.sleep(300); e.onNext(4); Thread.sleep(300); e.onNext(5); Thread.sleep(300); e.onNext(6); Thread.sleep(300); e.onNext(7); Thread.sleep(300); e.onNext(8); Thread.sleep(300); e.onNext(9); Thread.sleep(300); e.onComplete(); } }; } private void throttleFirst() { Log.e(TAG,"-----------------------throttleFirst-----------------------"); Observable.create(getSource()).observeOn(Schedulers.io()) .throttleFirst(1, TimeUnit.SECONDS) .subscribe(getObserver()); } private void takeLast() { Log.e(TAG,"-----------------------takeLast-----------------------"); Observable.just(1,2,3,4,5,6,7,8) .takeLast(4) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通過takeLast過濾事件,接收被觀察者最後傳送的幾個事件,通過value = " + integer); } }); } private void take() { Log.e(TAG,"-----------------------take-----------------------"); Observable.just(1,2,3,4,5,6,7,8) .take(5) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通過take過濾傳送事件的數量,通過value = " + integer); } }); } private void distinct() { Log.e(TAG,"-----------------------distinct-----------------------"); Observable.just(1,2,3,1,2,3) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通過distinct過濾掉資料來源中重複的資料 通過的value = " + integer); } }); Log.e(TAG,"-----------------------distinctUntilChanged-----------------------"); Observable.just(1,2,3,3,3,1,2,3) .distinctUntilChanged() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通過distinctUntilChanged過濾掉資料來源中連續重複的資料,通過的value = " + integer); } }); } private void skipAndSkipLast() { Log.e(TAG,"-----------------------skipAndSkipLast-----------------------"); Observable.just(1,2,3,4,5,6) //跳過正序的第一項 .skip(1) //跳過正序的最後兩項 .skipLast(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通過skipskipLast跳過事件,通過的為value = " + integer); } }); } private void ofType() { Log.e(TAG,"-----------------------ofType-----------------------"); Observable.just(1,"abc",2d,3f,4L,'a',2) .ofType(Integer.class) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer s) throws Exception { Log.e(TAG,"通過ofType過濾事件,通過的 value = " + s); } }); } private void filter() { Log.e(TAG,"-----------------------filter-----------------------"); Observable.just(1,2,3,4,5) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG,"通過Filter過濾事件,通過的 value = " + integer); } }); } }