RxJava2的do系列操作符之doOnNext和doFinally
阿新 • • 發佈:2019-01-08
1.doOnNext
它產生的Observable每發射一項資料就會呼叫它一次,但是它的Action不是接受一個Notification引數,而是接受發射的資料項。
Observable.just(1, 2, 3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer item) {
if( item > 1 ) {
throw new RuntimeException( "Item exceeds maximum value" );
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
doOnNext一般用於在subscribe之前對資料的一些處理,比如資料的儲存等;
public void onSaveData() {
Observable
.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Boolean> e) throws Exception {
List<NewsChannelBean> oldItems = dao.query(Constant.NEWS_CHANNEL_ENABLE);
e.onNext(!compare(oldItems, adapter.getmMyChannelItems()));
}
})
.subscribeOn(Schedulers.io())
.doOnNext(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean aBoolean) throws Exception {
if (aBoolean) {
List<NewsChannelBean> enableItems = adapter.getmMyChannelItems();
List<NewsChannelBean> disableItems = adapter.getmOtherChannelItems();
dao.removeAll();
for (int i = 0; i < enableItems.size(); i++) {
NewsChannelBean bean = enableItems.get(i);
dao.add(bean.getChannelId(), bean.getChannelName(), Constant.NEWS_CHANNEL_ENABLE, i);
}
for (int i = 0; i < disableItems.size(); i++) {
NewsChannelBean bean = disableItems.get(i);
dao.add(bean.getChannelId(), bean.getChannelName(), Constant.NEWS_CHANNEL_DISABLE, i);
}
}
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean isRefresh) throws Exception {
RxBus.getInstance().post(NewsTabLayout.TAG, isRefresh);
}
}, ErrorAction.error());
}
2、doFinally
當它產生的Observable終止之後會被呼叫,無論是正常還 是異常終止
/**
* 載入資料
*
* @param isRefresh 是否為下拉重新整理(初次載入)
* @param isNeedProgress 是否需要顯示Progress
*/
private void laodData(final boolean isRefresh, final boolean isNeedProgress) {
if (!ContextUtils.checkNetworkConnection(getActivity())) {
mEmptyViewHelper.setNoNetworkEmptyView(true);
ContextUtils.showToast(getActivity(), R.string.noconnectionremind);
return;
}
mEmptyViewHelper.setNoNetworkEmptyView(false);
Observable.create(new ObservableOnSubscribe<ClientRecvObject>() {
@Override
public void subscribe(@NonNull ObservableEmitter<ClientRecvObject> e) throws Exception {
User loginedUser = LoginUserManager.getLoginedUser(mConfiguration);
int pageIndex = 1;
if (!isRefresh) {
pageIndex = mPageIndex + 1;
}
ClientRecvObject remindClient = RemindConnector.getQARemindList(getActivity(), loginedUser, pageIndex);
e.onNext(remindClient);
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
if (null == getActivity()) {
disposable.dispose();
return;
}
if (isNeedProgress) {
showLoadingProgress(getString(R.string.loading), true);
}
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() throws Exception {
dismissLoadingProgress();
}
}).subscribe(new Observer<ClientRecvObject>() {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull ClientRecvObject clientRecvObject) {
if (getActivity() == null) {
disposable.dispose();
}
if (null != clientRecvObject && clientRecvObject.isSuccess()) {
if (!isRefresh) {
mPageIndex++;
}
RemindList remindList = (RemindList) clientRecvObject.getClientData();
refreshUI(isRefresh, remindList);
}
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}