Rxjava(變換類)-concatMap
demo
Observable.from(aa).concatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer number) { return Observable.just(number * number).subscribeOn(Schedulers.from(JobExecutor.getInstance())); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.print(integer + ", "); } });
看下contctMap
public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) { if (this instanceof ScalarSynchronousObservable) { ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this; return scalar.scalarFlatMap(func); } return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE)); }
這裡 如果 是 ScalarSynchronousObservable 就跟flatMap一樣,我們看下不是ScalarSynchronousObservable型別
這裡會建立一個OnSubscribeConcatMap,這裡會把Func1儲存到mapper,另外兩個引數是 2, OnSubscribeConcatMap.IMMEDIATE
然後後面subscribe的時候會呼叫OnSubscribeConcatMap的call函式,我們看一下它的實現
這裡delayErrorMode == IMMEDIATE,走if分支,建立一個SerializedSubscriberpublic void call(Subscriber<? super R> child) { Subscriber<? super R> s; if (delayErrorMode == IMMEDIATE) { s = new SerializedSubscriber<R>(child); } else { s = child; } final ConcatMapSubscriber<T, R> parent = new ConcatMapSubscriber<T, R>(s, mapper, prefetch, delayErrorMode); child.add(parent); child.add(parent.inner); child.setProducer(new Producer() { @Override public void request(long n) { parent.requestMore(n); } }); if (!child.isUnsubscribed()) { source.unsafeSubscribe(parent); } }
然後以前面建立的SerializedSubscriber另加mapper, prefetch, delayErrorMode建立ConcatMapSubscriber
public ConcatMapSubscriber(Subscriber<? super R> actual,
Func1<? super T, ? extends Observable<? extends R>> mapper, int prefetch, int delayErrorMode) {
this.actual = actual;
this.mapper = mapper;
this.delayErrorMode = delayErrorMode;
this.arbiter = new ProducerArbiter();
this.wip = new AtomicInteger();
this.error = new AtomicReference<Throwable>();
Queue<Object> q;
if (UnsafeAccess.isUnsafeAvailable()) {
q = new SpscArrayQueue<Object>(prefetch);
} else {
q = new SpscAtomicArrayQueue<Object>(prefetch);
}
this.queue = q;
this.inner = new SerialSubscription();
this.request(prefetch);
}
這裡會建立一個ProducerArbiter,呼叫request,這裡prefetch是2
protected final void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("number requested cannot be negative: " + n);
}
// if producer is set then we will request from it
// otherwise we increase the requested count by n
Producer producerToRequestFrom;
synchronized (this) {
if (producer != null) {
producerToRequestFrom = producer;
} else {
addToRequested(n);
return;
}
}
// after releasing lock (we should not make requests holding a lock)
producerToRequestFrom.request(n);
}
producer為null,呼叫addToRequested
private void addToRequested(long n) {
if (requested == NOT_SET) {
requested = n;
} else {
final long total = requested + n;
// check if overflow occurred
if (total < 0) {
requested = Long.MAX_VALUE;
} else {
requested = total;
}
}
}
這裡把requested設定為了2
回到前面的call函式
然後呼叫child 的setProducer,最終會呼叫到這裡Producer的request方法
從而呼叫
parent.requestMore(n);
這裡的parent就是我們剛剛建立的ConcatMapSubscriber,我們看下它的requestMore方法
void requestMore(long n) {
if (n > 0) {
arbiter.request(n);
} else
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
}
arbiter就是前面建立的ProducerArbiter public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required");
}
if (n == 0) {
return;
}
synchronized (this) {
if (emitting) {
missedRequested += n;
return;
}
emitting = true;
}
boolean skipFinal = false;
try {
long r = requested;
long u = r + n;
if (u < 0) {
u = Long.MAX_VALUE;
}
requested = u;
Producer p = currentProducer;
if (p != null) {
p.request(n);
}
emitLoop();
skipFinal = true;
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
}
這裡n為Long.MAX_VALUE
emitting設定為true,currentProducer為null,呼叫emitLoop
public void emitLoop() {
for (;;) {
long localRequested;
long localProduced;
Producer localProducer;
synchronized (this) {
localRequested = missedRequested;
localProduced = missedProduced;
localProducer = missedProducer;
if (localRequested == 0L
&& localProduced == 0L
&& localProducer == null) {
emitting = false;
return;
}
missedRequested = 0L;
missedProduced = 0L;
missedProducer = null;
}
long r = requested;
if (r != Long.MAX_VALUE) {
long u = r + localRequested;
if (u < 0 || u == Long.MAX_VALUE) {
r = Long.MAX_VALUE;
requested = r;
} else {
long v = u - localProduced;
if (v < 0) {
throw new IllegalStateException("more produced than requested");
}
r = v;
requested = v;
}
}
if (localProducer != null) {
if (localProducer == NULL_PRODUCER) {
currentProducer = null;
} else {
currentProducer = localProducer;
localProducer.request(r);
}
} else {
Producer p = currentProducer;
if (p != null && localRequested != 0L) {
p.request(localRequested);
}
}
}
}
滿足if (localRequested == 0L
&& localProduced == 0L
&& localProducer == null)
把emitting設定為false返回
再次回到call函式,呼叫
source.unsafeSubscribe(parent);
呼叫source的call函式,這裡的source是OnSubscribeFromIterable
public void call(final Subscriber<? super T> o) {
Iterator<? extends T> it;
boolean b;
try {
it = is.iterator();
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
if (!o.isUnsubscribed()) {
if (!b) {
o.onCompleted();
} else {
o.setProducer(new IterableProducer<T>(o, it));
}
}
}
先獲取值,然後呼叫setProducer,這裡new了一個IterableProducer,最終會呼叫它的request方法,這裡的o是ConcatMapSubscriber
@Override
public void request(long n) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
fastPath();
} else
if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
slowPath(n);
}
}
這裡n為2,前面初始化ConcatMapSubscriber的時候設定的
所以這裡走的是slowPath
void slowPath(long n) {
// backpressure is requested
final Subscriber<? super T> o = this.o;
final Iterator<? extends T> it = this.it;
long r = n;
long e = 0;
for (;;) {
while (e != r) {
if (o.isUnsubscribed()) {
return;
}
T value;
try {
value = it.next();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
o.onNext(value);
if (o.isUnsubscribed()) {
return;
}
boolean b;
try {
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}
if (!b) {
if (!o.isUnsubscribed()) {
o.onCompleted();
}
return;
}
e++;
}
r = get();
if (e == r) {
r = BackpressureUtils.produced(this, e);
if (r == 0L) {
break;
}
e = 0L;
}
}
}
之類呼叫o的onNext,o是ConcatMapSubscriber
@Override
public void onNext(T t) {
if (!queue.offer(NotificationLite.next(t))) {
unsubscribe();
onError(new MissingBackpressureException());
} else {
drain();
}
}
queue.offer會把當前值入佇列呼叫drain
void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
final int delayErrorMode = this.delayErrorMode;
for (;;) {
if (actual.isUnsubscribed()) {
return;
}
if (!active) {
if (delayErrorMode == BOUNDARY) {
if (error.get() != null) {
Throwable ex = ExceptionsUtils.terminate(error);
if (!ExceptionsUtils.isTerminated(ex)) {
actual.onError(ex);
}
return;
}
}
boolean mainDone = done;
Object v = queue.poll();
boolean empty = v == null;
if (mainDone && empty) {
Throwable ex = ExceptionsUtils.terminate(error);
if (ex == null) {
actual.onCompleted();
} else
if (!ExceptionsUtils.isTerminated(ex)) {
actual.onError(ex);
}
return;
}
if (!empty) {
Observable<? extends R> source;
try {
source = mapper.call(NotificationLite.<T>getValue(v));
} catch (Throwable mapperError) {
Exceptions.throwIfFatal(mapperError);
drainError(mapperError);
return;
}
if (source == null) {
drainError(new NullPointerException("The source returned by the mapper was null"));
return;
}
if (source != Observable.empty()) {
if (source instanceof ScalarSynchronousObservable) {
ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;
active = true;
arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
} else {
ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
inner.set(innerSubscriber);
if (!innerSubscriber.isUnsubscribed()) {
active = true;
source.unsafeSubscribe(innerSubscriber);
} else {
return;
}
}
request(1);
} else {
request(1);
continue;
}
}
}
if (wip.decrementAndGet() == 0) {
break;
}
}
}
active開始為false,進入後會把它設定為true,這裡如果active為true,則表示前一個操作未完成,直接返回(前一個操作什麼時候完成後面會講)出佇列,主要的是呼叫mapper的call函式
public Observable<Integer> call(Integer number) {
return Observable.just(number * number).subscribeOn(Schedulers.from(JobExecutor.getInstance()));
}
最終會建立一個ScalarAsyncOnSubscribe它的onSubscribe是OperatorSubscribeOn
回到drain函式,source不是ScalarSynchronousObservable,走else分支
ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
inner.set(innerSubscriber);
if (!innerSubscriber.isUnsubscribed()) {
active = true;
source.unsafeSubscribe(innerSubscriber);
} else {
return;
}
}
request(1);
建立一個ConcatMapInnerSubscriber並設定給inner,inner是我們初始化時建立的SerialSubscription
並走if分支,active設定為true,呼叫unsafeSubscribe,最終呼叫ScalarAsyncOnSubscribe的call函式
public void call(Subscriber<? super T> s) {
s.setProducer(new ScalarAsyncProducer<T>(s, value, onSchedule));
}
這裡建立了一個ScalarAsyncProducer,呼叫setProducer
@Override
public void setProducer(Producer p) {
parent.arbiter.setProducer(p);
}
這裡的arbiter是ProducerArbiter,
public void setProducer(Producer newProducer) {
synchronized (this) {
if (emitting) {
missedProducer = newProducer == null ? NULL_PRODUCER : newProducer;
return;
}
emitting = true;
}
boolean skipFinal = false;
try {
currentProducer = newProducer;
if (newProducer != null) {
newProducer.request(requested);
}
emitLoop();
skipFinal = true;
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
}
主要呼叫
newProducer.request(requested);
newProducer為ScalarAsyncProducer @Override
public void request(long n) {
if (n < 0L) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n != 0 && compareAndSet(false, true)) {
actual.add(onSchedule.call(this));
}
}
我們看一下onSchedule
w.schedule(new Action0() {
@Override
public void call() {
try {
a.call();
} finally {
w.unsubscribe();
}
}
});
------------------------------------------------------------------------------------------------------------
這裡的a是ScalarSynchronousObservable,這裡已經在另外一個執行緒執行了
我們看一下他的call
@Override
public void call() {
Subscriber<? super T> a = actual;
if (a.isUnsubscribed()) {
return;
}
T v = value;
try {
a.onNext(v);
} catch (Throwable e) {
Exceptions.throwOrReport(e, a, v);
return;
}
if (a.isUnsubscribed()) {
return;
}
a.onCompleted();
}
這裡的a是ConcatMapInnerSubscriber,他的onNext函式我們就不去分析了,我們看一下他的onComplete函式
public void onCompleted() {
parent.innerCompleted(produced);
}
這裡parent為ConcatMapSubscriber
void innerCompleted(long produced) {
if (produced != 0L) {
arbiter.produced(produced);
}
active = false;
drain();
}
這裡會把active 設定為false重新呼叫drain函式,如果佇列中有值,則繼續執行佇列中的請求
----------------------------------------------------------------------------------
我們重新回到前面第一個執行緒在schedule啟動一個執行緒之後,繼續回到slowPath,然後處理下一個請求,把值入佇列,如果當前有請求執行,則返回。這樣所有的執行都是一個接著一個執行的。
所以最終的輸出結果也是順序的
4, 9, 16, 25, 36, 49, 64, 81, 100,
這裡在除錯的時候為了 延遲一個操作執行的時間,模擬一個操作長時間執行,drain中active 為false狀態,我們可以在sursurib中新增sleep
ArrayList aa = new ArrayList<>(Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10));
Observable.from(aa).concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer number) {
return Observable.just(number * number).subscribeOn(Schedulers.from(JobExecutor.getInstance()));
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer + ", ");
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});