RxJava Source Code Analysis 04: The Transformation Process (flatMap)
flatMap
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
    //...
    return merge(map(func));
}
flatMap first calls map(), which applies func to every item the source emits. Because func returns an Observable<? extends R>, map() yields an Observable<Observable<? extends R>>. That nested Observable is then passed to merge(). The merge operator is defined as follows:
Merges the output of multiple Observables so that they behave like a single Observable.
So here merge() takes the inner Observables that map() produced (one per source item) and forwards all of their emissions into a single new Observable.
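The composition merge(map(func)) can be illustrated without RxJava at all. The following is a minimal sketch, assuming a plain List stands in for an Observable; the class FlatMapSketch and its methods are hypothetical names for illustration, and there is no asynchrony, backpressure, or error handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in: a List plays the role of an Observable.
class FlatMapSketch {

    // map(): apply func to every item; each result is itself a "stream".
    static <T, R> List<List<R>> map(List<T> source, Function<T, List<R>> func) {
        List<List<R>> nested = new ArrayList<>();
        for (T item : source) {
            nested.add(func.apply(item));
        }
        return nested;
    }

    // merge(): flatten the stream of streams into a single stream.
    static <R> List<R> merge(List<List<R>> nested) {
        List<R> flat = new ArrayList<>();
        for (List<R> inner : nested) {
            flat.addAll(inner);
        }
        return flat;
    }

    // flatMap() is the composition of the two steps.
    static <T, R> List<R> flatMap(List<T> source, Function<T, List<R>> func) {
        return merge(map(source, func));
    }

    public static void main(String[] args) {
        List<Integer> result = flatMap(Arrays.asList(1, 2, 3), x -> Arrays.asList(x, x * 10));
        System.out.println(result); // prints [1, 10, 2, 20, 3, 30]
    }
}
```

With lists the output order is fixed. The real merge() forwards items as the inner Observables emit them, so asynchronous inner sources may interleave.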
Here is the core code of merge:
// Observable class
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
    //...
    return source.lift(OperatorMerge.<T>instance(false));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
The argument passed to lift() is OperatorMerge.<T>instance(false), which ultimately returns an OperatorMerge instance. We will look at that class shortly. First, back to Observable#lift():
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
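lift() wraps the current onSubscribe and the given operator in an OnSubscribeLift. On subscription, OnSubscribeLift first calls operator#call() to wrap the downstream Subscriber, then hands the wrapped Subscriber to the original onSubscribe, as covered in the previous article.
Finally, unsafeCreate() returns a new Observable.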
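The wrapping order inside lift() is easy to lose track of, so here is a minimal sketch of the idea in plain Java. All Mini* types are hypothetical, heavily simplified stand-ins for RxJava's Subscriber, OnSubscribe, and Operator (onNext only, no completion, errors, or unsubscription); this is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the RxJava types discussed above.
class LiftSketch {

    interface MiniSubscriber<T> { void onNext(T value); }

    interface MiniOnSubscribe<T> { void call(MiniSubscriber<T> subscriber); }

    // An operator turns the downstream subscriber into an upstream one.
    interface MiniOperator<R, T> { MiniSubscriber<T> call(MiniSubscriber<R> child); }

    // Same idea as lift(): wrap the child first, then subscribe the
    // wrapped subscriber to the original source.
    static <T, R> MiniOnSubscribe<R> lift(MiniOnSubscribe<T> parent, MiniOperator<R, T> operator) {
        return child -> {
            MiniSubscriber<T> wrapped = operator.call(child);
            parent.call(wrapped);
        };
    }

    static List<String> demo() {
        MiniOnSubscribe<Integer> source = s -> { s.onNext(1); s.onNext(2); };
        // Operator that converts each Integer to a String before passing it on.
        MiniOperator<String, Integer> toText = child -> value -> child.onNext("#" + value);
        List<String> received = new ArrayList<>();
        lift(source, toText).call(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [#1, #2]
    }
}
```

Nothing runs until call() is invoked on the lifted source, which mirrors how the operator chain is only assembled at subscription time.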
The main logic lives in operator#call():
public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
    @Override
    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;
        child.add(subscriber);
        child.setProducer(producer);
        return subscriber;
    }
    //...
}
Here call() creates a MergeSubscriber together with a MergeProducer, and then calls child.setProducer():
public void setProducer(Producer p) {
    //...
    if (toRequest == NOT_SET) {
        producer.request(Long.MAX_VALUE);
    } else {
        producer.request(toRequest);
    }
}
In other words, it calls MergeProducer's request():
public void request(long n) {
    if (n > 0) {
        if (get() == Long.MAX_VALUE) {
            return;
        }
        BackpressureUtils.getAndAddRequest(this, n);
        subscriber.emit();
    } else if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required");
    }
}
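The request bookkeeping above can be sketched as a small counter. This is a minimal illustration, assuming the counter is an AtomicLong holding the outstanding demand, that Long.MAX_VALUE means "unbounded", and that additions saturate instead of overflowing. The class RequestCounterSketch is a hypothetical name, and the unbounded guard in produced() is a choice made for this sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a request counter; not the library's MergeProducer.
class RequestCounterSketch extends AtomicLong {
    private static final long serialVersionUID = 1L;

    // Add n to the outstanding demand, saturating at Long.MAX_VALUE.
    void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required");
        }
        if (n == 0) {
            return;
        }
        for (;;) {
            long current = get();
            if (current == Long.MAX_VALUE) {
                return; // already unbounded, nothing to add
            }
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // overflow: cap instead of wrapping
            }
            if (compareAndSet(current, next)) {
                return;
            }
        }
    }

    // Subtract what was delivered and return the remaining demand.
    // In unbounded mode the demand is left untouched (sketch-specific guard).
    long produced(int n) {
        if (get() == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        return addAndGet(-n);
    }

    public static void main(String[] args) {
        RequestCounterSketch counter = new RequestCounterSketch();
        counter.request(5);
        System.out.println(counter.produced(2)); // prints 3
        counter.request(Long.MAX_VALUE);
        System.out.println(counter.get() == Long.MAX_VALUE); // prints true
    }
}
```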
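request() therefore ends by calling MergeSubscriber's emit(), which runs emitLoop(). The loop keeps polling values from the queues and passes each one to the child Subscriber, so the merged data is delivered item by item through the Subscriber's onNext, onCompleted, and onError.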
void emitLoop() {
    boolean skipFinal = false;
    try {
        final Subscriber<? super T> child = this.child;
        for (;;) {
            // eagerly check if child unsubscribed or we reached a terminal state.
            if (checkTerminate()) {
                skipFinal = true;
                return;
            }
            Queue<Object> svq = queue;
            long r = producer.get();
            boolean unbounded = r == Long.MAX_VALUE;
            // count the number of 'completed' sources to replenish them in batches
            int replenishMain = 0;
            // try emitting as many scalars as possible
            if (svq != null) {
                for (;;) {
                    int scalarEmission = 0;
                    Object o = null;
                    while (r > 0) {
                        o = svq.poll();
                        // eagerly check if child unsubscribed or we reached a terminal state.
                        if (checkTerminate()) {
                            skipFinal = true;
                            return;
                        }
                        if (o == null) {
                            break;
                        }
                        T v = NotificationLite.getValue(o);
                        // if child throws, report bounce it back immediately
                        try {
                            child.onNext(v);
                        } catch (Throwable t) {
                            if (!delayErrors) {
                                Exceptions.throwIfFatal(t);
                                skipFinal = true;
                                unsubscribe();
                                child.onError(t);
                                return;
                            }
                            getOrCreateErrorQueue().offer(t);
                        }
                        replenishMain++;
                        scalarEmission++;
                        r--;
                    }
                    if (scalarEmission > 0) {
                        if (unbounded) {
                            r = Long.MAX_VALUE;
                        } else {
                            r = producer.produced(scalarEmission);
                        }
                    }
                    if (r == 0L || o == null) {
                        break;
                    }
                }
            }
            /*
             * We need to read done before innerSubscribers because innerSubscribers are added
             * before done is set to true. If it were the other way around, we could read an empty
             * innerSubscribers, get paused and then read a done flag but an async producer
             * might have added more subscribers between the two.
             */
            boolean d = done;
            // re-read svq because it could have been created
            // asynchronously just before done was set to true.
            svq = queue;
            // read the current set of inner subscribers
            InnerSubscriber<?>[] inner = innerSubscribers;
            int n = inner.length;
            // check if upstream is done, there are no scalar values
            // and no active inner subscriptions
            if (d && (svq == null || svq.isEmpty()) && n == 0) {
                Queue<Throwable> e = errors;
                if (e == null || e.isEmpty()) {
                    child.onCompleted();
                } else {
                    reportError();
                }
                skipFinal = true;
                return;
            }
            boolean innerCompleted = false;
            if (n > 0) {
                // let's continue the round-robin emission from last location
                long startId = lastId;
                int index = lastIndex;
                // in case there were changes in the array or the index
                // no longer points to the inner with the cached id
                if (n <= index || inner[index].id != startId) {
                    if (n <= index) {
                        index = 0;
                    }
                    // try locating the inner with the cached index
                    int j = index;
                    for (int i = 0; i < n; i++) {
                        if (inner[j].id == startId) {
                            break;
                        }
                        // wrap around in round-robin fashion
                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    // if we found it again, j will point to it
                    // otherwise, we continue with the replacement at j
                    index = j;
                    lastIndex = j;
                    lastId = inner[j].id;
                }
                int j = index;
                // loop through all sources once to avoid delaying any new sources too much
                for (int i = 0; i < n; i++) {
                    // eagerly check if child unsubscribed or we reached a terminal state.
                    if (checkTerminate()) {
                        skipFinal = true;
                        return;
                    }
                    @SuppressWarnings("unchecked")
                    InnerSubscriber<T> is = (InnerSubscriber<T>)inner[j];
                    Object o = null;
                    for (;;) {
                        int produced = 0;
                        while (r > 0) {
                            // eagerly check if child unsubscribed or we reached a terminal state.
                            if (checkTerminate()) {
                                skipFinal = true;
                                return;
                            }
                            RxRingBuffer q = is.queue;
                            if (q == null) {
                                break;
                            }
                            o = q.poll();
                            if (o == null) {
                                break;
                            }
                            T v = NotificationLite.getValue(o);
                            // if child throws, report bounce it back immediately
                            try {
                                child.onNext(v);
                            } catch (Throwable t) {
                                skipFinal = true;
                                Exceptions.throwIfFatal(t);
                                try {
                                    child.onError(t);
                                } finally {
                                    unsubscribe();
                                }
                                return;
                            }
                            r--;
                            produced++;
                        }
                        if (produced > 0) {
                            if (!unbounded) {
                                r = producer.produced(produced);
                            } else {
                                r = Long.MAX_VALUE;
                            }
                            is.requestMore(produced);
                        }
                        // if we run out of requests or queued values, break
                        if (r == 0 || o == null) {
                            break;
                        }
                    }
                    boolean innerDone = is.done;
                    RxRingBuffer innerQueue = is.queue;
                    if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                        removeInner(is);
                        if (checkTerminate()) {
                            skipFinal = true;
                            return;
                        }
                        replenishMain++;
                        innerCompleted = true;
                    }
                    // if we run out of requests, don't try the other sources
                    if (r == 0) {
                        break;
                    }
                    // wrap around in round-robin fashion
                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                // if we run out of requests or just completed a round, save the index and id
                lastIndex = j;
                lastId = inner[j].id;
            }
            if (replenishMain > 0) {
                request(replenishMain);
            }
            // if one or more inner completed, loop again to see if we can terminate the whole stream
            if (innerCompleted) {
                continue;
            }
            // in case there were updates to the state, we loop again
            synchronized (this) {
                if (!missed) {
                    skipFinal = true;
                    emitting = false;
                    break;
                }
                missed = false;
            }
        }
    } finally {
        if (!skipFinal) {
            synchronized (this) {
                emitting = false;
            }
        }
    }
}
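The inner-source part of emitLoop() is long, but its core idea is small: visit the inner queues once, starting from a remembered index and wrapping around, and stop as soon as the requested amount has been delivered. The following is a minimal sketch of just that idea, assuming plain java.util.Queue instances stand in for the inner subscribers' buffers. RoundRobinDrainSketch and drain() are hypothetical names, and termination checks, error handling, and the emitting/missed flags are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the budgeted round-robin drain; not the library code.
class RoundRobinDrainSketch {

    // Visit each queue once, starting at startIndex (assumed >= 0) and
    // wrapping around. Deliver at most `requested` items in total.
    static <T> List<T> drain(List<Queue<T>> inners, int startIndex, long requested) {
        List<T> emitted = new ArrayList<>();
        int n = inners.size();
        if (n == 0) {
            return emitted;
        }
        long r = requested;
        int j = startIndex % n;
        for (int i = 0; i < n && r > 0; i++) {
            Queue<T> q = inners.get(j);
            // Take as many items from this queue as the budget allows.
            while (r > 0) {
                T value = q.poll();
                if (value == null) {
                    break; // this queue is empty, move on
                }
                emitted.add(value);
                r--;
            }
            // wrap around in round-robin fashion
            j++;
            if (j == n) {
                j = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Queue<Integer>> inners = new ArrayList<>();
        inners.add(new ArrayDeque<>(Arrays.asList(1, 2)));
        inners.add(new ArrayDeque<>(Arrays.asList(3)));
        inners.add(new ArrayDeque<>(Arrays.asList(4, 5, 6)));
        System.out.println(drain(inners, 1, 3));  // prints [3, 4, 5]
        System.out.println(drain(inners, 0, 10)); // prints [1, 2, 6]
    }
}
```

Starting from a saved index rather than always from zero keeps the first inner source from being served ahead of the others every time demand is limited.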
That completes the analysis of the flatMap transformation.
Summary
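flatMap first uses map() to apply the Func1 to each item, producing an Observable<Observable<? extends R>>, and then hands the result to merge(), which flattens the inner Observables into one new Observable. At runtime, the merge logic keeps polling its queues and passes the data on to the Subscriber's onNext, onCompleted, and onError.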