RxJava 之四—— Lift()詳解
關於RxJava,從表面上看起來很容易使用,但是如果理解不夠深刻,使用過程中,往往會出現一些問題,所以我寫了五篇文章,從入門到精通,從簡單的使用到部分原始碼詳解,希望能給讀者一個質的飛躍:
RxJava最讓人興奮的就是它有各種各樣的操作符,什麼map呀,flatMap呀各種,我們今天要知其然知其所以然,那麼他們是如何實現功能的呢?
下面通過一個例子,逐步深入分析。最後面還會再進行一次總結
例子:
程式碼塊一:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
}
})
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + "word";
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.d("rx", s);
}
});
變換的原理:lift()
這些變換map(),flatMap()雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基於同一個基礎的變換方法: lift(Operator)。
我們先看下進行鏈式呼叫map之後,發生了什麼。
程式碼塊二:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
對,就是呼叫了lift()!,先來看一下OperatorMap這個類是個什麼東西,看似有點多,其實很簡單:
程式碼塊三:
public final class OperatorMap<T, R> implements Operator<R, T> {
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaPluginUtils.handleException(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
使用傳遞進來的Func1引數,生成一個Subscriber型別的類,呼叫OperatorMap的call()函式,將返回這個類。
這裡做個區分:
call()返回的Subscriber,我們把它命名為Subscriber\$2
原來的Subscriber,我們把它命名為Subscriber\$1
呼叫call()函式的時候需要傳入Subscriber$1,在Subscriber$2中的onNext()(程式碼塊三第30行)中,先使用Func1函式(程式碼塊三第34行)對引數進行處理,再呼叫Subscriber$1.onNext()(程式碼塊三第42行)
程式碼塊四:
分析一下lift()函式,主要就是使用程式碼塊一中,map()函式中的Func1()來生成一個新的Observable,後續的操作使用這個新的Observable。
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
這裡做個區分:
在lift() 中建立了的 Observable ,我們把它取名為後Observable\$2。
之前的原始 Observable,把它取名為Observable\$ 1;
程式碼塊五:
來看一下Observable$2使用的OnSubscribe
這裡做個區分:
Observable\$2使用的OnSubscribe,把它取名為OnSubscribe\$2,
原來的Observable\$1的OnSubscribe取名為OnSubscribe\$1
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}
分析一下這段程式碼:
- 第 17 行,
Subscriber<? super T> st = hook.onLift(operator).call(o);
這裡的call(),其實就是程式碼塊三中(第九行)的call()函式,返回的是一個Subscriber的父類 - 第 21 行,
parent.call(st);
這裡的parent是Observable$ 1的onSubScribe
subscribe()原始碼
Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心程式碼):
程式碼塊六
// 注意:這不是 subscribe() 的原始碼,而是將原始碼中與效能、相容性、擴充套件性有關的程式碼剔除後的核心程式碼。
// 如果需要看原始碼,可以去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
對整個過程進行詳解:
Observable.subscribe() 原始碼(程式碼塊六)第三行的 onSubscribe 指的是 Observable 中的 onSubscribe 物件(在Observable.create()時傳入的),但是 lift() 之後的情況就複雜了點。
當含有 lift() 時:
- lift() 建立了一個 Observable ,我們把它取名為後Observable$2。之前的原始 Observable,把它取名為Observable$ 1;
- 同樣地,Observable$2 裡的新 OnSubscribe$2 加上之前的 Observable$1 中的原始 OnSubscribe$1,也就有了兩個 OnSubscribe;
- 當用戶呼叫經過 lift() ,再呼叫subscribe(), 使用的是Observable$2.subscribe() 。因此 subscribe()原始碼(程式碼塊六)中的onSubscribe.call(subscriber),也是呼叫Observable$2 .OnSubscribe$2物件(即在 lift() 中建立Observable時新建立的OnSubscribe(程式碼塊五的這個物件));
onSubscribe$2.call() 方法中的parent (onSubscribe的父類 ),是Observable$ 1 .onSubscribe$1 。
parent.call(st);(程式碼塊五第 行)使用的是Observable$ 1 .onSubscribe$1(任務計劃列表),執行新的任務Subscriber$2。
- Subscriber$2的onNext()任務中,先使用Func1函式(程式碼塊三第34行)對引數進行處理,再呼叫Subscriber$1.onNext()(程式碼塊三第42行)
這樣就實現了 lift() 過程,有點像一種代理機制,通過事件攔截和處理實現事件序列的變換。
網上關於這部分的理解,很多部落格都使用了這篇部落格給 Android 開發者的 RxJava 詳解的圖,有興趣的也可以去看看