1. 程式人生 > >RxJava(十一)defer操作符實現程式碼支援鏈式呼叫

RxJava(十一)defer操作符實現程式碼支援鏈式呼叫

RxJava系列文章目錄導讀:

一、前言

現在越來越多Android開發者使用到RxJava,在Android使用RxJava主要有如下好處:
1,輕鬆切換執行緒。以前我們切換執行緒主要使用Handler等手段來做。
2,輕鬆解決回撥的巢狀問題。現在的app業務邏輯越來越複雜,多的時候3,4層回撥巢狀,使得程式碼可維護性變得很差。RxJava鏈式呼叫使得這些呼叫變得扁平化。

隨著RxJava的流行,越來越多的開源專案開始支援RxJava,像Retrofit、GreenDao等。這些開源專案支援RxJava使得我們解決複雜業務變得非常方便。

但是這些還不夠,有的時候我們自己的封裝的業務也需要支援RxJava,舉個例子:查詢資料、處理本地檔案等操作,總而言之就是一些耗時任務。而且還要處理這些操作的成功、失敗、執行緒切換等操作。
如果還是想以前那樣做,那就太low。

二、下面就來探討下如何使得程式碼支援RxJava風格

遇到這種問題,在我腦海裡浮現的第一種方式就是通過Observable的create操作符。因為在裡面我們可以控制資料的發射。就像上一篇文章那樣《RxJava switchIfEmpty操作符實現Android檢查本地快取邏輯判斷》

如下程式碼片段:

Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {
                try {
                    List<Article> as = articleDao.queryBuilder
() .where(ArticleDao.Properties.CategoryId.eq(categoryId)) .orderDesc(ArticleDao.Properties.Id) .offset((pageIndex - 1) * pageSize) .limit(pageSize).list(); if (as == null || as.isEmpty
()) { subscriber.onNext(null); }else{ subscriber.onNext(as); } }catch (Exception e){ subscriber.onError(e); } subscriber.onCompleted(); } });

這樣確實沒有沒有問題。但是我們要封裝下, 每個方法都這樣寫維護性和擴充套件比較差(例如有天我想換種方式來實現而不是create,如果通過方法封裝一下,修改就變得容易多了)
如何封裝呢?通過分析知道,大部分程式碼是相同的,只是我們的業務不一樣。那麼通過模板方法解決吧。業務方法通過介面回撥的方式傳遞進來,因為我們不知道呼叫者是什麼業務。

回撥介面如下(T表示我們業務資料):

    public interface MyCallable<T> {
        T call();
    }

下面是模板程式碼:

    protected <R> Observable<R> createObservable(final MyCallable<R> callable) {
        return Observable.create(new Observable.OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> subscriber) {
                try {
                    R result = callable.call();
                    subscriber.onNext(result);
                } catch (Exception e) {
                    subscriber.onError(e);
                }
                subscriber.onCompleted();
            }
        });
    }

使用就非常簡單了呼叫createObservable方法,實現MyCallable介面即可,然後就是跟使用RxJava一樣處理邏輯。

三、分析greendao是如何支援RxJava風格的

看過Greendao原始碼的人知道,它也是通過這種方式支援RxJava的(下面看看他是怎麼做的):

    /**
     * Rx version of {@link AbstractDao#loadAll()} returning an Observable.
     */
    @Experimental
    public Observable<T> load(final K key) {
        return wrap(new Callable<T>() {
            @Override
            public T call() throws Exception {
                return dao.load(key);
            }
        });
    }

最終的實現也是通過dao.load(key)同步方法來實現的,關鍵是wrap方法了:

    protected <R> Observable<R> wrap(Callable<R> callable) {
        return wrap(RxUtils.fromCallable(callable));
    }

    //通過這個方法再包裝了一層(就是預設設定執行的執行緒)
    protected <R> Observable<R> wrap(Observable<R> observable) {
        if (scheduler != null) {
            return observable.subscribeOn(scheduler);
        } else {
            return observable;
        }
    }

通過程式碼可以看到預設執行的執行緒是IO執行緒:

    /**
     * The returned RxDao is a special DAO that let's you interact with Rx Observables using RX's IO scheduler for
     * subscribeOn.
     *
     * @see #rxPlain()
     */
    @Experimental
    public RxDao<T, K> rx() {
        if (rxDao == null) {
            rxDao = new RxDao<>(this, Schedulers.io());
        }
        return rxDao;
    }

所以使用greendao不用指定它在IO執行,因為框架已經幫我們設定了。

然後就是RxUtils.fromCallable(callable)方法了:

class RxUtils {
    /** As of RxJava 1.1.7, Observable.fromCallable is still @Beta, so just in case... */
    @Internal
    static <T> Observable<T> fromCallable(final Callable<T> callable) {
        return Observable.defer(new Func0<Observable<T>>() {

            @Override
            public Observable<T> call() {
                T result;
                try {
                    result = callable.call();
                } catch (Exception e) {
                    return Observable.error(e);
                }
                return Observable.just(result);
            }
        });
    }
}

上面的註釋說通過Observable.fromCallable也可以實現這樣的邏輯,也就是說代替Observable.defer()方法。
最後greendao是通過defer操作符來實現rx風格的。

四、defer和create操作符有什麼異同點?

通過分析greendao原始碼得知,他是通過defer來做的,我們是通過create操作符來做的。那兩者有什麼不同?

我們對defer操作符比較陌生,先看看它的原始碼:

    public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) {
        return create(new OnSubscribeDefer<T>(observableFactory));
    }

說白了就是呼叫了create(OnSubscribe<T> f) 方法:

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

其實我們上面的create操作也是呼叫過來這個方法。只是defer操作符傳遞的OnSubscribe是OnSubscribeDefer,那我們來看看這是什麼鬼?

public final class OnSubscribeDefer<T> implements OnSubscribe<T> {
    final Func0<? extends Observable<? extends T>> observableFactory;

    public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {
        this.observableFactory = observableFactory;
    }

    @Override
    public void call(final Subscriber<? super T> s) {
        Observable<? extends T> o;
        try {
            o = observableFactory.call();
        } catch (Throwable t) {
            Exceptions.throwOrReport(t, s);
            return;
        }
        o.unsafeSubscribe(Subscribers.wrap(s));
    }

}

OnSubscribeDefer也是繼承自OnSubscribe,那麼他的call方法肯定也是在訂閱的時候被呼叫(就是說訂閱的時候才建立這個observable,並且每次訂閱都會建立一個新的observable)。
為什麼Greendao沒有使用create那種方式精確控制資料的發射?現在RxJava2.0對create操作符做出了一些限制,不能隨隨便便create了,這樣出現一些問題。具體的RxJava2.0的改動可以看看
他的github說明What’s-different-in-2.0

五、參考資料: