1. 程式人生 > >RxJava 之四—— Lift()詳解

RxJava 之四—— Lift()詳解

關於RxJava,從表面上看起來很容易使用,但是如果理解不夠深刻,使用過程中,往往會出現一些問題,所以我寫了五篇文章,從入門到精通,從簡單的使用到部分原始碼詳解,希望能給讀者一個質的飛躍:

RxJava最讓人興奮的就是它有各種各樣的操作符,什麼map呀,flatMap呀各種,我們今天要知其然知其所以然,那麼他們是如何實現功能的呢?

下面通過一個例子,逐步深入分析。最後面還會再進行一次總結

例子:

程式碼塊一:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public
void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); } }) .map(new Func1<String, String>() { @Override public String call(String s) { return s + "word"; } }) .subscribe(new Subscriber<String>() { @Override public void
onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.d("rx", s); } });

變換的原理:lift()

這些變換map(),flatMap()雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基於同一個基礎的變換方法: lift(Operator)。

我們先看下進行鏈式呼叫map之後,發生了什麼。

程式碼塊二:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

對,就是呼叫了lift()!,先來看一下OperatorMap這個類是個什麼東西,看似有點多,其實很簡單:

程式碼塊三:

public final class OperatorMap<T, R> implements Operator<R, T> {

    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        return parent;
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaPluginUtils.handleException(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

使用傳遞進來的Func1引數,生成一個Subscriber型別的類,呼叫OperatorMap的call()函式,將返回這個類。

這裡做個區分:

call()返回的Subscriber,我們把它命名為Subscriber\$2
原來的Subscriber,我們把它命名為Subscriber\$1

呼叫call()函式的時候需要傳入Subscriber$1,在Subscriber$2中的onNext()(程式碼塊三第30行)中,先使用Func1函式(程式碼塊三第34行)對引數進行處理,再呼叫Subscriber$1.onNext()(程式碼塊三第42行)

程式碼塊四:

分析一下lift()函式,主要就是使用程式碼塊一中,map()函式中的Func1()來生成一個新的Observable,後續的操作使用這個新的Observable。

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

這裡做個區分:

在lift() 中建立了的 Observable ,我們把它取名為後Observable\$2。
之前的原始 Observable,把它取名為Observable\$ 1

程式碼塊五:

來看一下Observable$2使用的OnSubscribe

這裡做個區分:

Observable\$2使用的OnSubscribe,把它取名為OnSubscribe\$2,
原來的Observable\$1的OnSubscribe取名為OnSubscribe\$1
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}

分析一下這段程式碼:

  1. 第 17 行,Subscriber<? super T> st = hook.onLift(operator).call(o);這裡的call(),其實就是程式碼塊三中(第九行)的call()函式,返回的是一個Subscriber的父類
  2. 第 21 行,parent.call(st);這裡的parent是Observable$ 1的onSubScribe

subscribe()原始碼

Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心程式碼):

程式碼塊六

// 注意:這不是 subscribe() 的原始碼,而是將原始碼中與效能、相容性、擴充套件性有關的程式碼剔除後的核心程式碼。
// 如果需要看原始碼,可以去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

對整個過程進行詳解:

  • Observable.subscribe() 原始碼(程式碼塊六)第三行的 onSubscribe 指的是 Observable 中的 onSubscribe 物件(在Observable.create()時傳入的),但是 lift() 之後的情況就複雜了點。

  • 當含有 lift() 時:

    1. lift() 建立了一個 Observable ,我們把它取名為後Observable$2。之前的原始 Observable,把它取名為Observable$ 1;
    2. 同樣地,Observable$2 裡的新 OnSubscribe$2 加上之前的 Observable$1 中的原始 OnSubscribe$1,也就有了兩個 OnSubscribe;
    3. 當用戶呼叫經過 lift() ,再呼叫subscribe(), 使用的是Observable$2.subscribe() 。因此 subscribe()原始碼(程式碼塊六)中的onSubscribe.call(subscriber),也是呼叫Observable$2 .OnSubscribe$2物件(即在 lift() 中建立Observable時新建立的OnSubscribe(程式碼塊五的這個物件));
      • onSubscribe$2.call() 方法中的parent (onSubscribe的父類 ),是Observable$ 1 .onSubscribe$1 。

      • parent.call(st);(程式碼塊五第 行)使用的是Observable$ 1 .onSubscribe$1(任務計劃列表),執行新的任務Subscriber$2。

    4. Subscriber$2的onNext()任務中,先使用Func1函式(程式碼塊三第34行)對引數進行處理,再呼叫Subscriber$1.onNext()(程式碼塊三第42行)

這樣就實現了 lift() 過程,有點像一種代理機制,通過事件攔截和處理實現事件序列的變換。

這裡寫圖片描述

網上關於這部分的理解,很多部落格都使用了這篇部落格給 Android 開發者的 RxJava 詳解的圖,有興趣的也可以去看看