1. 程式人生 > >Rxjava2.0(一) 建立操作符

Rxjava2.0(一) 建立操作符

注:在使用RxJava 2操作符前,記得在專案的Gradle中新增依賴:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'

1.create

private void create() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
public void subscribe(ObservableEmitter<Integer> e) throws 
Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { LogUtils.syso("++++++onSubscribe++++++++++"); } @Override public void onNext(Integer integer) { LogUtils.syso
("++++++onNext++++++++++"+integer); } @Override public void onError(Throwable e) { LogUtils.syso("++++++onError++++++++++"+e.getMessage()); } @Override public void onComplete() { LogUtils.syso("++++++onComplete++++++++++"); } });

2.just

private void just() {
    Observable.just
(1,2,3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { LogUtils.syso("+++just+++accept++++++++++"+integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { LogUtils.syso("+++just+++throwable++++++++++"+throwable.getMessage()); } }); }

應用場景
快速建立 被觀察者物件(Observable) & 傳送10個以下事件

3.fromArray

Integer[] items={0,1,2,3,4};
Observable.fromArray(items)
        .subscribe(new Consumer<Integer>() {
            @Override
public void accept(Integer integer) throws Exception {
                LogUtils.syso("====accept==="+integer.toString());
}
        }, new Consumer<Throwable>() {
            @Override
public void accept(Throwable throwable) throws Exception {
                LogUtils.syso("===throwable===="+throwable.getMessage());
}
  });

應用場景

  1. 快速建立 被觀察者物件(Observable) & 傳送10個以上事件(陣列形式)
  2. 陣列元素遍歷

fromIterable

private void fromIterable() {
    List<Student> mStudents=new ArrayList<>();
Student student;
    for (int i = 0; i < 3; i++) {
        student=new Student();
student.setSubject("測試"+i);
mStudents.add(student);
}
    Observable.fromIterable(mStudents)
            .subscribe(new Consumer<Student>() {
                @Override
public void accept(Student student) throws Exception {
                    LogUtils.syso("=====accept===="+student.getSubject());
}
            }, new Consumer<Throwable>() {
                @Override
public void accept(Throwable throwable) throws Exception {
                    LogUtils.syso("=====throwable===="+throwable.getMessage());
}
            });
}

應用場景

  1. 快速建立 被觀察者物件(Observable) & 傳送10個以上事件(陣列形式)
  2. 陣列元素遍歷

empty

/**
 * 該方法建立的被觀察者物件傳送事件的特點:僅傳送Complete事件,直接通知完成
* */
private void empty() {
    Observable.empty()
            .subscribe(new Observer<Object>() {
                @Override
public void onSubscribe(Disposable d) {
                    LogUtils.syso("=========onSubscribe=========");
}

                @Override
public void onNext(Object o) {
                    LogUtils.syso("=========onNext=========");
}

                @Override
public void onError(Throwable e) {
                    LogUtils.syso("=========onError=========");
}

                @Override
public void onComplete() {
                    LogUtils.syso("=========onComplete=========");
}
            });
}

error

/**
 *  該方法建立的被觀察者物件傳送事件的特點:僅傳送Error事件,直接通知異常
* */
private void error() {
    Observable.error(new RuntimeException())
            .subscribe(new Observer<Object>() {
                @Override
public void onSubscribe(Disposable d) {
                    LogUtils.syso("=========onSubscribe=========");
}

                @Override
public void onNext(Object o) {
                    LogUtils.syso("=========onNext=========");
}

                @Override
public void onError(Throwable e) {
                    LogUtils.syso("=========onError=========");
}

                @Override
public void onComplete() {
                    LogUtils.syso("=========onComplete=========");
}
            });
}

never

/**
 *  該方法建立的被觀察者物件傳送事件的特點:不傳送任何事件
* */
private void never() {
    Observable.never()
            .subscribe(new Observer<Object>() {
                @Override
public void onSubscribe(Disposable d) {
                    LogUtils.syso("=========onSubscribe=========");
}

                @Override
public void onNext(Object o) {
                    LogUtils.syso("=========onNext=========");
}

                @Override
public void onError(Throwable e) {
                    LogUtils.syso("=========onError=========");
}

                @Override
public void onComplete() {
                    LogUtils.syso("=========onComplete=========");
}
            });
}

defer

直到有觀察者(Observer )訂閱時,才動態建立被觀察者物件(Observable) and 傳送事件

private void defer() {
    i=1;
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        @Override
public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.just(i);
}
    });
i=15;
observable.
            subscribe(new Consumer<Integer>() {
        @Override
public void accept(Integer integer) throws Exception {
            LogUtils.syso("====accept======"+integer);
}
    });
}

應用場景

動態建立被觀察者物件(Observable) & 獲取最新的Observable物件資料

timer

// 延遲指定時間後,傳送1個數值0Long型別)
private void timer() {
    Observable.timer(5, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
public void accept(Long aLong) throws Exception {
                    LogUtils.syso("================"+aLong);
}
            });
}
應用場景

比如延時之後跳轉頁面

interval

// 5s後每隔1s傳送一個long(0開始遞增 每次增加1)
private void interval() {
  Observable.interval(5,1,TimeUnit.SECONDS)
          .subscribe(new Consumer<Long>() {
              @Override
public void accept(Long aLong) throws Exception {
                LogUtils.syso("====accept====="+aLong);
}
          });
}

  1. 傳送事件的特點:每隔指定時間 就傳送 事件
  2. 傳送的事件序列 = 從0開始、無限遞增1的的整數序列

intervalRange

// 3s後每隔1s傳送一個long(2開始遞增 增加5個數)
private void intervalRange() {
  Observable.intervalRange(2,5,3,1,TimeUnit.SECONDS)
          .subscribe(new Consumer<Long>() {
              @Override
public void accept(Long aLong) throws Exception {
                LogUtils.syso("====accept====="+aLong);
}
          });
}

  1. 傳送事件的特點:每隔指定時間 就傳送 事件,可指定傳送的資料的數量

range

  1. 傳送事件的特點:連續傳送 1個事件序列,可指定範圍
          作用類似於intervalRange(),但區別在於:無延遲傳送事件
// 每次傳送一個long(2開始遞增 增加5個數)
private void range() {
  Observable.range(2,5)
            .subscribe(new Consumer<Integer>() {
                @Override
public void accept(Integer integer) throws Exception {
                 LogUtils.syso("=====accept===="+integer);
}
            });
}

rangeLong

private void rangeLong() {
  Observable.rangeLong(2,3)
           .subscribe(new Consumer<Long>() {
               @Override
public void accept(Long aLong) throws Exception {
                   LogUtils.syso("====rangeLong====="+aLong);
}
           });
}

類似於range(),區別在於該方法支援資料型別 = Long