1. 程式人生 > >RxJava1.X升級到RxJava2.X筆記

RxJava1.X升級到RxJava2.X筆記

描述 RxJava 1.X RxJava 2.X
package包名 rx.xxx io.reactivex.xxx
1.X早於Reactive Streams規範出現,僅部分支援規範 完全支援
對背壓的支援不完善 Observable設計為不支援背壓
新增Flowable支援背壓
null空值 支援 不再支援null值,傳入null值會丟擲 NullPointerException
Schedulers執行緒排程器 Schedulers.immediate()
Schedulers.trampoline()

Schedulers.computation()
Schedulers.newThread()
Schedulers.io()
Schedulers.from(executor)
AndroidSchedulers.mainThread()
移除Schedulers.immediate()
新增Schedulers.single()
其它未變
Single 行為類似Observable,但只會發射一個onSuccessonError 按照Reactive Streams規範重新設計,遵循協議onSubscribe(onSuccess/onError)
Completable 行為類似Observable
,要麼全部成功,要麼就失敗
按照Reactive Streams規範重新設計,遵循協議onSubscribe (onComplete/onError)
Maybe 2.X新增,行為類似Observable,可能會有一個數據或一個錯誤,也可能什麼都沒有。可以將其視為一種返回可空值的方法。這種方法如果不丟擲異常的話,將總是會返回一些東西,但是返回值可能為空,也可能不為空。按照Reactive Streams規範設計,遵循協議onSubscribe (onSuccess/onError/onComplete)
Flowable 2.X新增,行為類似Observable
,按照Reactive Streams規範設計,支援背壓Backpressure
Subject AsyncSubject
BehaviorSubject
PublishSubject
ReplaySubject
UnicastSubject
2.X依然維護這些Subject現有的功能,並新增:
AsyncProcessor
BehaviorProcessor
PublishProcessor
ReplayProcessor
UnicastProcessor
支援背壓Backpressure
Subscriber Subscriber 由於與Reactive Streams的命名衝突,Subscriber已重新命名為Disposable

library依賴變化

//1.X
compile 'io.reactivex:rxjava:1.2.1'
compile 'io.reactivex:rxandroid:1.2.1'

//2.X
compile 'io.reactivex.rxjava2:rxjava:2.0.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0'

package變化

變動主要為rx.xxx --> io.reactivex.xxx

//1.X
import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import rx.functions.Action1;

//2.X
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.disposables.Disposable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.functions.Consumer;

null

RxJava 2.X不再支援null值,如果傳入一個null會丟擲NullPointerException

Observable.just(null);

Single.just(null);

Observable.fromCallable(() -> null)
    .subscribe(System.out::println, Throwable::printStackTrace);

Observable.just(1).map(v -> null)
    .subscribe(System.out::println, Throwable::printStackTrace);

案例1

//1.X
public static final Observable.Transformer IO_TRANSFORMER = new Observable.Transformer() {
    @Override public Object call(Object observable) {
        return ((Observable) observable).subscribeOn(Schedulers.io())
                                        .unsubscribeOn(Schedulers.io())
                                        .observeOn(Schedulers.io());
    }
};
public static final <T> Observable.Transformer<T, T> applySchedulers(Observable.Transformer transformer){
    return (Observable.Transformer<T, T>)transformer;
}
Action1<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Subscription subscription = Observable.from(items)
                                      .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
                                      .map(new Func1<String, Integer>() {
                                                  @Override public Integer call(String s) {
                                                      return Integer.valueOf(s);
                                                  }
                                              })
                                      .subscribe(onNext);
//TODO subscription.unsubscribe();   

//2.X
public static final ObservableTransformer IO_TRANSFORMER = new ObservableTransformer() {
    @Override public ObservableSource apply(Observable upstream) {
        return upstream.subscribeOn(Schedulers.io())
                       .unsubscribeOn(Schedulers.io())
                       .observeOn(Schedulers.io());
    }
};
public static final <T> ObservableTransformer<T, T> applySchedulers(ObservableTransformer transformer){
    return (ObservableTransformer<T, T>)transformer;
}
Consumer<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Disposable disposable = Observable.fromArray(items)
                                  .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
                                  .map(new Function<String, Integer>() {
                                              @Override public Integer apply(String s) throws Exception {
                                                  return Integer.valueOf(s);
                                              }
                                          })
                                  .subscribe(onNext);
//TODO disposable.dispose();
  • .subscribe(...)返回值的變化:1.X為Subscription, 2.X為Disposable
  • Transformer的變化:1.X為rx.Observable內部的Transformer介面, 繼承自Func1<Observable<T>, Observable<R>>, 2.X為io.reactivexObservableTransformer<Upstream, Downstream>,是一個獨立的介面
  • AndroidSchedulers的變化: 1.X為rx.android.schedulers.AndroidSchedulers, 2.X為io.reactivex.android.schedulers.AndroidSchedulers
  • Func1的變化: 1.X為rx.functions.Func1, 2.X為io.reactivex.functions.Function
  • 其它過載方法見下方截圖
    1.X
    2.X

案例2

//1.X
public class AppBaseActivity extends AppCompatActivity {
    ...
    private CompositeSubscription mCompositeSubscription;

    protected void addSubscription(Subscription subscription) {
        if (null == mCompositeSubscription) {
            mCompositeSubscription = new CompositeSubscription();
        }
        mCompositeSubscription.add(subscription);
    }

    @Override protected void onDestroy() {
        if (null != mCompositeSubscription) {
            mCompositeSubscription.unsubscribe();
        }
        super.onDestroy();
    }
    ...
}

//2.X
public class AppBaseActivity extends AppCompatActivity {
    ...
    private   CompositeDisposable mCompositeDisposable;

    protected void addDisposable(Disposable disposable) {
        if (null == mCompositeDisposable) {
            mCompositeDisposable = new CompositeDisposable();
        }
        mCompositeDisposable.add(disposable);
    }

    @Override protected void onDestroy() {
        if (null != mCompositeDisposable) {
            mCompositeDisposable.clear();
        }
        super.onDestroy();
    }
    ...
}

我現在維護的專案基本就這些改動,如有錯誤之處,還望批評指正,謝謝!