
Simple uses of RxJava in a project

Preventing repeated clicks

 RxView.clicks(mBinding.btclick)
         .throttleFirst(2, TimeUnit.SECONDS)
         .subscribe(new Consumer<Object>() {
             @Override
             public void accept(Object o) throws Exception {
                 ToastUtils.showToast(TestActivity.this, "Only one click is accepted within 2 seconds");
             }
         });
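
The windowing idea behind throttleFirst can be pictured without RxJava at all. The following is a minimal plain-Java sketch, not RxJava's implementation: the class ThrottleFirstSketch, its method, and the sample click timestamps are all made up for illustration. It keeps a click only if at least the window length has passed since the last click that was kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of throttleFirst; it only mimics the observable effect.
class ThrottleFirstSketch {
    // Keeps a click only when at least windowMillis have passed since the last kept click.
    static List<Long> throttleFirst(List<Long> clickTimesMillis, long windowMillis) {
        List<Long> accepted = new ArrayList<>();
        Long lastAccepted = null;
        for (long t : clickTimesMillis) {
            if (lastAccepted == null || t - lastAccepted >= windowMillis) {
                accepted.add(t);
                lastAccepted = t;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Assumed click times in milliseconds.
        List<Long> clicks = Arrays.asList(0L, 500L, 1900L, 2100L, 2600L, 4500L);
        System.out.println(throttleFirst(clicks, 2000L)); // prints [0, 2100, 4500]
    }
}
```

The clicks at 500, 1900 and 2600 fall inside a window opened by an earlier kept click, so they are dropped.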

Watching an input field for content changes

 RxTextView.textChanges(mBinding.etTest).skip(5).subscribe(new Consumer<CharSequence>() {
     @Override
     public void accept(CharSequence charSequence) throws Exception {
         if (!TextUtils.isEmpty(charSequence.toString())) {
             ToastUtils.showToast(TestActivity.this, "The input field is not empty");
         } else {
             ToastUtils.showToast(TestActivity.this, "The input field is empty");
         }
     }
 });

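The argument of skip(long count) is the number of emissions to ignore, not a character count. textChanges emits the current text as soon as it is subscribed and then once per change, so skip(5) drops that initial value and the next four changes before the consumer starts receiving updates.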
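
To make the effect of skip concrete, here is a small plain-Java sketch. It does not use RxJava; SkipSketch and the sample list of emissions are made up for illustration, on the assumption that the user types one character at a time into an empty field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of skip(count): drop the first `count` emissions, pass the rest through.
class SkipSketch {
    static List<String> skip(List<String> emissions, int count) {
        int from = Math.min(count, emissions.size());
        return new ArrayList<>(emissions.subList(from, emissions.size()));
    }

    public static void main(String[] args) {
        // The initial (empty) text, followed by one emission per keystroke.
        List<String> emissions = Arrays.asList("", "a", "ab", "abc", "abcd", "abcde", "abcdef");
        System.out.println(skip(emissions, 5)); // prints [abcde, abcdef]
    }
}
```

With one keystroke at a time, the first delivered text happens to be five characters long, which may be why skip(5) can look like a length threshold. Pasting or deleting text breaks that coincidence.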

To watch the contents of several EditText fields at once, you can write:

 Observable<CharSequence> ob1 = RxTextView.textChanges(mBinding.etTest1).skip(1);
 Observable<CharSequence> ob2 = RxTextView.textChanges(mBinding.etTest2).skip(1);
 Observable.combineLatest(ob1, ob2, new BiFunction<CharSequence, CharSequence, Boolean>() {
     @Override
     public Boolean apply(CharSequence charSequence, CharSequence charSequence2) throws Exception {
         return charSequence.length() > 5 && charSequence2.length() > 10;
     }
 }).subscribe(new Consumer<Boolean>() {
     @Override
     public void accept(Boolean aBoolean) throws Exception {
         if (aBoolean) {
             Log.d("count", "requirements met");
         }
     }
 });

Delayed task

mBinding.btclick.setOnClickListener( v->{
            Observable.timer(3, TimeUnit.SECONDS).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    ToastUtils.showToast(TestActivity.this, "Runs after a 3-second delay");
                }
            });
        });

Filling a collection asynchronously

     private List<TaskItem> mList = new ArrayList<>();
     private Disposable disposable;

     disposable = Observable.create(new ObservableOnSubscribe<TaskItem>() {
             @Override
             public void subscribe(ObservableEmitter<TaskItem> e) throws Exception {
                 // Runs on the thread chosen by subscribeOn (a new background thread).
                 for (int i = 0; i < 10; i++) {
                     TaskItem item = new TaskItem("123456789" + i, 123456789, 987654321, new Random().nextBoolean());
                     e.onNext(item);
                 }
                 e.onComplete();
             }
         }).subscribeOn(Schedulers.newThread())
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new Consumer<TaskItem>() {
             @Override
             public void accept(TaskItem taskItem) throws Exception {
                 // Delivered on the main thread.
                 mList.add(taskItem);
             }
         });

Basic usage

Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    for (int i = 0; i < 10; i++){
                        e.onNext(i);
                    }
                    e.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d("test count", "onSubscribe");

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d("test count", integer.toString());
                }

                @Override
                public void onError(Throwable e) {
                    Log.d("test count", e.toString());

                }

                @Override
                public void onComplete() {
                    Log.d("test count", "onComplete");

                }
            });

Basic type conversion (map)

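/**
 * map
 * Basic one-to-one type conversion.
 * just(T...) emits the given arguments one after another.
 * fromArray(T...) / fromIterable(Iterable<? extends T>) split the given array or Iterable
 * into its elements and emit them one after another (these were from(...) in RxJava 1).
 */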
 Observable.just(1, 2, 3, 4, 5)
                    .map(new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) throws Exception {
                            return "This is " + integer;
                        }
                    }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });

Iterating over a collection to extract data

 Observable.fromIterable(list).flatMap(new Function<TaskItem, ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> apply(TaskItem taskItem) throws Exception {
                return Observable.just(taskItem.getTaskNum());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

Filtering and grouping

   {
            Observable.fromIterable(list).groupBy(new Function<TaskItem, Boolean>() {
                @Override
                public Boolean apply(TaskItem taskItem) throws Exception {
                    return taskItem.isChecked();
                }
            }).subscribe(new Consumer<GroupedObservable<Boolean, TaskItem>>() {
                @Override
                public void accept(GroupedObservable<Boolean, TaskItem> booleanTaskItemGroupedObservable) throws Exception {

                    if (booleanTaskItemGroupedObservable.getKey()){
                        booleanTaskItemGroupedObservable.subscribe(new Consumer<TaskItem>() {
                            @Override
                            public void accept(TaskItem taskItem) throws Exception {
                                System.out.print("boolean value " + booleanTaskItemGroupedObservable.getKey() + ": ");
                                System.out.println(taskItem);
                            }
                        });
                    }else{
                        booleanTaskItemGroupedObservable.subscribe(new Consumer<TaskItem>() {
                            @Override
                            public void accept(TaskItem taskItem) throws Exception {
                                System.out.print("boolean value " + booleanTaskItemGroupedObservable.getKey() + ": ");
                                System.out.println(taskItem);
                            }
                        });
                    }
            }
            });
        }

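When you subscribe again inside the consumer, you are subscribing to the group that the outer subscription has just delivered. The inner Consumer's accept runs once for every item in the group, but anything outside that accept method runs only once per group, when the group is first subscribed to. An incorrect example: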

{
            Observable.fromIterable(list).groupBy(new Function<TaskItem, Boolean>() {
                @Override
                public Boolean apply(TaskItem taskItem) throws Exception {
                    return taskItem.isChecked();
                }
            }).subscribe(new Consumer<GroupedObservable<Boolean, TaskItem>>() {
                @Override
                public void accept(GroupedObservable<Boolean, TaskItem> booleanTaskItemGroupedObservable) throws Exception {
                    if (booleanTaskItemGroupedObservable.getKey()) {
                        // This print runs only once for the group and is not repeated for later items
                        System.out.print("boolean value " + booleanTaskItemGroupedObservable.getKey() + ": ");
                        booleanTaskItemGroupedObservable.subscribe(new Consumer<TaskItem>() {
                            @Override
                            public void accept(TaskItem taskItem) throws Exception {
                                System.out.println(taskItem);
                            }
                        });
                    } else {
                        System.out.print("boolean value " + booleanTaskItemGroupedObservable.getKey() + ": ");
                        booleanTaskItemGroupedObservable.subscribe(new Consumer<TaskItem>() {
                            @Override
                            public void accept(TaskItem taskItem) throws Exception {
                                System.out.println(taskItem);
                            }
                        });
                    }
                }
            });
        }

Click events

 RxView.clicks(mBinding.btclick).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object s) throws Exception {
                ToastUtils.showToast(TestActivity.this, "This is a click event");
            }
        });

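Note that the method inside Consumer cannot be changed freely: its name and signature must stay accept(T), otherwise you get a "method not found" error. I will add more here if I discover anything new in regular use.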

Filtering (filter)

 RxView.clicks(mBinding.btclick).subscribe(v ->
        {
            Observable.fromIterable(list).filter(new Predicate<TaskItem>() {
                @Override
                public boolean test(TaskItem taskItem) throws Exception {
                    return taskItem.isChecked();
                }
            }).subscribe(new Consumer<TaskItem>() {
                @Override
                public void accept(TaskItem consumer) throws Exception {
                    System.out.println(consumer.getTaskNum());
                }
            });
        });

The anonymous inner class passed to filter returns a boolean, which is the criterion that decides which items are emitted downstream.

Emitting the first n items (take)

 RxView.clicks(mBinding.btclick).subscribe(v->{
            Observable.fromIterable(list).take(5).filter(new Predicate<TaskItem>() {
                @Override
                public boolean test(TaskItem taskItem) throws Exception {
                    return taskItem.isChecked();
                }
            }).subscribe(new Consumer<TaskItem>() {
                @Override
                public void accept(TaskItem con) throws Exception {
                    System.out.println(con.getTaskNum());
                }
            });
        });
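
Note that the order of take and filter matters: take(5) before filter means only the first five items are ever examined. As a rough analogy (an assumption for illustration, not RxJava code), the sketch below uses java.util.stream, where limit plays the role of take. The Item class is a hypothetical stand-in for TaskItem.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical analogy using Java streams: limit(n) behaves like take(n) here.
class TakeThenFilterSketch {
    // Stand-in for TaskItem: a task number plus a checked flag.
    static final class Item {
        final String num;
        final boolean checked;
        Item(String num, boolean checked) {
            this.num = num;
            this.checked = checked;
        }
    }

    // Like take(n).filter(...): only the first n items are considered at all.
    static List<String> takeThenFilter(List<Item> items, int n) {
        return items.stream().limit(n).filter(i -> i.checked)
                .map(i -> i.num).collect(Collectors.toList());
    }

    // Like filter(...).take(n): the first n items that pass the check.
    static List<String> filterThenTake(List<Item> items, int n) {
        return items.stream().filter(i -> i.checked).limit(n)
                .map(i -> i.num).collect(Collectors.toList());
    }

    static List<Item> sample() {
        return Arrays.asList(new Item("t0", false), new Item("t1", true),
                new Item("t2", false), new Item("t3", true), new Item("t4", true));
    }

    public static void main(String[] args) {
        System.out.println(takeThenFilter(sample(), 2)); // prints [t1]
        System.out.println(filterThenTake(sample(), 2)); // prints [t1, t3]
    }
}
```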

Grouping (groupBy)

The earlier grouping example works but has a small flaw: its output is not ordered by group. This version fills that gap.

RxView.clicks(mBinding.btclick).subscribe(v->{
            Observable<GroupedObservable<String,TestItem>> observable = Observable.fromIterable(testList).groupBy(new Function<TestItem, String>() {
                @Override
                public String apply(TestItem testItem) throws Exception {
                    return testItem.getPlace();
                }
            });
            Observable.concat(observable).subscribe(new Consumer<TestItem>() {
                @Override
                public void accept(TestItem testItem) throws Exception {
                    System.out.println("Community: " + testItem.getName() + "; listing description: " + testItem.getPlace() + "; price: " + testItem.getPrice());
                }
            });
        });

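Grouping needs a key to group by. The resulting order resembles a bubble sort: the item at the current index starts a group, later items that belong to the same group are located, and each is moved to the position right after the current index. In effect, groups appear in the order their keys are first seen, and items keep their original order within each group.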
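
The ordering described above can be reproduced in plain Java. This is only a sketch of the visible result, not of how RxJava does it internally; GroupConcatSketch, its method, and the sample strings (a place name, a dash, and a number) are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of groupBy followed by concat.
class GroupConcatSketch {
    static <T, K> List<T> groupThenConcat(List<T> items, Function<T, K> keyOf) {
        // LinkedHashMap keeps groups in the order their keys are first seen.
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(keyOf.apply(item), k -> new ArrayList<>()).add(item);
        }
        // Concatenate the groups one after another.
        List<T> result = new ArrayList<>();
        for (List<T> group : groups.values()) {
            result.addAll(group);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("east-1", "west-1", "east-2", "north-1", "west-2");
        // The part before the dash is the grouping key.
        System.out.println(groupThenConcat(items, s -> s.substring(0, s.indexOf('-'))));
        // prints [east-1, east-2, west-1, west-2, north-1]
    }
}
```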

Merging (merge)

 String[] letters = new String[]{"A","B","C","D","E","F","G","H"};
        RxView.clicks(mBinding.btclick).subscribe( v->{
            Observable<String> letter = Observable.interval(300, TimeUnit.MILLISECONDS)
                    .map(new Function<Long, String>() {
                        @Override
                        public String apply(Long aLong) throws Exception {
                            return letters[aLong.intValue()];
                        }
                    }).take(letters.length);
            Observable<Long> number = Observable.interval(500,TimeUnit.MILLISECONDS)
                    .take(5);
            Observable.merge(letter,number).subscribe(new Consumer<Serializable>() {
                @Override
                public void accept(Serializable serializable) throws Exception {
                    System.out.println(serializable);
                }
            });
        });

Merging here means that items from both sources are interleaved in the order in which they are emitted.
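
To see what "in emission order" means for the two intervals above, the following plain-Java sketch lays the emissions out on a timeline. It is a model, not RxJava: MergeSketch and its helpers are hypothetical, and it assumes item i of an interval arrives at (i + 1) * period. When two items share a timestamp (1500 ms here), the real order is not guaranteed; the model simply puts the first source first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical timeline model of Observable.merge for two interval sources.
class MergeSketch {
    // One emission: when it happens and what is emitted.
    static final class Event {
        final long timeMillis;
        final String value;
        Event(long timeMillis, String value) {
            this.timeMillis = timeMillis;
            this.value = value;
        }
    }

    // Models interval(period).map(i -> values[i]).take(values.length).
    static List<Event> interval(String[] values, long periodMillis) {
        List<Event> events = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            events.add(new Event((i + 1) * periodMillis, values[i]));
        }
        return events;
    }

    // Models merge: every event from both sources, ordered by emission time.
    static List<String> merge(List<Event> first, List<Event> second) {
        List<Event> all = new ArrayList<>(first);
        all.addAll(second);
        // List.sort is stable, so on a tie the event from "first" stays ahead.
        all.sort(Comparator.comparingLong(e -> e.timeMillis));
        List<String> out = new ArrayList<>();
        for (Event e : all) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> letter = interval(new String[]{"A", "B", "C", "D", "E", "F", "G", "H"}, 300);
        List<Event> number = interval(new String[]{"0", "1", "2", "3", "4"}, 500);
        System.out.println(merge(letter, number));
        // prints [A, 0, B, C, 1, D, E, 2, F, 3, G, H, 4]
    }
}
```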

Inserting data (startWith)

startWith can only insert data ahead of the items emitted by the source Observable.
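
As a rough picture of the result, here is a plain-Java sketch (not RxJava code; StartWithSketch and the sample values are invented for illustration). The inserted items always come out first, followed by everything the source emits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of startWith: the prefix is delivered before any source item.
class StartWithSketch {
    static <T> List<T> startWith(List<T> source, List<T> prefix) {
        List<T> out = new ArrayList<>(prefix);
        out.addAll(source);
        return out;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("C", "D");
        System.out.println(startWith(source, Arrays.asList("A", "B"))); // prints [A, B, C, D]
    }
}
```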

Inserting data (concat)

Unlike startWith, concat inserts data strictly in the order in which the Observables are listed.

  String[] letters = new String[]{"A","B","C","D","E","F","G","H"};
        RxView.clicks(mBinding.btclick).subscribe( v->{
            Observable<String> letter = Observable.interval(300, TimeUnit.MILLISECONDS)
                    .map(new Function<Long, String>() {
                        @Override
                        public String apply(Long aLong) throws Exception {
                            return letters[aLong.intValue()];
                        }
                    }).take(letters.length);
            Observable<Long> number = Observable.interval(500,TimeUnit.MILLISECONDS)
                    .take(5);
            Observable.concat(letter,number).subscribe(new Consumer<Serializable>() {
                @Override
                public void accept(Serializable serializable) throws Exception {
                    System.out.println(serializable);
                }
            });
        });

Here the second Observable (number) emits only after the first one (letter) has finished emitting.
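
The timing can be sketched in plain Java. This is a model under an assumption, not RxJava itself: ConcatSketch is hypothetical, and it assumes the second interval only starts counting once the first source has emitted its last item.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timeline model of Observable.concat for two interval sources.
class ConcatSketch {
    static List<String> concat(String[] first, long periodFirst, String[] second, long periodSecond) {
        List<String> out = new ArrayList<>();
        long clock = 0;
        // The first source runs to completion...
        for (String v : first) {
            clock += periodFirst;
            out.add(clock + "ms " + v);
        }
        // ...and only then does the second source start its own interval.
        for (String v : second) {
            clock += periodSecond;
            out.add(clock + "ms " + v);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] letters = {"A", "B", "C", "D", "E", "F", "G", "H"};
        String[] numbers = {"0", "1", "2", "3", "4"};
        for (String line : concat(letters, 300, numbers, 500)) {
            System.out.println(line);
        }
    }
}
```

In this model the last letter arrives at 2400 ms and the first number at 2900 ms, whereas with merge the first number would already arrive at 500 ms.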

Paired emission (zip)

/**
 * This continues from the example above and reuses its letter and number Observables.
 */
 Observable.zip(letter, number, new BiFunction<String, Long, String>() {
                @Override
                public String apply(String s, Long aLong) throws Exception {
                    return s +aLong;
                }
            }).subscribe(new Observer<String>() {

                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

Note that when zip combines sources and one of the Observables raises an error while emitting, the other also stops emitting.
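
The pairing itself can be modeled in a few lines of plain Java. This is a sketch and not RxJava code; ZipSketch is hypothetical. It pairs the i-th letter with the i-th number and stops when the shorter source runs out, so the three leftover letters are never paired.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of zip: items are paired by position, not by arrival time.
class ZipSketch {
    static List<String> zip(List<String> letters, List<Long> numbers) {
        List<String> out = new ArrayList<>();
        int pairs = Math.min(letters.size(), numbers.size());
        for (int i = 0; i < pairs; i++) {
            // Same combiner as in the example above: s + aLong.
            out.add(letters.get(i) + numbers.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H");
        List<Long> numbers = Arrays.asList(0L, 1L, 2L, 3L, 4L);
        System.out.println(zip(letters, numbers)); // prints [A0, B1, C2, D3, E4]
    }
}
```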

combineLatest

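combineLatest(Observable, Observable, BiFunction) combines the most recently emitted items of two Observables according to the rule defined by the BiFunction (called Func2 in RxJava 1).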

 List<String> strings1 = new ArrayList<String>() {{
            add("hello");add("very good"); add("excellent");
        }};
        List<String> strings2 = new ArrayList<String>() {{
            add("hello there");add("very very good"); add("truly excellent");
        }};
        RxView.clicks(mBinding.btclick).subscribe(v -> {
            Observable<String> s1 = Observable.interval(1,TimeUnit.SECONDS)
                    .map(new Function<Long, String>() {
                        @Override
                        public String apply(Long aLong) throws Exception {
                            return strings1.get(aLong.intValue());
                        }
                    }).take(strings1.size());
            Observable<String> s2 = Observable.interval(1,TimeUnit.SECONDS)
                    .map(new Function<Long, String>() {
                        @Override
                        public String apply(Long aLong) throws Exception {
                            return strings2.get(aLong.intValue());
                        }
                    }).take(strings2.size());
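            // take limits how many items are emitted; without it, the index runs past the end of the list and the resulting exception closes the page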
            Observable.combineLatest(s1, s2, new BiFunction<String, String, String>() {
                @Override
                public String apply(String s, String s2) throws Exception {
                    return "name"+ s + "page" +s2;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });
        });
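
The "latest of each" rule can be traced with a small plain-Java model. This is a sketch, not RxJava: CombineLatestSketch is hypothetical, and it deliberately uses two different periods (1000 ms and 700 ms, an assumption that differs from the example above) so that no two items share a timestamp and the order is unambiguous.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timeline model of combineLatest for two interval sources.
class CombineLatestSketch {
    static List<String> combineLatest(String[] a, long periodA, String[] b, long periodB) {
        List<String> out = new ArrayList<>();
        String latestA = null;
        String latestB = null;
        int i = 0;
        int j = 0;
        while (i < a.length || j < b.length) {
            // Item k of a source arrives at (k + 1) * period; an exhausted source never fires again.
            long nextA = i < a.length ? (i + 1) * periodA : Long.MAX_VALUE;
            long nextB = j < b.length ? (j + 1) * periodB : Long.MAX_VALUE;
            if (nextA <= nextB) {
                latestA = a[i++];
            } else {
                latestB = b[j++];
            }
            // Nothing is emitted until both sources have produced at least one item.
            if (latestA != null && latestB != null) {
                out.add(latestA + "+" + latestB);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] s1 = {"a0", "a1", "a2"};
        String[] s2 = {"b0", "b1", "b2"};
        System.out.println(combineLatest(s1, 1000, s2, 700));
        // prints [a0+b0, a0+b1, a1+b1, a1+b2, a2+b2]
    }
}
```

Unlike zip, every new item from either side produces an output, paired with whatever the other side emitted last.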