1. 程式人生 > >RxJava(一) create操作符的用法和原始碼分析

RxJava(一) create操作符的用法和原始碼分析

RxJava系列文章目錄導讀:

1 create操作符的基本使用

顧名思義,Create操作符是用來建立一個Observable的。下面來看一個簡單的程式碼段:

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            //Emit Data
        }
})

create方法接收一個引數Observable.OnSubscribe

來看下它的原始碼:

    /**
     * Invoked when Observable.subscribe is called.
     */
    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }

Observable.OnSubscribe 說白了就是一個繼承了Action1介面的介面:

public interface Action1<T
> extends Action {
void call(T t); }
/**
 * All Action interfaces extend from this.
 * <p>
 * Marker interface to allow instanceof checks.
 */
public interface Action extends Function {

}
/**
 * All Func and Action interfaces extend from this.
 * <p>
 * Marker interface to allow instanceof checks.
 */
public interface Function { }

它們的繼承關係如下:
Observable.OnSubscribe <- Action1 <- Action <- Function

create()方法也就是個工廠方法:

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

通過OnSubscribe的原始碼的註釋
Invoked when Observable.subscribe is called. 意思是 當Observable被訂閱(subscribe)
OnSubscribe介面的call方法會被執行。

知道如何建立(create)Observable, 接下來我們看下如何訂閱它:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        for (int i = 0; i < 5; i++) {
            printLog(tvLogs, "Emit Data:", i + "");
            subscriber.onNext("" + i);
        }
    }
})
.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        //showToast(s);
        printLog(tvLogs, "Consume Data:", s);
    }
});

當呼叫了subscribe方法 Observable.OnSubscribe的call方法就會被執行,在Observable.OnSubscribe的call方法中迴圈了呼叫了5次subscriber.onNext,在subscribe的Action1回撥就會接受5次回撥。

Emit Data:'0' , Thread Name:RxCachedThreadScheduler-1
Emit Data:'1' , Thread Name:RxCachedThreadScheduler-1
Emit Data:'2' , Thread Name:RxCachedThreadScheduler-1
Emit Data:'3' , Thread Name:RxCachedThreadScheduler-1
Emit Data:'4' , Thread Name:RxCachedThreadScheduler-1
Consume Data:'0' , Thread Name:main
Consume Data:'1' , Thread Name:main
Consume Data:'2' , Thread Name:main
Consume Data:'3' , Thread Name:main
Consume Data:'4' , Thread Name:main

從輸出的日誌可以看到,我們還列印了Thread Name執行緒的名稱,我們可以控制傳送資料、消費資料所在的執行緒。

.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())

subscribeOn 設定Observable的call方法所在的執行緒 【生產資料】

observeOn 設定subscribe的call方法所在的執行緒【消費資料】

2 從原始碼角度分析create()和subscribe()如何協同工作的

從上面的分析我們知道,create方法就是一個簡單的工廠方法:

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

直接new一個Observable 接收的引數由hook.onCreate方法返回(該方法也很簡單):

public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
    return f;
}
protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

總結下來一句話:create操作符建立Observable,Observable通過構造方法 儲存了我們傳進來的OnSubscribe 說白了就是Action1.

下面來看看Observable的subscribe方法的原始碼:

public final Subscription subscribe(final Action1<? super T> onNext) {
        if (onNext == null) {
            throw new IllegalArgumentException("onNext can not be null");
        }
        return subscribe(new Subscriber<T>() {

            @Override
            public final void onCompleted() {
                // do nothing
            }

            @Override
            public final void onError(Throwable e) {
                throw new OnErrorNotImplementedException(e);
            }

            @Override
            public final void onNext(T args) {
                onNext.call(args);
            }
        });
    }

從原始碼可以看出subscribe方法並沒有直接呼叫傳進來引數的方法(沒有直接呼叫onNext.call())。
而是通過subscribe(Subscriber)方法, subscribe(Subscriber)方法又是呼叫了Observable的私有靜態方法:private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) 。下面是該方法的原始碼片段:

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

        //remove some code ....


        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

我們看關鍵部分就可以了:hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
看看hook.onSubscribeStart原始碼:

  public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
      // pass-thru by default
      return onSubscribe;
  }

很簡單,直接返回傳遞過來的引數(onSubscribe)。 這個OnSubscribe就是我們Observable.create(OnSubscribe)傳遞進去的OnSubscribe,然後呼叫OnSubscribe的call。
所以上面的程式碼可以簡化為(便於理解):observable.onSubscribe.call(subscriber).

至此,驗證了那句話,只有當Observable被訂閱OnSubscribe的call(subscriber)方法才會被執行。

我們知道了OnSubscribe的call(subscriber)執行的時機,那麼是如何把生產的資料傳遞了Observable.subscribe方法的回撥的呢?
我們通過Observable.subscribe原始碼得知,傳遞進來的回撥(Action1),是通過Subscriber來執行Action1的回撥,Subscriber又是Observable.create()引數的回撥。

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            //Emit Data
        }
})

所以Subscriber是Observable.OnSubscribe的回撥和Observable.subscribe(Action1..)的Action1之間通訊的橋樑。

Subscriber有三個方法:

  • onCompleted();
  • void onError(Throwable e);
  • void onNext(T t);

既然Subscriber是Observable.create(params)引數的回撥和Observable.subscribe()引數回撥的通訊橋樑,Subscriber有三個方法,那麼Observable.subscribe肯定也有三個與之對應回撥,通過原始碼知道Observable.subscribe有很多過載方法:

  • public final Subscription subscribe(final Action1
總結:Subscriber是Observable.create(Observable.OnSubscribe)引數回撥和Observable.subscribe(Action1,[Action1,Action0])引數回撥的通訊橋樑.

需要你注意的是:如果呼叫了void onError(Throwable e)方法,那麼onNext和onCompleted都不會執行。

下面用程式碼來表示他們之間的關係:

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            for (int i = 0; i < 5; i++) {
                printLog(tvLogs, "Emit Data:", i + "");
                subscriber.onNext("" + i);//對應subscribe方法的第一個引數
                if (condition) {
                    subscriber.onError(Throwable);//對應subscribe方法的第二個引數
                }
            }
            subscriber.onCompleted(); //對應subscribe方法的第三個引數
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            //showToast(s);
            printLog(tvLogs, "Consume Data:", s);
            //onNext
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            //onError
        }
    }, new Action0() {
        @Override
        public void call() {
            //onCompleted
        }
    });
總結:
1. 只有當Observable被訂閱OnSubscribe的call(subscriber)方法才會被執行
2. onCompleted方法裡會把Subscription取消訂閱(unsubscribe)
3. 如果呼叫了void onError(Throwable e)方法,那麼onNext和onCompleted都不會執行。會在onError呼叫之前,把Subscription取消註冊。
4. 整個事件流不管是正常結束(onComplete)還是出現了異常(onError),Subscription都會被取消註冊(unsubscribe)。
   但是,由於我們可能執行一些耗時操作,介面又被關閉了,所以還需要把subscription取消註冊
5. Subscriber是Observable.create(Observable.OnSubscribe)引數回撥和Observable.subscribe(Action1,[Action1,Action0])引數回撥的通訊橋樑.