1. 程式人生 > >RxJava(11-執行緒排程Scheduler)

RxJava(11-執行緒排程Scheduler)

目錄:

  RxJava中 使用observeOnsubscribeOn操作符,你可以讓Observable在一個特定的排程器上執行,observeOn指示一個Observable在一個特定的排程器上呼叫觀察者的onNext, onErroronCompleted方法,subscribeOn則指示Observable將全部的處理過程(包括髮射資料和通知)放在特定的排程器上執行。

1. 使用示例

先看看下面的例子,體驗一下在RxJava中 如何使用執行緒的切換:

private void logThread(Object obj, Thread thread){
    Log
.v(TAG, "onNext:"+obj+" -"+Thread.currentThread().getName()); } Observable.OnSubscribe onSub = new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { Log.v(TAG, "OnSubscribe -"+Thread.currentThread()); subscriber.
onNext(1); subscriber.onCompleted(); } }; Log.v(TAG, "--------------①-------------"); Observable.create(onSub) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------②-------------"); Observable.create(onSub) .subscribeOn(Schedulers.
io()) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------③-------------"); Observable.create(onSub) .subscribeOn(Schedulers.newThread()) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------④-------------"); Observable.create(onSub) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------⑤-------------"); Observable.create(onSub) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(integer->logThread(integer, Thread.currentThread())); Log.v(TAG, "--------------⑥-------------"); Observable.interval(100, TimeUnit.MILLISECONDS) .take(1) .subscribe(integer->logThread(integer, Thread.currentThread())); /* 輸出: --------------①------------- OnSubscribe -Thread[main,5,main] onNext:1 -Thread[main,5,main] --------------②------------- OnSubscribe -Thread[RxIoScheduler-2,5,main] onNext:1 -Thread[RxIoScheduler-2,5,main] --------------③------------- OnSubscribe -Thread[RxNewThreadScheduler-1,5,main] onNext:1 -Thread[RxNewThreadScheduler-1,5,main] --------------④------------- OnSubscribe -Thread[RxNewThreadScheduler-2,5,main] onNext:1 -Thread[main,5,main] --------------⑤------------- OnSubscribe -Thread[RxNewThreadScheduler-4,5,main] onNext:1 -Thread[RxNewThreadScheduler-3,5,main] --------------⑥------------- onNext:0 -RxComputationScheduler-3 */

從上面的輸出結果中,我們大概知道了下面幾點:
①. RxJava中已經封裝了多種排程器,不同的排程器可以指定在不同的執行緒中執行和觀察
②. create建立的Observable預設在當前執行緒(主執行緒)中執行任務流,並在當前執行緒觀察
③. interval建立的Observable會在一個叫Computation的執行緒中執行任務流和觀察任務流
④. 除了observeOn和subscribeOn ,使用其他建立或者變換操作符也有可能造成執行緒的切換

2. subscribeOn()原理

  subscribeOn()用來指定Observable在哪個執行緒中執行事件流,也就是指定ObservableOnSubscribe(計劃表)的call方法在那個執行緒發射資料。下面通過原始碼分析subscribeOn是怎樣實現執行緒的切換的。

下面看看subscribeOn方法:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

我們看到他建立了一個新的Observable,併為新的Observable建立了新的計劃表OperatorSubscribeOn物件,新的計劃表儲存了原始Observable物件和排程器scheduler。接著我們看看OperatorSubscribeOn

public final class OperatorSubscribeOn<T> implements Observable.OnSubscribe<T> {

    final Scheduler scheduler;   //排程器
    final Observable<T> source;  //原始Observable
    //①.原始觀察者訂閱了新的Observable後,將執行此call方法
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Scheduler.Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        //②. call方法中使用傳入的排程器建立的Worker物件的schedule方法切換執行緒
          inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                //③ .建立了一個新的觀察者
                    Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        //⑤. 新的觀察者收到資料後直接傳送給原始觀察者
                        subscriber.onNext(t);
                    }
                    ...
                };
                //④. 在切換的執行緒中,新的觀察者訂閱原始Observable,用來接收資料
                source.unsafeSubscribe(s);
            }
        });
    }
}

  上面原始碼中註釋已經寫的很清楚了,OperatorSubscribeOn其實就是一個普通的任務表,用於為新的Observable發射資料,只是不是真正的發射,它建立了一個新的觀察者訂閱原始Observable,這樣就可以接受原始Observable發射的資料,然後直接傳送給原始觀察者。

  在call方法中通過scheduler.createWorker().schedule()完成執行緒的切換,這裡就牽扯到兩個物件了,SchedulerWorker,不要著急,一個個的看,先看Scheduler,由於RxJava中有多種排程器,我們就看一個簡單的Schedulers.newThread(),其他排程器的思路是一樣的,下面一步一步看原始碼:

public final class Schedulers {
    //各種排程器物件
    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final Scheduler newThreadScheduler;
    //單例,Schedulers被載入的時候,上面的各種排程器物件已經初始化
    private static final Schedulers INSTANCE = new Schedulers();
    //構造方法
    private Schedulers() {
        RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
        ...
        Scheduler nt = hook.getNewThreadScheduler();
        if (nt != null) {
            newThreadScheduler = nt;
        } else {
            //①.建立newThreadScheduler物件
            newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
        }
    }
    //②. 獲取NewThreadScheduler物件
    public static Scheduler newThread() {
        return INSTANCE.newThreadScheduler;
    }
    ...
}

Schedulers中儲存了多種排程器物件,在Schedulers被載入的時候,他們就被初始化了,Schedulers就像是一個排程器的管理器,接著跟蹤RxJavaSchedulersHook.createNewScheduler(),最終會找到一個叫NewThreadScheduler的類:

public final class NewThreadScheduler extends Scheduler {
    private final ThreadFactory threadFactory;
    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

NewThreadScheduler就是我們呼叫subscribeOn(Schedulers.newThread() )傳入的排程器物件,每個排程器物件都有一個createWorker方法用於建立一個Worker物件,而NewThreadScheduler對應建立的Worker是一個叫NewThreadWorker的物件,在新產生的OperatorSubscribeOn計劃表中就是通過NewThreadWorker.schedule(Action0)實現執行緒的切換,下面我們跟蹤schedule(Action0)方法:

public class NewThreadWorker extends Scheduler.Worker implements Subscription {
    private final ScheduledExecutorService executor;   //
    public NewThreadWorker(ThreadFactory threadFactory) {
        //建立一個執行緒池
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
        executor = exec;
    }
    @Override
    public Subscription schedule(final Action0 action) {
        return schedule(action, 0, null);
    }
    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        return scheduleActual(action, delayTime, unit);
    }
    //重要:worker.schedule()最終呼叫的是這個方法
    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        //return action;
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        //ScheduledAction就是一個Runnable物件,在run()方法中呼叫了Action0.call()
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);   //將Runnable物件放入執行緒池中
        } else {
            f = executor.schedule(run, delayTime, unit);  //延遲執行
        }
        run.add(f);

        return run;
    }
    ...
}

我們發現OperatorSubscribeOn計劃表中通過NewThreadWorker.schedule(Action0),將Action0放入到一個執行緒池中執行,這樣就實現了執行緒的切換。

通過上面的分析,我們知道subscribeOn是怎樣將任務表放入執行緒池中執行的,感覺還是有點繞,看下圖:

    這裡寫圖片描述

多次subscribeOn()的情況

我們發現,每次使用subscribeOn都會產生一個新的Observable,併產生一個新的計劃表OnSubscribe,目標Subscriber最後訂閱的將是最後一次subscribeOn產生的新的Observable。在每個新的OnSubscribecall方法中都會有一個產生一個新的執行緒,在這個新執行緒中訂閱上一級Observable,並建立一個新的Subscriber接受資料,最終原始Observable將在第一個新執行緒中發射資料,然後傳送給給下一個新的觀察者,直到傳送到目標觀察者,所以多次呼叫subscribeOn只有第一個起作用(這只是表面現象,其實每個subscribeOn都切換了執行緒,只是最終目標Observable是在第一個subscribeOn產生的執行緒中發射資料的)。看下圖:

    這裡寫圖片描述

  多次subscribeOn()只有第一個會起作用,後面的只是在第一個的基礎上在外面套了一層殼,就像下面的虛擬碼,最後執行是在第一個新執行緒中執行:

...
//第3個subscribeOn產生的新執行緒
new Thread(){
    @Override
    public void run() {
        Subscriber s1 = new Subscriber();
        //第2個subscribeOn產生的新執行緒
        new Thread(){
            @Override
            public void run() {
                Subscriber s2 = new Subscriber();
                //第1個subscribeOn產生的新執行緒
                new Thread(){
                    @Override
                    public void run() {
                        Subscriber<T> s3 = new Subscriber<T>(subscriber) {
                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }
                            ...
                        };
                        //①. 最後一個新觀察者訂閱原始Observable
                        原始Observable.subscribe(s3);
                        //②. 原始Observable將在此執行緒中發射資料

                              //③. 最後一個新的觀察者s3接受資料

                              //④. s3收到資料後,直接傳送給s2,s2收到資料後傳給s1,...最後目標觀察者收到資料
                         } 
                }.start();
            }
        }.start();
    }
}.start();

3. observeOn原理

  observeOn呼叫的是lift操作符,lift操作符在上一篇部落格中講過。lift操作符建立了一個代理的Observable,用於接收原始Observable發射的資料,然後在Operator中對資料做一些處理後傳遞給目標Subscriber

  observeOn一樣建立了一個代理的Observable,並建立一個代理觀察者接受上一級Observable的資料,代理觀察者收到資料之後會開啟一個執行緒,在新的執行緒中,呼叫下一級觀察者的onNextonCompeteonError方法。

我們看看observeOn操作符的原始碼:

public final class OperatorObserveOn<T> implements Observable.Operator<T, T> {
    private final Scheduler scheduler;
    //建立代理觀察者,用於接收上一級Observable發射的資料
    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

