1. 程式人生 > >RxJava2.x進階

RxJava2.x進階

一,RxJava背壓策略

1,背壓:被觀察者傳送訊息太快以至於它的操作符或者訂閱者不能及時處理相關訊息,背壓是在非同步的場景下才會出現,即被觀察者和觀察者處於不同的執行緒中。在RxJava2.x中新增了Flowable型別是支援背壓的(預設佇列大小128),Flowable很多操作符內部也使用了背壓策略。

2,Flowable背壓策略一共有5種
①,MISSING,此策略表示,通過Create方法建立的Flowable沒有指定背壓策略,不會對通過OnNext發射的資料做快取或丟棄處理,需要下游通過背壓操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背壓策略。
②,ERROR,此策略表示,如果放入Flowable的非同步快取池中的資料超限了,則會丟擲MissingBackpressureException異常。Flowable預設佇列是128,這段程式碼改為128就可以正常執行。

Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for (int i = 0; i < 129; i++) {
                    e.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept: integer=" + integer);
                    }
                });

③,BUFFER,此策略表示,Flowable的非同步快取池同Observable的一樣,沒有固定大小,可以無限制新增資料,不會丟擲MissingBackpressureException異常,但會導致OOM,這段程式碼會導致ANR。

 Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for (int i = 0; ; i++) {
                    e.onNext(i);
                }
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept: integer=" + integer);
                    }
                });

④,DROP,此策略表示,如果Flowable的非同步快取池滿了,則會丟掉將要放入快取池中的資料。這段程式碼不會引起異常,只會列印0~127,第128則被丟棄。

 Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for (int i = 0; i<129; i++) {
                    e.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept: integer=" + integer);
                    }
                });

⑤,LATEST,此策略表示,如果快取池滿了,會丟掉將要放入快取池中的資料。這一點和DROP策略一樣,不同的是,不管快取池的狀態如何,LATEST策略會將最後一條資料強行放入快取池中。這段程式碼會列印0~127和666,因為666是最後一條資料

Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for (int i = 0; i<667; i++) {
                    e.onNext(i);
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept: integer=" + integer);
                    }
                });

總結:Flowable不僅可以通過create建立時指定背壓策略,還可以通過其他操作符建立。例如:just,fromArray等建立後指定背壓策略。還可以通過onBackpressureBuffer()對應BackpressureStrategy.BUFFER,onBackpressureDrop()對應Backpressure.DROP,onBackpressureLatest()對應BackpressureStrategy.LATEST。

二,RxJava+Retrofit

1,實現一個網路請求 https://api.douban.com/v2/movie/top250?start=0&count=10 (豆瓣電影測試介面)

先在gradle中配置相關框架

    compile 'io.reactivex.rxjava2:rxjava:2.1.3'
    compile 'com.squareup.retrofit2:retrofit:2.3.0'
    compile 'com.squareup.retrofit2:converter-gson:2.3.0'
    compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'com.squareup.okhttp3:okhttp:3.9.0'
    compile 'com.squareup.okhttp3:logging-interceptor:3.9.0'
    compile 'com.squareup.retrofit2:converter-scalars:2.3.0'

建立請求介面,定義Movie實體

public interface ApiService {
    @GET("top250")
    Observable<Movie> getTopMovie(@Query("start") int start, @Query("count") int count);
}

建立Retrofit請求

 String baseUrl = "https://api.douban.com/v2/movie/";
 Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(baseUrl)               .addConverterFactory(ScalarsConverterFactory.create())//請求結果轉換為基本型別
                .addConverterFactory(GsonConverterFactory.create())//請求的結果轉為實體類
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())  //適配RxJava2.0, RxJava1.x則為RxJavaCallAdapterFactory.create()
                .build();
        apiService = retrofit.create(ApiService.class);

傳送請求,拿到資料

apiService.getTopMovie(0, 10)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Movie>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(Movie movie) {
                       //得到資料
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "onComplete:");
                    }
                });

2,RxJava+Retrofit封裝