Rxjava(結合類)-Merge
合併多個Observables的發射物,Merge可能會讓合併的Observables發射的資料交錯(有一個類似的操作符Concat不會讓數 據交錯,它會按順序一個接著一個發射多個Observables的發射物
看兩個demo
Observable.merge(Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { System.out.println("start 1"); int i = 5; while (i > 0) { subscriber.onNext(i); Thread.sleep(800); i--; } System.out.println("onCompleted"); subscriber.onCompleted(); } catch (InterruptedException e) { e.printStackTrace(); } } }).observeOn(Schedulers.from(JobExecutor.getInstance())), Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { int i = 95; System.out.println("start 2"); while (i > 90) { subscriber.onNext(i); Thread.sleep(600); i--; } System.out.println("onCompleted2"); subscriber.onCompleted(); } catch (InterruptedException e) { e.printStackTrace(); } } }).observeOn(Schedulers.from(JobExecutor.getInstance()))).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } });
輸出:
start 1
5
4
3
2
1
onCompleted
start 2
95
94
93
92
91
onCompleted2
demo2
我們把observeOn換成subscribeOn
輸出Observable.merge(Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { System.out.println("start 1"); int i = 5; while (i > 0) { subscriber.onNext(i); Thread.sleep(800); i--; } System.out.println("onCompleted"); subscriber.onCompleted(); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.from(JobExecutor.getInstance())), Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { int i = 95; System.out.println("start 2"); while (i > 90) { subscriber.onNext(i); Thread.sleep(600); i--; } System.out.println("onCompleted2"); subscriber.onCompleted(); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.from(JobExecutor.getInstance()))).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); } });
start 2
start 1
95
5
94
4
93
3
92
91
2
onCompleted2
1
onCompleted
可以看到,當我們merge的兩個observable是在同一執行緒 順序執行時,輸出也是按順序的,當我們兩個Observable可以並行執行時,則按他們的發射順序列印,當然,我們也可以用timer進行來進行發射
我們看下merge的實現
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) { return merge(new Observable[] { t1, t2 }); }
public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
return merge(from(sequences));
}
先看下from
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return create(new OnSubscribeFromArray<T>(array));
}
這裡建立了一個OnSubscribeFromArray
繼續分析merge
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}
呼叫lift,傳遞的operator是OperatorMerge,source是前面建立的Observable(OnSubscribeFromArray)
然後訂閱的時候會呼叫OnSubscribeLift的call
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
這裡的operatro是OperatorMerge
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
MergeProducer<T> producer = new MergeProducer<T>(subscriber);
subscriber.producer = producer;
child.add(subscriber);
child.setProducer(producer);
return subscriber;
}
返回了一個MergeSubscriber,並建立了MergeProducer
回到前面的call,這裡的parent是OnSubscribeFromArray
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
建立一個FromArrayProducer,setProducer時候會呼叫它的request方法
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}
呼叫fastPath
void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
array裡面的是我們merge函式裡面建立的Observable
這裡的child是前面建立的MergeSubscriber
呼叫它的onNext
public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t == Observable.empty()) {
emitEmpty();
} else
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}
這裡最終會建立InnerSubscriber
呼叫unsafeSubscribe,最終呼叫t(OperatorSubscribeOn)的call
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
這裡呼叫schedule,最終會啟動一個執行緒呼叫這裡的call函式,回到fastPath,繼續處理merge中的下一個Observable
然後當上面schedule的執行緒起來後,呼叫這裡的call
這裡又定義了一個Subscriber,然後呼叫source訂閱它,這裡的source是我們merge中subscribeOn之前的Observable,這裡就會呼叫到我們demo中
public void call(Subscriber<? super Integer> subscriber) {
try {
System.out.println("thread ID:" + Thread.currentThread());
int i = 5;
while (i > 0) {
subscriber.onNext(i);
Thread.sleep(8000);
i--;
}
System.out.println("onCompleted");
subscriber.onCompleted();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
這裡傳遞的subscriber是剛剛schedule建立的s,呼叫它的onNext
public void onNext(T t) {
subscriber.onNext(t);
}
這裡的subscriber是InnerSubscriber,呼叫它的onNext
@Override
public void onNext(T t) {
parent.tryEmit(this, t);
}
這裡的parent是MergeSubscriber,呼叫它的tryEmit
void tryEmit(InnerSubscriber<T> subscriber, T value) {
boolean success = false;
long r = producer.get();
if (r != 0L) {
synchronized (this) {
// if nobody is emitting and child has available requests
r = producer.get();
if (!emitting && r != 0L) {
emitting = true;
success = true;
}
}
}
if (success) {
RxRingBuffer subscriberQueue = subscriber.queue;
if (subscriberQueue == null || subscriberQueue.isEmpty()) {
emitScalar(subscriber, value, r);
} else {
queueScalar(subscriber, value);
emitLoop();
}
} else {
queueScalar(subscriber, value);
emit();
}
}
呼叫emitScalar
protected void emitScalar(InnerSubscriber<T> subscriber, T value, long r) {
boolean skipFinal = false;
try {
try {
child.onNext(value);
} catch (Throwable t) {
if (!delayErrors) {
Exceptions.throwIfFatal(t);
skipFinal = true;
subscriber.unsubscribe();
subscriber.onError(t);
return;
}
getOrCreateErrorQueue().offer(t);
}
if (r != Long.MAX_VALUE) {
producer.produced(1);
}
subscriber.requestMore(1);
// check if some state changed while emitting
synchronized (this) {
skipFinal = true;
if (!missed) {
emitting = false;
return;
}
missed = false;
}
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
/*
* In the synchronized block below request(1) we check
* if there was a concurrent emission attempt and if there was,
* we stay in emission mode and enter the emission loop
* which will take care all the queued up state and
* emission possibilities.
*/
emitLoop();
}
child.onNext(value);最終會呼叫到我們的訂閱者。
同樣,當另一個執行緒啟動的時候也會回撥call,最終呼叫到我們merge中設定的回撥,所以兩個merge中的Observable可能會交錯發射。
相關推薦
Rxjava(結合類)-Merge
合併多個Observables的發射物,Merge可能會讓合併的Observables發射的資料交錯(有一個類似的操作符Concat不會讓數 據交錯,它會按順序一個接著一個發射多個Observables的發射物 看兩個demo Observable.merge(Obser
Rxjava結合操作符—merge、 Join
1、merge merge可以合併多個發射物 Javadoc: merge(Iterable) Javadoc: merge(Iterable,int) Javadoc: merge(Observable[]) Javadoc: merge(Obser
Retrofit與Rxjava結合使用例項
環境配置 在Module:app的build.gradle下新增如下依賴,然後sync now。下面這些依賴有些沒有用到,暫時都新增進去不會有錯。 dependencies { implem
Android:RxJava 結合 Retrofit 優雅實現 網路請求輪詢
前言 Rxjava,由於其基於事件流的鏈式呼叫、邏輯簡潔 & 使用簡單的特點,深受各大 Android開發者的歡迎。 RxJava如此受歡迎的原因,在於其提供了豐富 & 功能強大的操作符,幾乎能完成所有的功能需求 今天,我將為
王學崗RxJava(十六)——merge,zip,join等組合方法
組合就是多輸入,單輸出,大家看具體的例子 package com.example.acer.rxjavatest; import android.os.Bundle; import android.support.v7.app.AppCompatActi
Android:RxJava 結合 Retrofit 全面實現 網路請求出錯重連
前言 Rxjava,由於其基於事件流的鏈式呼叫、邏輯簡潔 & 使用簡單的特點,深受各大 Android開發者的歡迎。 RxJava如此受歡迎的原因,在於其提供了豐富 & 功能強大的操作符,幾乎能完成所有的功能需求 今天,我將為
RxJava結合Retrofit如何避免覆蓋http請求(終端當前http請求)
unSubscribe("goodsListSubscribe");//取消前一次的http請求,避免頻繁請求導致響應次序及資料混亂 Subscription subscribe = goodsDAL.queryGoodsList(firstCategoryId, s
Rxjava(變換類)-concatMap
demo Observable.from(aa).concatMap(new Func1<Integer, Observable<Integer>>() { @Override pub
Rxjava(過濾類)-Filter
只發射通過了謂詞測試的資料項 Observable.range(1, 10).filter(new Func1<Integer, Boolean>() { @Override public Boolean ca
Retrofit和RxJava結合使用
使用Retrofit的時候就不得不提到RxJava,RxJava是一個基於觀察者模式的非同步實現。關於RxJava的入門學習,強烈推薦《給Android開發者的RxJava詳解》 正如上篇部落格所說,得益於Retrofit中靈活的Converter
設計模式:抽象工廠模式,結合類圖秒懂!
通過前篇文章《設計模式:工廠模式,解除耦合的利器》的介紹,我們對工廠模式有了深入的瞭解,今天繼續介紹一種特殊的工廠模式,也就是抽象工廠模式。 定義 抽象工廠模式:提供一個建立一系列相關或相互依賴物件的介面,而無須指定它們具體的類。抽象工廠模式又稱為Kit模式,屬於物件建立型模式,是工廠方法模式的升級版,在
android開發之merge結合include優化布局
ted com match clas you title example ews 文件的 merge結合include優化android布局,效果不知道。個人感覺使用上也有非常大的局限。只是還是了解一下。記錄下來。 布局文件都要有根節點,但androi
【WIP】對象的類型與動態結合
this center 變量 ogr 改變 inter text bool class 創建: 2018/01/21 動態結合(多態) 動態結合 呼出同一個方法,根據呼出方不同執行的處理也不同 //---------------------
python小知識-__call__和類裝飾器的結合使用
python get 都沒有 IV ini ble 自定義 裝飾器 介紹 class Decorator(): def __init__(self, f): print(‘run in init......‘) self.f = f
OC 建立TableView基類並結合MJRefresh實現上拉重新整理,下拉載入
1.建立繼承自UITableView的基類BaseTableView: #import <UIKit/UIKit.h> @class BaseTableView; @protocol BaseTableViewDelegate <NSObject> @option
yii2 關於helper類 ArrayHelper::merge()
$arr1 = [ 'name' => 'terry', 'age' => 15, 'friend'=> [ 'zhangsan','lisi' ], 'work' =>[ 'aa' => 11, 'bb'
簡單Java類和資料庫操作及javafx的結合小專案
先圖為上 秦時明月漢時關,萬里長征人未還,妙呀,甚是.. 1.開始 1.專案目的: 開發工具: Idea + Mysql + JAVASE 1.其實簡單來說就是實現兩張資料表的基本操作, 1.新增 2. 刪除 3.修改
Android RxJava操作符的學習---功能性操作符--網路請求出錯重連(結合Retrofit)
1. 需求場景 2. 功能說明 功能需求說明 功能邏輯 例項說明 在本例子中:採用Get方法對 金山詞霸API 傳送網路請求 通過 斷開網路連線 模擬 網路異常錯誤(恢復網路即可
Android RxJava操作符的學習---功能性操作符--(有條件)網路請求輪詢(結合Retrofit)
1. 需求場景 2. 功能說明 採用Get方法對 金山詞霸API 按規定時間重複傳送網路請求,從而模擬 輪詢 需求實現 停止輪詢的條件 = 當輪詢到第4次時 採用 Gson 進行資料解析 3.
初識RxJava(六)判斷類 操作符
前言: 之前的 5 篇筆記已經將 RxJava 的操作符寫的差不多了,今天週五,雖然明天休息了,但是,生命不息學習不止,今天筆者來記錄一下 RxJava 的判斷類 操作符 。 正文: 1、all 操作符 1)、作用 判斷 被觀察者 發射的資料是否滿足規定條件,滿足的話,觀察者接收為 true、反之為