    //代理觀察者
    private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;
        final NotificationLite<T> on;
        final Queue<Object> queue;
        //接受上一級Observable發射的資料
        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }
        @Override
        public void onCompleted() {
            ...
            schedule();
        }
        @Override
        public void onError(final Throwable e) {
            ...
            schedule();
        }
        //開啟新執行緒處理資料
        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }
        // only execute this from schedule()
        //在新執行緒中將資料傳送給目標觀察者
        @Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;
            for (;;) {
                while (requestAmount != currentEmission) {
                    ...
                    localChild.onNext(localOn.getValue(v));
                }
            }
        }
    }
}

可以發現,observeOn操作符對它後面的操作產生影響,比如下面一段程式碼:

Observable.just(100)
        .subscribeOn(Schedulers.computation())     //Computation執行緒中發射資料
        .map(integer -> {return "map1-"+integer;}) //Computation執行緒中接受資料
        .observeOn(Schedulers.io())          //②. 切換
        .map(integer -> {return "map2-"+integer;}) //io執行緒中接受資料,由②決定
        .observeOn(Schedulers.newThread())   //③. 切換
        .map(integer -> {return "map3-"+integer;}) //newThread執行緒中接受資料,由③決定
        .observeOn(AndroidSchedulers.mainThread()) //④. 切換
        .delay(1000, TimeUnit.MILLISECONDS)        //主執行緒中接受資料,由④決定
        .subscribe(str -> logThread(str, Thread.currentThread()));  //Computation執行緒中接受資料,由④決定

/*
    說明:最後目標觀察者將在Computation執行緒中接受資料,這取決於delay操作符,
    delay操作符是在Computation執行緒中執行的,執行完後就會將資料傳送給目標觀察者。
    而他上面的observeOn將決定於delay產生的代理觀察者在主執行緒中接受資料
 */


/*
 輸出:
 onNext:map3-map2-map1-100 -RxComputationScheduler-3
 */

只要涉及到lift操作符,其實就是生成了一套代理的Subscriber(觀察者)、Observable(被觀察者)和OnSubscribe(計劃表)。Observable最典型的特徵就是鏈式呼叫,我們暫且將每一步操作稱為一級。代理的OnSubscribe中的call方法就是讓代理Subscriber訂閱上一級Observable,直到訂閱到原始Observable發射資料,代理Subscriber收到資料後,可能對資料做一些操作也有可能切換執行緒,然後將資料傳送給下一級Subscriber,直到目標觀察者接收到資料,目標觀察者在那個執行緒接受資料取決於上一個Subscriber在哪一個執行緒呼叫目標觀察者的方法。示意圖如下:

    這裡寫圖片描述

4. 排程器的種類

RxJava中可用的排程器有下面幾種:

排程器型別 效果
Schedulers.computation( ) 用於計算任務,如事件迴圈或和回撥處理,不要用於IO操作(IO操作請使用Schedulers.io());預設執行緒數等於處理器的數量
Schedulers.from(executor) 使用指定的Executor作為排程器
Schedulers.immediate( ) 在當前執行緒立即開始執行任務
Schedulers.io( ) 用於IO密集型任務,如非同步阻塞IO操作,這個排程器的執行緒池會根據需要增長;對於普通的計算任務,請使用Schedulers.computation();Schedulers.io( )預設是一個CachedThreadScheduler,很像一個有執行緒快取的新執行緒排程器
Schedulers.newThread( ) 為每個任務建立一個新執行緒
Schedulers.trampoline( ) 當其它排隊的任務完成後,在當前執行緒排隊開始執行



RxAndroid中新增了一個:

排程器型別 效果
AndroidSchedulers.mainThread( ) 主執行緒,UI執行緒,可以用於更新介面

5. 各種操作符的預設排程器

  在之前學習各種操作符的時候,都會介紹xx操作符預設在xxx排程器上執行,當時可能不太注意這是什麼意思,下面總結了一些操作符預設的排程器:

操作符 排程器
buffer(timespan) computation
buffer(timespan, count) computation
buffer(timespan, timeshift) computation
debounce(timeout, unit) computation
delay(delay, unit) computation
delaySubscription(delay, unit) computation
interval computation
repeat trampoline
replay(time, unit) computation
replay(buffersize, time, unit) computation
replay(selector, time, unit) computation
replay(selector, buffersize, time, unit) computation
retrytrampolinesample(period, unit) computation
skip(time, unit) computation
skipLast(time, unit) computation
take(time, unit) computation
takeLast(time, unit) computation
takeLast(count, time, unit) computation
takeLastBuffer(time, unit) computation
takeLastBuffer(count, time, unit) computation
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval immediate
timeout(timeoutSelector) immediate
timeout(firstTimeoutSelector, timeoutSelector) immediate
timeout(timeoutSelector, other) immediate
timeout(timeout, timeUnit) computation
timeout(firstTimeoutSelector, timeoutSelector, other) immediate
timeout(timeout, timeUnit, other) computation
timer computation
timestamp immediate
window(timespan) computation
window(timespan, count) computation
window(timespan, timeshift) computation

原始碼下載: