1. 程式人生 > >Rxjava(變換類)-concatMap

Rxjava(變換類)-concatMap

demo

        Observable.from(aa).concatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer number) {
                return Observable.just(number * number).subscribeOn(Schedulers.from(JobExecutor.getInstance()));
            }

        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.print(integer + ", ");
            }
        });

看下contctMap
    public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (this instanceof ScalarSynchronousObservable) {
            ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
            return scalar.scalarFlatMap(func);
        }
        return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE));
    }

這裡 如果 是 ScalarSynchronousObservable 就跟flatMap一樣,我們看下不是ScalarSynchronousObservable型別

這裡會建立一個OnSubscribeConcatMap,這裡會把Func1儲存到mapper,另外兩個引數是 2, OnSubscribeConcatMap.IMMEDIATE

然後後面subscribe的時候會呼叫OnSubscribeConcatMap的call函式,我們看一下它的實現

public void call(Subscriber<? super R> child) {
        Subscriber<? super R> s;

        if (delayErrorMode == IMMEDIATE) {
            s = new SerializedSubscriber<R>(child);
        } else {
            s = child;
        }

        final ConcatMapSubscriber<T, R> parent = new ConcatMapSubscriber<T, R>(s, mapper, prefetch, delayErrorMode);

        child.add(parent);
        child.add(parent.inner);
        child.setProducer(new Producer() {
            @Override
            public void request(long n) {
                parent.requestMore(n);
            }
        });

        if (!child.isUnsubscribed()) {
            source.unsafeSubscribe(parent);
        }
    }
這裡delayErrorMode == IMMEDIATE,走if分支,建立一個SerializedSubscriber

然後以前面建立的SerializedSubscriber另加mapper, prefetch, delayErrorMode建立ConcatMapSubscriber

        public ConcatMapSubscriber(Subscriber<? super R> actual,
                Func1<? super T, ? extends Observable<? extends R>> mapper, int prefetch, int delayErrorMode) {
            this.actual = actual;
            this.mapper = mapper;
            this.delayErrorMode = delayErrorMode;
            this.arbiter = new ProducerArbiter();
            this.wip = new AtomicInteger();
            this.error = new AtomicReference<Throwable>();
            Queue<Object> q;
            if (UnsafeAccess.isUnsafeAvailable()) {
                q = new SpscArrayQueue<Object>(prefetch);
            } else {
                q = new SpscAtomicArrayQueue<Object>(prefetch);
            }
            this.queue = q;
            this.inner = new SerialSubscription();
            this.request(prefetch);
        }
這裡會建立一個ProducerArbiter,呼叫request,這裡prefetch是2
protected final void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("number requested cannot be negative: " + n);
        }

        // if producer is set then we will request from it
        // otherwise we increase the requested count by n
        Producer producerToRequestFrom;
        synchronized (this) {
            if (producer != null) {
                producerToRequestFrom = producer;
            } else {
                addToRequested(n);
                return;
            }
        }
        // after releasing lock (we should not make requests holding a lock)
        producerToRequestFrom.request(n);
    }

producer為null,呼叫addToRequested

    private void addToRequested(long n) {
        if (requested == NOT_SET) {
            requested = n;
        } else {
            final long total = requested + n;
            // check if overflow occurred
            if (total < 0) {
                requested = Long.MAX_VALUE;
            } else {
                requested = total;
            }
        }
    }

這裡把requested設定為了2

回到前面的call函式

然後呼叫child 的setProducer,最終會呼叫到這裡Producer的request方法

從而呼叫

 parent.requestMore(n);
這裡的parent就是我們剛剛建立的ConcatMapSubscriber,我們看下它的requestMore方法
        void requestMore(long n) {
            if (n > 0) {
                arbiter.request(n);
            } else
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
        }
