1. 程式人生 > >在Android中合理的使用Rxjava封裝同步和非同步呼叫

在Android中合理的使用Rxjava封裝同步和非同步呼叫

同步和非同步的區別:
 一般我們在程式中看到一些耗時的操作的方法,如果它有返回值等待呼叫者接收的情況就屬於同步呼叫此時當前執行緒會
 阻塞在這裡等待結果的返回
 非同步呼叫方法一般會呼叫方法是傳入響應的監聽器類listener通過介面回撥的方式來通知呼叫者結果而當前執行緒是不
 會阻塞等待結果返回的

 當需要封裝現有 API 為 Observable 的時候,可以從一下幾點來考慮:
   如果為同步 API 則使用 Observable.fromCallable()
   如果為非同步 API 則:
      避免使用 Observable.create()
      使用 Flowable.create() 並正確實現如下步驟
      在合適的地方呼叫 onNext(), onCompleted(), 和 onError()
      如果需要清理資源,則使用 setCancellation()
      選擇正確的 BackPressure 背壓策略
1.rxjava封裝同步呼叫 比如使用rxjava來封裝SharePreference的使用
 RxView.clicks(findViewById(R.id.btn_rxjava_wrap_sync))
                .compose(RxUtils.preventDuplicateClicks())
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Object o) throws Exception {

                        rxWrapSyncForSharepreferences(sharedPreferences).subscribe(new Consumer<Boolean>() {
                            @Override
                            public void accept(Boolean aBoolean) throws Exception {
                                Toast.makeText(RxjavaWrapAsyncAndsyncActivity.this, aBoolean ? "成功" : "失敗", Toast.LENGTH_SHORT).show();
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                throwable.printStackTrace();
                                Log.d(TAG, "accept: " + throwable.getMessage());
                            }
                        });
                    }
                });
 private Observable<Boolean> rxWrapSyncForSharepreferences(final SharedPreferences sharedPreferences) {
        return Observable.fromCallable(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                boolean result = sharedPreferences.edit().putInt("a", 1)
                        .putInt("b", 2)
                        .putString("c", "c")
                        .commit();

                Log.d(TAG, "call: thread : " + Thread.currentThread().getName());

                return result;
            }
        }).compose(RxUtils.<Boolean>observableToMain());
    }

以上的場景為當我們點選按鈕時會觸發sp儲存一些資料的操作 並且我們需要得到sp是否儲存成功的返回值因此此時我們就可以使用Rxjava來封裝這個操作

 首先我們使用Observable.fromCallable(new Callable<Boolean>(){...})的方式包裝我們的sp儲存操作同時使用

RxUtils.<Boolean>observableToMain()即

public static <T> ObservableTransformer<T,T> observableToMain(){
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io())
                        .unsubscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

將我們的sp儲存操作放到子執行緒進行操作 並同時以返回Observable的形式給下游進行儲存結果的訂閱

2.使用Rxjava封裝非同步操作

 而在 Android 中還有很多非同步回撥場景,在網路上到處可以看到使用 Observable.create() 來封裝非同步呼叫的示例

但是,使用 Observable.create() 有很多缺點.。

下面使用一個示例來看看如何封裝非同步 API。假設我們用 SensorManager 來追蹤裝置的加速度。 普通的實現方式是這樣的:

public class RxjavaWrapAsyncAndsyncActivity extends AppCompatActivity {


    private SensorManager sensonManager;
    private Sensor sensor;

    private Disposable disposable1;
    private Disposable disposable2;
    private Disposable netWorkDisposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjava_wrap_async_andsync);

        sensonManager = (SensorManager) getSystemService(SENSOR_SERVICE);
        sensor = sensonManager.getDefaultSensor(Sensor.TYPE_ACCELEROMETER);
   }


    SensorEventListener listener = new SensorEventListener() {
        @Override
        public void onSensorChanged(SensorEvent event) {
            Log.d(TAG, "onSensorChanged: " + Arrays.toString(event.values));
        }

        @Override
        public void onAccuracyChanged(Sensor sensor, int accuracy) {
            Log.d(TAG, "onAccuracyChanged: " + accuracy);
        }
    };

    @Override
    protected void onResume() {
        super.onResume();
       sensorManager.registerListener(sensorListener, sensor, SensorManager.SENSOR_DELAY_NORMAL); 
    }

    @Override
    protected void onPause() {
        super.onPause();

        sensonManager.unregisterListener(listener, sensor);
    }
}

而下面是一種使用Observable.create()的方式封裝為 RxJava Observable 的方式:

public Observable<SensorEvent> getSensorDataAsync1(final SensorManager sensonManager, final Sensor sensor, final int timespan) {


        return Observable.create(new ObservableOnSubscribe<SensorEvent>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<SensorEvent> emitter) throws Exception {

                if(emitter.isDisposed()){
                    return;
                }

                final SensorEventListener sensorEventListener = new SensorEventListener() {
                    @Override
                    public void onSensorChanged(SensorEvent event) {
                        Log.d(TAG, "onSensorChanged: observable thread : " + Thread.currentThread().getName());

                        if (!emitter.isDisposed()) {
                            //由於加速度感測器會一直髮射資料因此在此處
                           //我們需要的是一個熱的Hot Observable 因此此處
                            //沒有呼叫onCompleted()方法
                            emitter.onNext(event);
                        }
                    }

                    @Override
                    public void onAccuracyChanged(Sensor sensor, int accuracy) {}
                };

             sensonManager.registerListener(sensorEventListener, sensor, timespan);

            }
        }).compose(RxUtils.<SensorEvent>observableToMain());
    }

以上這種使用Observable.create()封裝非同步操作的方式有很大的缺陷:

雖然上面的程式碼可以工作。但是距離一個符合 Observable 規範的實現來差很多, Observable 規範有如下幾點: 1. 如果 Subscriber 取消註冊了,則需要取消註冊加速度的監聽器來避免記憶體洩露 2. 需要捕獲可能出現的異常,然後把異常傳遞到 Observable 的 onError() 函式來避免導致程式崩潰 3. 在呼叫 onNext() 或者 onError() 之前,需要判斷 Subscriber 是否還在監聽事件,避免發射不必要的資料 4. 需要處理 backpressure(資料發射的速度比 Subscriber 處理的速度要快的情況),防止 MissingBackpressureException 異常。 上面的4點中使用Observable.create()還是可以實現前面四點的

public Observable<SensorEvent> getSensorDataAsync1(final SensorManager sensonManager, final Sensor sensor, final int timespan) {


        return Observable.create(new ObservableOnSubscribe<SensorEvent>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<SensorEvent> emitter) throws Exception {

                if(emitter.isDisposed()){
                    return;
                }

                final SensorEventListener sensorEventListener = new SensorEventListener() {
                    @Override
                    public void onSensorChanged(SensorEvent event) {
                        Log.d(TAG, "onSensorChanged: observable thread : " + Thread.currentThread().getName());

                        if (!emitter.isDisposed()) {
                            emitter.onNext(event);
                        }
                    }

                    @Override
                    public void onAccuracyChanged(Sensor sensor, int accuracy) {

                    }
                };


                //這個地方使用setCancellable()或者setDisposable()的區別
//                https://github.com/ReactiveX/RxJava/issues/4812
//                https://stackoverflow.com/questions/43280248/what-is-the-difference-between-rxjava-2-cancellable-and-disposable#
                emitter.setCancellable(new Cancellable() {
                    @Override
                    public void cancel() throws Exception {
//                        throw new Exception("異常終止");
                        sensonManager.unregisterListener(sensorEventListener, sensor);
                    }
                });

                sensonManager.registerListener(sensorEventListener, sensor, timespan);

            }
        }).compose(RxUtils.<SensorEvent>observableToMain());
    }

但是要實現 上面的第4步就沒有這麼簡單了,畢竟 RxJava 中的 Backpressure 都可以出一本書來詳細介紹其內容了。

每次封裝非同步 API 都需要手工實現 Backpressure 則是非常痛苦的,也不是常人可以正確做到的。

因此,在 RxJava 2.x版本中,RxJava 開發人員提供了一個新的型別的Observable即Flowable

Flowable和Observable的唯一區別就是Flowable預設是支援背壓backpressure的因此我們可使用Flowable來實現我們的背壓策略

private Flowable<SensorEvent> getSensorDataAsync2(final SensorManager sensonManager, final Sensor sensor, final int timespan) {
        return Flowable.create(new FlowableOnSubscribe<SensorEvent>() {
            @Override
            public void subscribe(@NonNull final FlowableEmitter<SensorEvent> emitter) throws Exception {
                Log.d(TAG, "flowable thread " + Thread.currentThread().getName());

                if(emitter.isCancelled()){
                    return;
                }

                final SensorEventListener sensorEventListener = new SensorEventListener() {
                    @Override
                    public void onSensorChanged(SensorEvent event) {
                        Log.d(TAG, "onSensorChanged: flowable thread " + Thread.currentThread().getName());
                        if (!emitter.isCancelled()) {
                            emitter.onNext(event);
                            //此處Flowable是一個熱的Flowable
                            //此處不能呼叫onComplete()我們是希望訂閱者一直接受到sensor的資料
//                            emitter.onComplete();
                        }
                    }

                    @Override
                    public void onAccuracyChanged(Sensor sensor, int accuracy) {

                    }
                };


                emitter.setCancellable(new Cancellable() {
                    @Override
                    public void cancel() throws Exception {
                        sensonManager.unregisterListener(sensorEventListener, sensor);
                    }
                });


                sensonManager.registerListener(sensorEventListener, sensor, timespan);

            }
        }, BackpressureStrategy.BUFFER) //指定一種背壓策略
                .compose(RxUtils.<SensorEvent>flowableToMain());
    }

注意上面的實現中,並沒有處理 第 2、3 步驟,Flowable.create() 自動處理了該問題。 通過 setCancellation() 來實現 第 1 個步驟,而第 4 個步驟只要指定 backpressure 背壓策略就可以了

處理 Backpressure

Backpressure 是用來描述,生產者生產資料的速度比消費者消費資料的速度快的一種情況。

如果沒有處理這種情況,則會出現 MissingBackpressureException 。

由於手工實現 Backpressure 是很困難的,如果使用 fromAsync() 函式則我們只需要理解各種

Backpressure 策略即可,不用自己實現。

  • BUFFER(快取):使用無限個數的內部快取(RxJava 預設使用 16 個元素的內部快取),一開始會建立一個 128 個元素的緩衝物件,然後動態的擴充套件直到 JVM 記憶體不足。

    使用 hot Observable 的時候通常會指定這種策略,比如上面的示例。

  • LATEST(使用最新的):只發射最新生成的資料,之前的舊的資料被丟棄。類似於使用緩衝個數為 1 的快取。

    cold Observable 通常可以使用這種策略。比如 Andorid 裡面的電量變化、或者 最近的位置資訊就可以使用這種策略。之前舊的資料已經為無效資料直接丟棄就好。

  • DROP(直接丟棄):只發射第一個資料,之後的所有資料就丟棄。

    通常用來指生成一個數據的 Observable。

  • ERROR / NONE: 預設的不指定任何策略,會出現 MissingBackpressureException

在封裝非同步 API 的時候,根據非同步 API 的特點,來選擇合適的策略是非常重要