RxJava2 do-series operators: doOnNext and doFinally

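1. doOnNext
The callback registered with doOnNext is invoked once for every item the Observable emits. Unlike doOnEach, the callback does not take a Notification argument; it receives the emitted item itself.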

// Note: Action1 and Subscriber are RxJava 1.x types (package rx).
// In RxJava 2 the callback type is Consumer<Integer> and the subscriber is an Observer<Integer>.
Observable.just(1, 2, 3)
          .doOnNext(new Action1<Integer>() {
              @Override
              public void call(Integer item) {
                  // Throwing here ends the sequence with onError.
                  if (item > 1) {
                      throw new RuntimeException("Item exceeds maximum value");
                  }
              }
          })
          .subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });
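To make the behaviour of the example above concrete, here is a minimal plain-Java sketch of what the pipeline does. It is only a toy model and not how RxJava implements the operator: the class DoOnNextSketch and the helper emit are hypothetical names introduced for illustration, and no RxJava types are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: mimics the visible behaviour of just(...).doOnNext(...).subscribe(...).
class DoOnNextSketch {

    // Hypothetical helper: runs sideEffect on each item before the "subscriber" sees it
    // and returns the lines the subscriber would have printed.
    static List<String> emit(List<Integer> items, Consumer<Integer> sideEffect) {
        List<String> log = new ArrayList<>();
        try {
            for (Integer item : items) {
                sideEffect.accept(item);       // the doOnNext callback sees the item first
                log.add("Next: " + item);      // then the subscriber's onNext
            }
            log.add("Sequence complete.");     // normal termination
        } catch (RuntimeException error) {
            // An exception thrown by the callback stops emission and goes to onError.
            log.add("Error: " + error.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = emit(Arrays.asList(1, 2, 3), item -> {
            if (item > 1) {
                throw new RuntimeException("Item exceeds maximum value");
            }
        });
        log.forEach(System.out::println);
    }
}
```

Running the sketch prints "Next: 1" followed by "Error: Item exceeds maximum value": the first item passes through, the second one makes the callback throw, and the third item is never emitted.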

doOnNext is typically used to do some work on the data before it reaches subscribe, for example persisting it:

    public void onSaveData() {
        Observable
                .create(new ObservableOnSubscribe<Boolean>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Boolean> e) throws Exception {
                        List<NewsChannelBean> oldItems = dao.query(Constant.NEWS_CHANNEL_ENABLE);
                        e.onNext(!compare(oldItems, adapter.getmMyChannelItems()));
                    }
                })
                .subscribeOn(Schedulers.io())
                .doOnNext(new Consumer<Boolean>() {
                    @Override
                    public void accept(@NonNull Boolean aBoolean) throws Exception {
                        if (aBoolean) {
                            List<NewsChannelBean> enableItems = adapter.getmMyChannelItems();
                            List<NewsChannelBean> disableItems = adapter.getmOtherChannelItems();
                            dao.removeAll();
                            for (int i = 0; i < enableItems.size(); i++) {
                                NewsChannelBean bean = enableItems.get(i);
                                dao.add(bean.getChannelId(), bean.getChannelName(), Constant.NEWS_CHANNEL_ENABLE, i);
                            }
                            for (int i = 0; i < disableItems.size(); i++) {
                                NewsChannelBean bean = disableItems.get(i);
                                dao.add(bean.getChannelId(), bean.getChannelName(), Constant.NEWS_CHANNEL_DISABLE, i);
                            }
                        }
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(@NonNull Boolean isRefresh) throws Exception {
                        RxBus.getInstance().post(NewsTabLayout.TAG, isRefresh);
                    }
                }, ErrorAction.error());
    }

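2. doFinally
The doFinally callback is invoked after the Observable terminates, whether it completes normally or ends with an error, and also when the downstream disposes the subscription.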

    /**
     * Load data.
     *
     * @param isRefresh      whether this is a pull-to-refresh (or the initial load)
     * @param isNeedProgress whether to show the progress indicator
     */
    private void laodData(final boolean isRefresh, final boolean isNeedProgress) {
        if (!ContextUtils.checkNetworkConnection(getActivity())) {
            mEmptyViewHelper.setNoNetworkEmptyView(true);
            ContextUtils.showToast(getActivity(), R.string.noconnectionremind);
            return;
        }
        mEmptyViewHelper.setNoNetworkEmptyView(false);

        Observable.create(new ObservableOnSubscribe<ClientRecvObject>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<ClientRecvObject> e) throws Exception {
                User loginedUser = LoginUserManager.getLoginedUser(mConfiguration);
                int pageIndex = 1;
                if (!isRefresh) {
                    pageIndex = mPageIndex + 1;
                }
                ClientRecvObject remindClient = RemindConnector.getQARemindList(getActivity(), loginedUser, pageIndex);
                e.onNext(remindClient);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io())
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        if (null == getActivity()) {
                            disposable.dispose();
                            return;
                        }

                        if (isNeedProgress) {
                            showLoadingProgress(getString(R.string.loading), true);
                        }
                    }
                })
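                // This second subscribeOn applies to the doOnSubscribe above,
                // so the progress dialog is shown on the main thread.
                .subscribeOn(AndroidSchedulers.mainThread())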
                .observeOn(AndroidSchedulers.mainThread())
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        dismissLoadingProgress();
                    }
                }).subscribe(new Observer<ClientRecvObject>() {

            private Disposable disposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                disposable = d;
            }

            @Override
            public void onNext(@NonNull ClientRecvObject clientRecvObject) {
                if (getActivity() == null) {
                    disposable.dispose();
                    // Stop here: the fragment is detached, so the UI must not be touched.
                    return;
                }
                if (null != clientRecvObject && clientRecvObject.isSuccess()) {
                    if (!isRefresh) {
                        mPageIndex++;
                    }

                    RemindList remindList = (RemindList) clientRecvObject.getClientData();
                    refreshUI(isRefresh, remindList);
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                // Errors are ignored here; doFinally still dismisses the progress dialog.
            }

            @Override
            public void onComplete() {

            }
        });
    }
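
The point of doFinally in the method above is that the progress dialog is dismissed on every exit path. A minimal plain-Java sketch of that guarantee follows; it is a toy model built on try/finally rather than RxJava itself, and the class DoFinallySketch, the helper load and the log strings are all hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Toy model only: shows why cleanup placed in doFinally runs on success and on failure.
class DoFinallySketch {

    // Hypothetical helper: "loads" one value from source and records each lifecycle step.
    static List<String> load(Callable<String> source) {
        List<String> log = new ArrayList<>();
        log.add("show progress");                    // stands in for doOnSubscribe
        try {
            log.add("onNext: " + source.call());     // the single emitted item
            log.add("onComplete");                   // normal termination
        } catch (Exception e) {
            log.add("onError: " + e.getMessage());   // abnormal termination
        } finally {
            log.add("dismiss progress");             // stands in for doFinally
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(load(() -> "data"));
        System.out.println(load(() -> {
            throw new Exception("network down");
        }));
    }
}
```

In both calls the last recorded step is "dismiss progress", which is why cleanup such as dismissLoadingProgress() belongs in doFinally rather than being duplicated in onError and onComplete.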