arbiter就是前面建立的ProducerArbiter

 public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required");
        }
        if (n == 0) {
            return;
        }
        synchronized (this) {
            if (emitting) {
                missedRequested += n;
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            long r = requested;
            long u = r + n;
            if (u < 0) {
                u = Long.MAX_VALUE;
            }
            requested = u;

            Producer p = currentProducer;
            if (p != null) {
                p.request(n);
            }

            emitLoop();
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }

這裡n為Long.MAX_VALUE

emitting設定為true,currentProducer為null,呼叫emitLoop

public void emitLoop() {
        for (;;) {
            long localRequested;
            long localProduced;
            Producer localProducer;
            synchronized (this) {
                localRequested = missedRequested;
                localProduced = missedProduced;
                localProducer = missedProducer;
                if (localRequested == 0L
                        && localProduced == 0L
                        && localProducer == null) {
                    emitting = false;
                    return;
                }
                missedRequested = 0L;
                missedProduced = 0L;
                missedProducer = null;
            }

            long r = requested;

            if (r != Long.MAX_VALUE) {
                long u = r + localRequested;
                if (u < 0 || u == Long.MAX_VALUE) {
                    r = Long.MAX_VALUE;
                    requested = r;
                } else {
                    long v = u - localProduced;
                    if (v < 0) {
                        throw new IllegalStateException("more produced than requested");
                    }
                    r = v;
                    requested = v;
                }
            }
            if (localProducer != null) {
                if (localProducer == NULL_PRODUCER) {
                    currentProducer = null;
                } else {
                    currentProducer = localProducer;
                    localProducer.request(r);
                }
            } else {
                Producer p = currentProducer;
                if (p != null && localRequested != 0L) {
                    p.request(localRequested);
                }
            }
        }
    }
滿足
if (localRequested == 0L
                        && localProduced == 0L
                        && localProducer == null)
把emitting設定為false返回

再次回到call函式,呼叫

 source.unsafeSubscribe(parent);
呼叫source的call函式,這裡的source是OnSubscribeFromIterable
public void call(final Subscriber<? super T> o) {
        Iterator<? extends T> it;
        boolean b;

        try {
            it = is.iterator();

            b = it.hasNext();
        } catch (Throwable ex) {
            Exceptions.throwOrReport(ex, o);
            return;
        }

        if (!o.isUnsubscribed()) {
            if (!b) {
                o.onCompleted();
            } else {
                o.setProducer(new IterableProducer<T>(o, it));
            }
        }
    }
先獲取值,然後呼叫setProducer,這裡new了一個IterableProducer,最終會呼叫它的request方法,這裡的o是ConcatMapSubscriber
       @Override
        public void request(long n) {
            if (get() == Long.MAX_VALUE) {
                // already started with fast-path
                return;
            }
            if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
                fastPath();
            } else
            if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
                slowPath(n);
            }

        }
這裡n為2,前面初始化ConcatMapSubscriber的時候設定的

所以這裡走的是slowPath

void slowPath(long n) {
            // backpressure is requested
            final Subscriber<? super T> o = this.o;
            final Iterator<? extends T> it = this.it;

            long r = n;
            long e = 0;

            for (;;) {
                while (e != r) {
                    if (o.isUnsubscribed()) {
                        return;
                    }

                    T value;

                    try {
                        value = it.next();
                    } catch (Throwable ex) {
                        Exceptions.throwOrReport(ex, o);
                        return;
                    }

                    o.onNext(value);

                    if (o.isUnsubscribed()) {
                        return;
                    }

                    boolean b;

                    try {
                        b = it.hasNext();
                    } catch (Throwable ex) {
                        Exceptions.throwOrReport(ex, o);
                        return;
                    }

                    if (!b) {
                        if (!o.isUnsubscribed()) {
                            o.onCompleted();
                        }
                        return;
                    }

                    e++;
                }

                r = get();
                if (e == r) {
                    r = BackpressureUtils.produced(this, e);
                    if (r == 0L) {
                        break;
                    }
                    e = 0L;
                }
            }

        }
之類呼叫o的onNext,o是ConcatMapSubscriber
        @Override
        public void onNext(T t) {
            if (!queue.offer(NotificationLite.next(t))) {
                unsubscribe();
                onError(new MissingBackpressureException());
            } else {
                drain();
            }
        }
queue.offer會把當前值入佇列呼叫drain
void drain() {
            if (wip.getAndIncrement() != 0) {
                return;
            }

            final int delayErrorMode = this.delayErrorMode;

            for (;;) {
                if (actual.isUnsubscribed()) {
                    return;
                }

                if (!active) {
                    if (delayErrorMode == BOUNDARY) {
                        if (error.get() != null) {
                            Throwable ex = ExceptionsUtils.terminate(error);
                            if (!ExceptionsUtils.isTerminated(ex)) {
                                actual.onError(ex);
                            }
                            return;
                        }
                    }

                    boolean mainDone = done;
                    Object v = queue.poll();
                    boolean empty = v == null;

                    if (mainDone && empty) {
                        Throwable ex = ExceptionsUtils.terminate(error);
                        if (ex == null) {
                            actual.onCompleted();
                        } else
                        if (!ExceptionsUtils.isTerminated(ex)) {
                            actual.onError(ex);
                        }
                        return;
                    }

                    if (!empty) {

                        Observable<? extends R> source;

                        try {
                            source = mapper.call(NotificationLite.<T>getValue(v));
                        } catch (Throwable mapperError) {
                            Exceptions.throwIfFatal(mapperError);
                            drainError(mapperError);
                            return;
                        }

                        if (source == null) {
                            drainError(new NullPointerException("The source returned by the mapper was null"));
                            return;
                        }

                        if (source != Observable.empty()) {

                            if (source instanceof ScalarSynchronousObservable) {
                                ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;

                                active = true;

                                arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
                            } else {
                                ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
                                inner.set(innerSubscriber);

                                if (!innerSubscriber.isUnsubscribed()) {
                                    active = true;

                                    source.unsafeSubscribe(innerSubscriber);
                                } else {
                                    return;
                                }
                            }
                            request(1);
                        } else {
                            request(1);
                            continue;
                        }
                    }
                }
                if (wip.decrementAndGet() == 0) {
                    break;
                }
            }
        }
