RxJava Source Code Analysis 04: The Transformation Process (flatMap)

flatMap

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
    //...
    return merge(map(func));
}  

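flatMap first calls map(), which applies func to every item of the source. Because func returns an Observable<? extends R>, the result of map() has the type Observable<Observable<? extends R>>. The details of that conversion were covered in the previous article.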

Once the Observable<Observable<? extends R>> is available, merge() is called on it. Here is how the merge operator is defined:

Combines the output of multiple Observables so that they act like a single Observable.

So in this case merge() takes the inner Observables emitted by the Observable that map() produced and combines them into one new Observable.
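The composition merge(map(func)) can be pictured with a small list-based model. This is only a sketch under simplifying assumptions: FlatMapSketch and its methods are hypothetical names, plain lists stand in for Observables, and everything runs synchronously, so items come out in source order, whereas a real merge may interleave items from asynchronous inner Observables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, list-based model of flatMap = merge(map(func)); not RxJava code.
final class FlatMapSketch {

    // Step 1: map() applies func to every item, giving a "source of sources".
    static <T, R> List<List<R>> map(List<T> source, Function<T, List<R>> func) {
        List<List<R>> nested = new ArrayList<>();
        for (T item : source) {
            nested.add(func.apply(item));
        }
        return nested;
    }

    // Step 2: merge() flattens the inner sources into a single one.
    // Assumption: synchronous lists, so no interleaving can happen here.
    static <R> List<R> merge(List<List<R>> nested) {
        List<R> flat = new ArrayList<>();
        for (List<R> inner : nested) {
            flat.addAll(inner);
        }
        return flat;
    }

    // Same shape as the flatMap() body quoted above.
    static <T, R> List<R> flatMap(List<T> source, Function<T, List<R>> func) {
        return merge(map(source, func));
    }

    public static void main(String[] args) {
        List<Integer> out = flatMap(Arrays.asList(1, 2, 3), x -> Arrays.asList(x, x * 10));
        System.out.println(out); // prints [1, 10, 2, 20, 3, 30]
    }
}
```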

Let's go straight to the core code of merge:

// Observable class
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
    //...
    return source.lift(OperatorMerge.<T>instance(false));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}  

As you can see, the argument passed to lift() is OperatorMerge.<T>instance(false), which ultimately returns an OperatorMerge instance.
We will look at that class in a moment.

Back to Observable#lift():

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

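Here the current onSubscribe and the given operator are wrapped in an OnSubscribeLift. On subscription, OnSubscribeLift first passes the downstream Subscriber to operator#call() to obtain a wrapping Subscriber, then hands that wrapper to the upstream onSubscribe, as we saw in the previous article.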

Finally, unsafeCreate() returns a new Observable.
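The wiring that lift() performs can be sketched with a few miniature types. The code below is an illustration only: Subscriber, OnSubscribe, Operator, and Observable here are hypothetical, stripped-down stand-ins (onNext only, no unsubscription, no backpressure), not the RxJava classes of the same names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini types that mirror the shape of lift(); not the library's classes.
final class LiftSketch {

    interface Subscriber<T> { void onNext(T value); }

    interface OnSubscribe<T> { void call(Subscriber<T> subscriber); }

    // An operator turns the downstream subscriber into one that accepts the upstream type.
    interface Operator<R, T> { Subscriber<T> call(Subscriber<R> child); }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;

        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        <R> Observable<R> lift(Operator<R, T> operator) {
            // This lambda plays the role of OnSubscribeLift: wrap the child first,
            // then subscribe the wrapper to the upstream onSubscribe.
            return new Observable<R>(child -> {
                Subscriber<T> wrapped = operator.call(child);
                onSubscribe.call(wrapped);
            });
        }

        void subscribe(Subscriber<T> subscriber) { onSubscribe.call(subscriber); }
    }

    static List<String> demo() {
        Observable<Integer> source = new Observable<Integer>(s -> { s.onNext(1); s.onNext(2); });
        // Operator that converts each Integer to a String before passing it downstream.
        Operator<String, Integer> toText = child -> value -> child.onNext("item " + value);
        List<String> received = new ArrayList<>();
        source.lift(toText).subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [item 1, item 2]
    }
}
```

In this model the operator never touches the data source directly. It only decides what the upstream sees as its subscriber, which is the same role OperatorMerge plays for merge().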

The main logic lives in operator#call():

public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
    @Override
    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;

        child.add(subscriber);
        child.setProducer(producer);

        return subscriber;
    }

    //...
}  

call() creates a MergeSubscriber and a MergeProducer, then calls setProducer() on the child:

public void setProducer(Producer p) {
    //...
    if (toRequest == NOT_SET) {
        producer.request(Long.MAX_VALUE);
    } else {
        producer.request(toRequest);
    }
}  

In other words, MergeProducer's request() gets called:

public void request(long n) {
    if (n > 0) {
        if (get() == Long.MAX_VALUE) {
            return;
        }
        BackpressureUtils.getAndAddRequest(this, n);
        subscriber.emit();
    } else if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required");
    }
}
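The bookkeeping in request() can be illustrated in isolation. The sketch below is an assumption-laden stand-in, not BackpressureUtils itself: RequestCounterSketch and getAndAddCapped are hypothetical names, and the behavior modeled is the one suggested by the code above, namely that Long.MAX_VALUE means "unbounded" and that the counter should saturate there instead of overflowing.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the request bookkeeping; not RxJava's BackpressureUtils.
final class RequestCounterSketch {

    // Adds a non-negative n to the outstanding request count, capping at
    // Long.MAX_VALUE (treated as "unbounded"). Returns the previous value.
    static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // overflow: clamp instead of wrapping around
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    static void request(AtomicLong requested, long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required");
        }
        if (n == 0 || requested.get() == Long.MAX_VALUE) {
            return; // nothing to add, or already unbounded
        }
        getAndAddCapped(requested, n);
        // The real MergeProducer calls subscriber.emit() at this point.
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong();
        request(requested, 5);
        request(requested, 3);
        System.out.println(requested.get()); // prints 8
        request(requested, Long.MAX_VALUE);
        System.out.println(requested.get() == Long.MAX_VALUE); // prints true
    }
}
```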

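request() ends by calling MergeSubscriber's emit(), which in turn runs emitLoop(). emitLoop() keeps polling values from the queues and passes them to the child Subscriber, so the data of the Observables is delivered item by item through the Subscriber's onNext, onCompleted, and onError.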

 void emitLoop() {
        boolean skipFinal = false;
        try {
            final Subscriber<? super T> child = this.child;
            for (;;) {
                // eagerly check if child unsubscribed or we reached a terminal state.
                if (checkTerminate()) {
                    skipFinal = true;
                    return;
                }
                Queue<Object> svq = queue;

                long r = producer.get();
                boolean unbounded = r == Long.MAX_VALUE;

                // count the number of 'completed' sources to replenish them in batches
                int replenishMain = 0;

                // try emitting as many scalars as possible
                if (svq != null) {
                    for (;;) {
                        int scalarEmission = 0;
                        Object o = null;
                        while (r > 0) {
                            o = svq.poll();
                            // eagerly check if child unsubscribed or we reached a terminal state.
                            if (checkTerminate()) {
                                skipFinal = true;
                                return;
                            }
                            if (o == null) {
                                break;
                            }
                            T v = NotificationLite.getValue(o);
                            // if child throws, report bounce it back immediately
                            try {
                                child.onNext(v);
                            } catch (Throwable t) {
                                if (!delayErrors) {
                                    Exceptions.throwIfFatal(t);
                                    skipFinal = true;
                                    unsubscribe();
                                    child.onError(t);
                                    return;
                                }
                                getOrCreateErrorQueue().offer(t);
                            }
                            replenishMain++;
                            scalarEmission++;
                            r--;
                        }
                        if (scalarEmission > 0) {
                            if (unbounded) {
                                r = Long.MAX_VALUE;
                            } else {
                                r = producer.produced(scalarEmission);
                            }
                        }
                        if (r == 0L || o == null) {
                            break;
                        }
                    }
                }

                /*
                 * We need to read done before innerSubscribers because innerSubscribers are added
                 * before done is set to true. If it were the other way around, we could read an empty
                 * innerSubscribers, get paused and then read a done flag but an async producer
                 * might have added more subscribers between the two.
                 */
                boolean d = done;
                // re-read svq because it could have been created
                // asynchronously just before done was set to true.
                svq = queue;
                // read the current set of inner subscribers
                InnerSubscriber<?>[] inner = innerSubscribers;
                int n = inner.length;

                // check if upstream is done, there are no scalar values
                // and no active inner subscriptions
                if (d && (svq == null || svq.isEmpty()) && n == 0) {
                    Queue<Throwable> e = errors;
                    if (e == null || e.isEmpty()) {
                        child.onCompleted();
                    } else {
                        reportError();
                    }
                    skipFinal = true;
                    return;
                }

                boolean innerCompleted = false;
                if (n > 0) {
                    // let's continue the round-robin emission from last location
                    long startId = lastId;
                    int index = lastIndex;

                    // in case there were changes in the array or the index
                    // no longer points to the inner with the cached id
                    if (n <= index || inner[index].id != startId) {
                        if (n <= index) {
                            index = 0;
                        }
                        // try locating the inner with the cached index
                        int j = index;
                        for (int i = 0; i < n; i++) {
                            if (inner[j].id == startId) {
                                break;
                            }
                            // wrap around in round-robin fashion
                            j++;
                            if (j == n) {
                                j = 0;
                            }
                        }
                        // if we found it again, j will point to it
                        // otherwise, we continue with the replacement at j
                        index = j;
                        lastIndex = j;
                        lastId = inner[j].id;
                    }

                    int j = index;
                    // loop through all sources once to avoid delaying any new sources too much
                    for (int i = 0; i < n; i++) {
                        // eagerly check if child unsubscribed or we reached a terminal state.
                        if (checkTerminate()) {
                            skipFinal = true;
                            return;
                        }
                        @SuppressWarnings("unchecked")
                        InnerSubscriber<T> is = (InnerSubscriber<T>)inner[j];

                        Object o = null;
                        for (;;) {
                            int produced = 0;
                            while (r > 0) {
                                // eagerly check if child unsubscribed or we reached a terminal state.
                                if (checkTerminate()) {
                                    skipFinal = true;
                                    return;
                                }
                                RxRingBuffer q = is.queue;
                                if (q == null) {
                                    break;
                                }
                                o = q.poll();
                                if (o == null) {
                                    break;
                                }
                                T v = NotificationLite.getValue(o);
                                // if child throws, report bounce it back immediately
                                try {
                                    child.onNext(v);
                                } catch (Throwable t) {
                                    skipFinal = true;
                                    Exceptions.throwIfFatal(t);
                                    try {
                                        child.onError(t);
                                    } finally {
                                        unsubscribe();
                                    }
                                    return;
                                }
                                r--;
                                produced++;
                            }
                            if (produced > 0) {
                                if (!unbounded) {
                                    r = producer.produced(produced);
                                } else {
                                    r = Long.MAX_VALUE;
                                }
                                is.requestMore(produced);
                            }
                            // if we run out of requests or queued values, break
                            if (r == 0 || o == null) {
                                break;
                            }
                        }
                        boolean innerDone = is.done;
                        RxRingBuffer innerQueue = is.queue;
                        if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            removeInner(is);
                            if (checkTerminate()) {
                                skipFinal = true;
                                return;
                            }
                            replenishMain++;
                            innerCompleted = true;
                        }
                        // if we run out of requests, don't try the other sources
                        if (r == 0) {
                            break;
                        }

                        // wrap around in round-robin fashion
                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    // if we run out of requests or just completed a round, save the index and id
                    lastIndex = j;
                    lastId = inner[j].id;
                }

                if (replenishMain > 0) {
                    request(replenishMain);
                }
                // if one or more inner completed, loop again to see if we can terminate the whole stream
                if (innerCompleted) {
                    continue;
                }
                // in case there were updates to the state, we loop again
                synchronized (this) {
                    if (!missed) {
                        skipFinal = true;
                        emitting = false;
                        break;
                    }
                    missed = false;
                }
            }
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }
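The round-robin part of emitLoop() is easier to follow in a reduced form. The sketch below is a single-threaded model under stated assumptions: RoundRobinDrainSketch is a hypothetical class, plain queues stand in for the inner subscribers, and termination checks, error handling, removal of completed inners, and the emitting/missed synchronization are all left out. It keeps only two ideas from the listing above: emission is limited by the requested amount r, and the next pass resumes at the source where the previous one ran out of requests.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical, single-threaded model of the round-robin part of emitLoop(); not RxJava code.
final class RoundRobinDrainSketch {
    private final List<Queue<Integer>> inners = new ArrayList<>();
    private int lastIndex; // where the next drain pass resumes

    void addInner(Integer... values) {
        inners.add(new ArrayDeque<>(Arrays.asList(values)));
    }

    // Emits up to 'requested' items into 'out' and returns how many were emitted.
    int drain(long requested, List<Integer> out) {
        int n = inners.size();
        if (n == 0) {
            return 0;
        }
        long r = requested;
        int emitted = 0;
        int j = lastIndex % n;
        // Visit every source at most once per pass.
        for (int i = 0; i < n && r > 0; i++) {
            Queue<Integer> q = inners.get(j);
            while (r > 0) {
                Integer v = q.poll();
                if (v == null) {
                    break; // this source has nothing queued right now
                }
                out.add(v);
                r--;
                emitted++;
            }
            if (r == 0) {
                break; // out of requests: resume at this source next time
            }
            j = (j + 1) % n; // wrap around in round-robin fashion
        }
        lastIndex = j;
        return emitted;
    }

    public static void main(String[] args) {
        RoundRobinDrainSketch sketch = new RoundRobinDrainSketch();
        sketch.addInner(1, 2, 3);
        sketch.addInner(10, 20);
        sketch.addInner(100);
        List<Integer> out = new ArrayList<>();
        sketch.drain(2, out); // emits 1, 2 and stays on the first source
        sketch.drain(3, out); // emits 3, 10, 20
        sketch.drain(5, out); // emits 100
        System.out.println(out); // prints [1, 2, 3, 10, 20, 100]
    }
}
```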

That completes the analysis of the flatMap transformation.

Summary

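The flatMap transformation first applies the Func1 through map() to obtain an Observable<Observable<R>>, then hands it to merge(), which combines the inner Observables into a single new Observable. At run time, the merge logic keeps polling values from its internal queues and delivers them to the Subscriber through onNext, onCompleted, and onError.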