1. 程式人生 > >Rxjava(結合類)-Merge

Rxjava(結合類)-Merge

合併多個Observables的發射物,Merge可能會讓合併的Observables發射的資料交錯(有一個類似的操作符Concat不會讓數 據交錯,它會按順序一個接著一個發射多個Observables的發射物


看兩個demo

Observable.merge(Observable.create(new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    System.out.println("start 1");
                    int i = 5;
                    while (i > 0) {
                        subscriber.onNext(i);
                        Thread.sleep(800);
                        i--;
                    }
                    System.out.println("onCompleted");

                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).observeOn(Schedulers.from(JobExecutor.getInstance())), Observable.create(new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    int i = 95;
                    System.out.println("start 2");
                    while (i > 90) {
                        subscriber.onNext(i);

                        Thread.sleep(600);
                        i--;
                    }
                    System.out.println("onCompleted2");

                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).observeOn(Schedulers.from(JobExecutor.getInstance()))).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println(integer);
            }
        });

輸出:

start 1
5
4
3
2
1
onCompleted
start 2
95
94
93
92
91
onCompleted2

demo2

我們把observeOn換成subscribeOn

      Observable.merge(Observable.create(new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    System.out.println("start 1");
                    int i = 5;
                    while (i > 0) {
                        subscriber.onNext(i);
                        Thread.sleep(800);
                        i--;
                    }
                    System.out.println("onCompleted");

                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.from(JobExecutor.getInstance())), Observable.create(new Observable.OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    int i = 95;
                    System.out.println("start 2");
                    while (i > 90) {
                        subscriber.onNext(i);

                        Thread.sleep(600);
                        i--;
                    }
                    System.out.println("onCompleted2");

                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.from(JobExecutor.getInstance()))).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println(integer);
            }
        });
輸出
start 2
start 1
95
5
94
4
93
3
92
91
2
onCompleted2
1
onCompleted

可以看到,當我們merge的兩個observable是在同一執行緒 順序執行時,輸出也是按順序的,當我們兩個Observable可以並行執行時,則按他們的發射順序列印,當然,我們也可以用timer進行來進行發射

我們看下merge的實現

    public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
        return merge(new Observable[] { t1, t2 });
    }
    public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
        return merge(from(sequences));
    }
先看下from
    public static <T> Observable<T> from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        } else
        if (n == 1) {
            return just(array[0]);
        }
        return create(new OnSubscribeFromArray<T>(array));
    }
這裡建立了一個OnSubscribeFromArray

繼續分析merge

    public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
        return source.lift(OperatorMerge.<T>instance(false));
    }
呼叫lift,傳遞的operator是OperatorMerge,source是前面建立的Observable(OnSubscribeFromArray)

然後訂閱的時候會呼叫OnSubscribeLift的call

 public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }

這裡的operatro是OperatorMerge
    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;

        child.add(subscriber);
        child.setProducer(producer);

        return subscriber;
    }

返回了一個MergeSubscriber,並建立了MergeProducer

回到前面的call,這裡的parent是OnSubscribeFromArray

   public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }
建立一個FromArrayProducer,setProducer時候會呼叫它的request方法
 @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }
呼叫fastPath
void fastPath() {
            final Subscriber<? super T> child = this.child;

            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }
array裡面的是我們merge函式裡面建立的Observable

這裡的child是前面建立的MergeSubscriber

呼叫它的onNext

 public void onNext(Observable<? extends T> t) {
            if (t == null) {
                return;
            }
            if (t == Observable.empty()) {
                emitEmpty();
            } else
            if (t instanceof ScalarSynchronousObservable) {
                tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
            } else {
                InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
                addInner(inner);
                t.unsafeSubscribe(inner);
                emit();
            }
        }
這裡最終會建立InnerSubscriber

呼叫unsafeSubscribe,最終呼叫t(OperatorSubscribeOn)的call

public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
這裡呼叫schedule,最終會啟動一個執行緒呼叫這裡的call函式,回到fastPath,繼續處理merge中的下一個Observable

然後當上面schedule的執行緒起來後,呼叫這裡的call

這裡又定義了一個Subscriber,然後呼叫source訂閱它,這裡的source是我們merge中subscribeOn之前的Observable,這裡就會呼叫到我們demo中

public void call(Subscriber<? super Integer> subscriber) {
                try {
                    System.out.println("thread ID:" + Thread.currentThread());
                    int i = 5;
                    while (i > 0) {
                        subscriber.onNext(i);
                        Thread.sleep(8000);
                        i--;
                    }
                    System.out.println("onCompleted");

                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

這裡傳遞的subscriber是剛剛schedule建立的s,呼叫它的onNext

 public void onNext(T t) {
                        subscriber.onNext(t);
                    }

這裡的subscriber是InnerSubscriber,呼叫它的onNext
 @Override
        public void onNext(T t) {
            parent.tryEmit(this, t);
        }
這裡的parent是MergeSubscriber,呼叫它的tryEmit
void tryEmit(InnerSubscriber<T> subscriber, T value) {
            boolean success = false;
            long r = producer.get();
            if (r != 0L) {
                synchronized (this) {
                    // if nobody is emitting and child has available requests
                    r = producer.get();
                    if (!emitting && r != 0L) {
                        emitting = true;
                        success = true;
                    }
                }
            }
            if (success) {
                RxRingBuffer subscriberQueue = subscriber.queue;
                if (subscriberQueue == null || subscriberQueue.isEmpty()) {
                    emitScalar(subscriber, value, r);
                } else {
                    queueScalar(subscriber, value);
                    emitLoop();
                }
            } else {
                queueScalar(subscriber, value);
                emit();
            }
        }
呼叫emitScalar
 protected void emitScalar(InnerSubscriber<T> subscriber, T value, long r) {
            boolean skipFinal = false;
            try {
                try {
                    child.onNext(value);
                } catch (Throwable t) {
                    if (!delayErrors) {
                        Exceptions.throwIfFatal(t);
                        skipFinal = true;
                        subscriber.unsubscribe();
                        subscriber.onError(t);
                        return;
                    }
                    getOrCreateErrorQueue().offer(t);
                }
                if (r != Long.MAX_VALUE) {
                    producer.produced(1);
                }
                subscriber.requestMore(1);
                // check if some state changed while emitting
                synchronized (this) {
                    skipFinal = true;
                    if (!missed) {
                        emitting = false;
                        return;
                    }
                    missed = false;
                }
            } finally {
                if (!skipFinal) {
                    synchronized (this) {
                        emitting = false;
                    }
                }
            }
            /*
             * In the synchronized block below request(1) we check
             * if there was a concurrent emission attempt and if there was,
             * we stay in emission mode and enter the emission loop
             * which will take care all the queued up state and
             * emission possibilities.
             */
            emitLoop();
        }

child.onNext(value);最終會呼叫到我們的訂閱者。

同樣,當另一個執行緒啟動的時候也會回撥call,最終呼叫到我們merge中設定的回撥,所以兩個merge中的Observable可能會交錯發射。




相關推薦

Rxjava(結合)-Merge

合併多個Observables的發射物,Merge可能會讓合併的Observables發射的資料交錯(有一個類似的操作符Concat不會讓數 據交錯,它會按順序一個接著一個發射多個Observables的發射物 看兩個demo Observable.merge(Obser

Rxjava結合操作符—merge、 Join

1、merge merge可以合併多個發射物 Javadoc: merge(Iterable) Javadoc: merge(Iterable,int) Javadoc: merge(Observable[]) Javadoc: merge(Obser

Retrofit與Rxjava結合使用例項

環境配置 在Module:app的build.gradle下新增如下依賴,然後sync now。下面這些依賴有些沒有用到,暫時都新增進去不會有錯。 dependencies { implem

Android:RxJava 結合 Retrofit 優雅實現 網路請求輪詢

前言 Rxjava,由於其基於事件流的鏈式呼叫、邏輯簡潔 & 使用簡單的特點,深受各大 Android開發者的歡迎。 RxJava如此受歡迎的原因,在於其提供了豐富 & 功能強大的操作符,幾乎能完成所有的功能需求 今天,我將為

王學崗RxJava(十六)——merge,zip,join等組合方法

組合就是多輸入,單輸出,大家看具體的例子 package com.example.acer.rxjavatest; import android.os.Bundle; import android.support.v7.app.AppCompatActi

Android:RxJava 結合 Retrofit 全面實現 網路請求出錯重連

前言 Rxjava,由於其基於事件流的鏈式呼叫、邏輯簡潔 & 使用簡單的特點,深受各大 Android開發者的歡迎。 RxJava如此受歡迎的原因,在於其提供了豐富 & 功能強大的操作符,幾乎能完成所有的功能需求 今天,我將為

RxJava結合Retrofit如何避免覆蓋http請求(終端當前http請求)

unSubscribe("goodsListSubscribe");//取消前一次的http請求,避免頻繁請求導致響應次序及資料混亂 Subscription subscribe = goodsDAL.queryGoodsList(firstCategoryId, s

Rxjava(變換)-concatMap

demo Observable.from(aa).concatMap(new Func1<Integer, Observable<Integer>>() { @Override pub

Rxjava(過濾)-Filter

只發射通過了謂詞測試的資料項 Observable.range(1, 10).filter(new Func1<Integer, Boolean>() { @Override public Boolean ca

Retrofit和RxJava結合使用

使用Retrofit的時候就不得不提到RxJava,RxJava是一個基於觀察者模式的非同步實現。關於RxJava的入門學習,強烈推薦《給Android開發者的RxJava詳解》 正如上篇部落格所說,得益於Retrofit中靈活的Converter

設計模式:抽象工廠模式,結合圖秒懂!

通過前篇文章《設計模式:工廠模式,解除耦合的利器》的介紹,我們對工廠模式有了深入的瞭解,今天繼續介紹一種特殊的工廠模式,也就是抽象工廠模式。 定義 抽象工廠模式:提供一個建立一系列相關或相互依賴物件的介面,而無須指定它們具體的類。抽象工廠模式又稱為Kit模式,屬於物件建立型模式,是工廠方法模式的升級版,在

android開發之merge結合include優化布局

ted com match clas you title example ews 文件的 merge結合include優化android布局,效果不知道。個人感覺使用上也有非常大的局限。只是還是了解一下。記錄下來。 布局文件都要有根節點,但androi

【WIP】對象的型與動態結合

this center 變量 ogr 改變 inter text bool class 創建: 2018/01/21 動態結合(多態) 動態結合 呼出同一個方法,根據呼出方不同執行的處理也不同 //---------------------

python小知識-__call__和裝飾器的結合使用

python get 都沒有 IV ini ble 自定義 裝飾器 介紹 class Decorator(): def __init__(self, f): print(‘run in init......‘) self.f = f

OC 建立TableView基結合MJRefresh實現上拉重新整理,下拉載入

1.建立繼承自UITableView的基類BaseTableView: #import <UIKit/UIKit.h> @class BaseTableView; @protocol BaseTableViewDelegate <NSObject> @option

yii2 關於helper ArrayHelper::merge()

$arr1 = [ 'name' => 'terry', 'age' => 15, 'friend'=> [ 'zhangsan','lisi' ], 'work' =>[ 'aa' => 11, 'bb'

簡單Java和資料庫操作及javafx的結合小專案

先圖為上   秦時明月漢時關,萬里長征人未還,妙呀,甚是..   1.開始 1.專案目的:   開發工具: Idea + Mysql + JAVASE   1.其實簡單來說就是實現兩張資料表的基本操作,     1.新增     2. 刪除     3.修改

Android RxJava操作符的學習---功能性操作符--網路請求出錯重連(結合Retrofit)

1. 需求場景   2. 功能說明 功能需求說明     功能邏輯  例項說明 在本例子中:採用Get方法對 金山詞霸API 傳送網路請求 通過 斷開網路連線 模擬 網路異常錯誤(恢復網路即可

Android RxJava操作符的學習---功能性操作符--(有條件)網路請求輪詢(結合Retrofit)

1. 需求場景   2. 功能說明 採用Get方法對 金山詞霸API 按規定時間重複傳送網路請求,從而模擬 輪詢 需求實現 停止輪詢的條件 = 當輪詢到第4次時 採用 Gson 進行資料解析   3.

初識RxJava(六)判斷 操作符

前言: 之前的 5 篇筆記已經將 RxJava 的操作符寫的差不多了,今天週五,雖然明天休息了,但是,生命不息學習不止,今天筆者來記錄一下 RxJava 的判斷類 操作符 。 正文: 1、all 操作符 1)、作用 判斷 被觀察者 發射的資料是否滿足規定條件,滿足的話,觀察者接收為 true、反之為