active開始為false,進入後會把它設定為true,這裡如果active為true,則表示前一個操作未完成,直接返回(前一個操作什麼時候完成後面會講)
出佇列,主要的是呼叫mapper的call函式
public Observable<Integer> call(Integer number) {
                return Observable.just(number * number).subscribeOn(Schedulers.from(JobExecutor.getInstance()));
            }
最終會建立一個ScalarAsyncOnSubscribe它的onSubscribe是OperatorSubscribeOn  

回到drain函式,source不是ScalarSynchronousObservable,走else分支

 ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
                                inner.set(innerSubscriber);

                                if (!innerSubscriber.isUnsubscribed()) {
                                    active = true;

                                    source.unsafeSubscribe(innerSubscriber);
                                } else {
                                    return;
                                }
                            }
                            request(1);

建立一個ConcatMapInnerSubscriber並設定給inner,inner是我們初始化時建立的SerialSubscription

並走if分支,active設定為true,呼叫unsafeSubscribe,最終呼叫ScalarAsyncOnSubscribe的call函式

 public void call(Subscriber<? super T> s) {
            s.setProducer(new ScalarAsyncProducer<T>(s, value, onSchedule));
        }

這裡建立了一個ScalarAsyncProducer,呼叫setProducer
        @Override
        public void setProducer(Producer p) {
            parent.arbiter.setProducer(p);
        }

這裡的arbiter是ProducerArbiter,
public void setProducer(Producer newProducer) {
        synchronized (this) {
            if (emitting) {
                missedProducer = newProducer == null ? NULL_PRODUCER : newProducer;
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            currentProducer = newProducer;
            if (newProducer != null) {
                newProducer.request(requested);
            }

            emitLoop();
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }

主要呼叫
 newProducer.request(requested);
newProducer為ScalarAsyncProducer
        @Override
        public void request(long n) {
            if (n < 0L) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n != 0 && compareAndSet(false, true)) {
                actual.add(onSchedule.call(this));
            }
        }
我們看一下onSchedule
      w.schedule(new Action0() {
                        @Override
                        public void call() {
                            try {
                                a.call();
                            } finally {
                                w.unsubscribe();
                            }
                        }
                    });

------------------------------------------------------------------------------------------------------------

這裡的a是ScalarSynchronousObservable,這裡已經在另外一個執行緒執行了

我們看一下他的call

 @Override
        public void call() {
            Subscriber<? super T> a = actual;
            if (a.isUnsubscribed()) {
                return;
            }
            T v = value;
            try {
                a.onNext(v);
            } catch (Throwable e) {
                Exceptions.throwOrReport(e, a, v);
                return;
            }
            if (a.isUnsubscribed()) {
                return;
            }
            a.onCompleted();
        }
這裡的a是ConcatMapInnerSubscriber,他的onNext函式我們就不去分析了,我們看一下他的onComplete函式
 public void onCompleted() {
            parent.innerCompleted(produced);
        }
這裡parent為ConcatMapSubscriber
     void innerCompleted(long produced) {
            if (produced != 0L) {
                arbiter.produced(produced);
            }
            active = false;
            drain();
        }

這裡會把active 設定為false重新呼叫drain函式,如果佇列中有值,則繼續執行佇列中的請求
----------------------------------------------------------------------------------

我們重新回到前面第一個執行緒在schedule啟動一個執行緒之後,繼續回到slowPath,然後處理下一個請求,把值入佇列,如果當前有請求執行,則返回。這樣所有的執行都是一個接著一個執行的。

所以最終的輸出結果也是順序的

4, 9, 16, 25, 36, 49, 64, 81, 100, 
這裡在除錯的時候為了 延遲一個操作執行的時間,模擬一個操作長時間執行,drain中active 為false狀態,我們可以在sursurib中新增sleep
      ArrayList aa = new ArrayList<>(Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10));
        Observable.from(aa).concatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer number) {
                return Observable.just(number * number).subscribeOn(Schedulers.from(JobExecutor.getInstance()));
            }

        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.print(integer + ", ");
                try {
                    Thread.sleep(60000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });