RxJava(11-執行緒排程Scheduler)
目錄:
RxJava中 使用observeOn
和subscribeOn
操作符,你可以讓Observable
在一個特定的排程器上執行,observeOn
指示一個Observable
在一個特定的排程器上呼叫觀察者的onNext
, onError
和onCompleted
方法,subscribeOn
則指示Observable
將全部的處理過程(包括髮射資料和通知)放在特定的排程器上執行。
1. 使用示例
先看看下面的例子,體驗一下在RxJava中 如何使用執行緒的切換:
private void logThread(Object obj, Thread thread){
Log .v(TAG, "onNext:"+obj+" -"+Thread.currentThread().getName());
}
Observable.OnSubscribe onSub = new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.v(TAG, "OnSubscribe -"+Thread.currentThread());
subscriber. onNext(1);
subscriber.onCompleted();
}
};
Log.v(TAG, "--------------①-------------");
Observable.create(onSub)
.subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------②-------------");
Observable.create(onSub)
.subscribeOn(Schedulers. io())
.subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------③-------------");
Observable.create(onSub)
.subscribeOn(Schedulers.newThread())
.subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------④-------------");
Observable.create(onSub)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------⑤-------------");
Observable.create(onSub)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------⑥-------------");
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(1)
.subscribe(integer->logThread(integer, Thread.currentThread()));
/*
輸出:
--------------①-------------
OnSubscribe -Thread[main,5,main]
onNext:1 -Thread[main,5,main]
--------------②-------------
OnSubscribe -Thread[RxIoScheduler-2,5,main]
onNext:1 -Thread[RxIoScheduler-2,5,main]
--------------③-------------
OnSubscribe -Thread[RxNewThreadScheduler-1,5,main]
onNext:1 -Thread[RxNewThreadScheduler-1,5,main]
--------------④-------------
OnSubscribe -Thread[RxNewThreadScheduler-2,5,main]
onNext:1 -Thread[main,5,main]
--------------⑤-------------
OnSubscribe -Thread[RxNewThreadScheduler-4,5,main]
onNext:1 -Thread[RxNewThreadScheduler-3,5,main]
--------------⑥-------------
onNext:0 -RxComputationScheduler-3
*/
從上面的輸出結果中,我們大概知道了下面幾點:
①. RxJava中已經封裝了多種排程器,不同的排程器可以指定在不同的執行緒中執行和觀察
②. create建立的Observable預設在當前執行緒(主執行緒)中執行任務流,並在當前執行緒觀察
③. interval建立的Observable會在一個叫Computation的執行緒中執行任務流和觀察任務流
④. 除了observeOn和subscribeOn ,使用其他建立或者變換操作符也有可能造成執行緒的切換
2. subscribeOn()原理
subscribeOn()
用來指定Observable
在哪個執行緒中執行事件流,也就是指定Observable
中OnSubscribe
(計劃表)的call
方法在那個執行緒發射資料。下面通過原始碼分析subscribeOn
是怎樣實現執行緒的切換的。
下面看看subscribeOn
方法:
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
我們看到他建立了一個新的Observable
,併為新的Observable
建立了新的計劃表OperatorSubscribeOn
物件,新的計劃表儲存了原始Observable
物件和排程器scheduler
。接著我們看看OperatorSubscribeOn
:
public final class OperatorSubscribeOn<T> implements Observable.OnSubscribe<T> {
final Scheduler scheduler; //排程器
final Observable<T> source; //原始Observable
//①.原始觀察者訂閱了新的Observable後,將執行此call方法
@Override
public void call(final Subscriber<? super T> subscriber) {
final Scheduler.Worker inner = scheduler.createWorker();
subscriber.add(inner);
//②. call方法中使用傳入的排程器建立的Worker物件的schedule方法切換執行緒
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
//③ .建立了一個新的觀察者
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
//⑤. 新的觀察者收到資料後直接傳送給原始觀察者
subscriber.onNext(t);
}
...
};
//④. 在切換的執行緒中,新的觀察者訂閱原始Observable,用來接收資料
source.unsafeSubscribe(s);
}
});
}
}
上面原始碼中註釋已經寫的很清楚了,OperatorSubscribeOn
其實就是一個普通的任務表,用於為新的Observable
發射資料,只是不是真正的發射,它建立了一個新的觀察者訂閱原始Observable
,這樣就可以接受原始Observable
發射的資料,然後直接傳送給原始觀察者。
在call
方法中通過scheduler.createWorker().schedule()
完成執行緒的切換,這裡就牽扯到兩個物件了,Scheduler
和Worker
,不要著急,一個個的看,先看Scheduler
,由於RxJava中有多種排程器,我們就看一個簡單的Schedulers.newThread()
,其他排程器的思路是一樣的,下面一步一步看原始碼:
public final class Schedulers {
//各種排程器物件
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;
//單例,Schedulers被載入的時候,上面的各種排程器物件已經初始化
private static final Schedulers INSTANCE = new Schedulers();
//構造方法
private Schedulers() {
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
...
Scheduler nt = hook.getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
//①.建立newThreadScheduler物件
newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
}
}
//②. 獲取NewThreadScheduler物件
public static Scheduler newThread() {
return INSTANCE.newThreadScheduler;
}
...
}
Schedulers
中儲存了多種排程器物件,在Schedulers
被載入的時候,他們就被初始化了,Schedulers
就像是一個排程器的管理器,接著跟蹤RxJavaSchedulersHook.createNewScheduler()
,最終會找到一個叫NewThreadScheduler
的類:
public final class NewThreadScheduler extends Scheduler {
private final ThreadFactory threadFactory;
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
NewThreadScheduler
就是我們呼叫subscribeOn(Schedulers.newThread() )
傳入的排程器物件,每個排程器物件都有一個createWorker
方法用於建立一個Worker
物件,而NewThreadScheduler
對應建立的Worker
是一個叫NewThreadWorker
的物件,在新產生的OperatorSubscribeOn
計劃表中就是通過NewThreadWorker.schedule(Action0)
實現執行緒的切換,下面我們跟蹤schedule(Action0)
方法:
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private final ScheduledExecutorService executor; //
public NewThreadWorker(ThreadFactory threadFactory) {
//建立一個執行緒池
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
executor = exec;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
return scheduleActual(action, delayTime, unit);
}
//重要:worker.schedule()最終呼叫的是這個方法
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
//return action;
Action0 decoratedAction = schedulersHook.onSchedule(action);
//ScheduledAction就是一個Runnable物件,在run()方法中呼叫了Action0.call()
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run); //將Runnable物件放入執行緒池中
} else {
f = executor.schedule(run, delayTime, unit); //延遲執行
}
run.add(f);
return run;
}
...
}
我們發現OperatorSubscribeOn
計劃表中通過NewThreadWorker.schedule(Action0)
,將Action0
放入到一個執行緒池中執行,這樣就實現了執行緒的切換。
通過上面的分析,我們知道subscribeOn
是怎樣將任務表放入執行緒池中執行的,感覺還是有點繞,看下圖:
多次subscribeOn()的情況
我們發現,每次使用subscribeOn
都會產生一個新的Observable
,併產生一個新的計劃表OnSubscribe
,目標Subscriber最後訂閱的將是最後一次subscribeOn
產生的新的Observable
。在每個新的OnSubscribe
的call
方法中都會有一個產生一個新的執行緒,在這個新執行緒中訂閱上一級Observable
,並建立一個新的Subscriber
接受資料,最終原始Observable
將在第一個新執行緒中發射資料,然後傳送給給下一個新的觀察者,直到傳送到目標觀察者,所以多次呼叫subscribeOn
只有第一個起作用(這只是表面現象,其實每個subscribeOn
都切換了執行緒,只是最終目標Observable
是在第一個subscribeOn
產生的執行緒中發射資料的)。看下圖:
多次subscribeOn()
只有第一個會起作用,後面的只是在第一個的基礎上在外面套了一層殼,就像下面的虛擬碼,最後執行是在第一個新執行緒中執行:
...
//第3個subscribeOn產生的新執行緒
new Thread(){
@Override
public void run() {
Subscriber s1 = new Subscriber();
//第2個subscribeOn產生的新執行緒
new Thread(){
@Override
public void run() {
Subscriber s2 = new Subscriber();
//第1個subscribeOn產生的新執行緒
new Thread(){
@Override
public void run() {
Subscriber<T> s3 = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
...
};
//①. 最後一個新觀察者訂閱原始Observable
原始Observable.subscribe(s3);
//②. 原始Observable將在此執行緒中發射資料
//③. 最後一個新的觀察者s3接受資料
//④. s3收到資料後,直接傳送給s2,s2收到資料後傳給s1,...最後目標觀察者收到資料
}
}.start();
}
}.start();
}
}.start();
3. observeOn原理
observeOn
呼叫的是lift
操作符,lift
操作符在上一篇部落格中講過。lift
操作符建立了一個代理的Observable
,用於接收原始Observable
發射的資料,然後在Operator
中對資料做一些處理後傳遞給目標Subscriber
。
observeOn
一樣建立了一個代理的Observable
,並建立一個代理觀察者接受上一級Observable
的資料,代理觀察者收到資料之後會開啟一個執行緒,在新的執行緒中,呼叫下一級觀察者的onNext
、onCompete
、onError
方法。
我們看看observeOn
操作符的原始碼:
public final class OperatorObserveOn<T> implements Observable.Operator<T, T> {
private final Scheduler scheduler;
//建立代理觀察者,用於接收上一級Observable發射的資料
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
return child;
} else if (scheduler instanceof TrampolineScheduler) {
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
//代理觀察者
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final NotificationLite<T> on;
final Queue<Object> queue;
//接受上一級Observable發射的資料
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
...
schedule();
}
@Override
public void onError(final Throwable e) {
...
schedule();
}
//開啟新執行緒處理資料
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
//在新執行緒中將資料傳送給目標觀察者
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
for (;;) {
while (requestAmount != currentEmission) {
...
localChild.onNext(localOn.getValue(v));
}
}
}
}
}
可以發現,observeOn操作符對它後面的操作產生影響,比如下面一段程式碼:
Observable.just(100)
.subscribeOn(Schedulers.computation()) //Computation執行緒中發射資料
.map(integer -> {return "map1-"+integer;}) //Computation執行緒中接受資料
.observeOn(Schedulers.io()) //②. 切換
.map(integer -> {return "map2-"+integer;}) //io執行緒中接受資料,由②決定
.observeOn(Schedulers.newThread()) //③. 切換
.map(integer -> {return "map3-"+integer;}) //newThread執行緒中接受資料,由③決定
.observeOn(AndroidSchedulers.mainThread()) //④. 切換
.delay(1000, TimeUnit.MILLISECONDS) //主執行緒中接受資料,由④決定
.subscribe(str -> logThread(str, Thread.currentThread())); //Computation執行緒中接受資料,由④決定
/*
說明:最後目標觀察者將在Computation執行緒中接受資料,這取決於delay操作符,
delay操作符是在Computation執行緒中執行的,執行完後就會將資料傳送給目標觀察者。
而他上面的observeOn將決定於delay產生的代理觀察者在主執行緒中接受資料
*/
/*
輸出:
onNext:map3-map2-map1-100 -RxComputationScheduler-3
*/
只要涉及到lift
操作符,其實就是生成了一套代理的Subscriber
(觀察者)、Observable
(被觀察者)和OnSubscribe
(計劃表)。Observable
最典型的特徵就是鏈式呼叫,我們暫且將每一步操作稱為一級。代理的OnSubscribe
中的call
方法就是讓代理Subscriber
訂閱上一級Observable
,直到訂閱到原始Observable
發射資料,代理Subscriber
收到資料後,可能對資料做一些操作也有可能切換執行緒,然後將資料傳送給下一級Subscriber
,直到目標觀察者接收到資料,目標觀察者在那個執行緒接受資料取決於上一個Subscriber
在哪一個執行緒呼叫目標觀察者的方法。示意圖如下:
4. 排程器的種類
RxJava中可用的排程器有下面幾種:
排程器型別 | 效果 |
---|---|
Schedulers.computation( ) | 用於計算任務,如事件迴圈或和回撥處理,不要用於IO操作(IO操作請使用Schedulers.io());預設執行緒數等於處理器的數量 |
Schedulers.from(executor) | 使用指定的Executor作為排程器 |
Schedulers.immediate( ) | 在當前執行緒立即開始執行任務 |
Schedulers.io( ) | 用於IO密集型任務,如非同步阻塞IO操作,這個排程器的執行緒池會根據需要增長;對於普通的計算任務,請使用Schedulers.computation();Schedulers.io( )預設是一個CachedThreadScheduler,很像一個有執行緒快取的新執行緒排程器 |
Schedulers.newThread( ) | 為每個任務建立一個新執行緒 |
Schedulers.trampoline( ) | 當其它排隊的任務完成後,在當前執行緒排隊開始執行 |
在RxAndroid中新增了一個:
排程器型別 | 效果 |
---|---|
AndroidSchedulers.mainThread( ) | 主執行緒,UI執行緒,可以用於更新介面 |
5. 各種操作符的預設排程器
在之前學習各種操作符的時候,都會介紹xx操作符預設在xxx排程器上執行,當時可能不太注意這是什麼意思,下面總結了一些操作符預設的排程器:
操作符 | 排程器 |
---|---|
buffer(timespan) | computation |
buffer(timespan, count) | computation |
buffer(timespan, timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
repeat | trampoline |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retrytrampolinesample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | computation |
takeLastBuffer(time, unit) | computation |
takeLastBuffer(count, time, unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector, timeoutSelector) | immediate |
timeout(timeoutSelector, other) | immediate |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | immediate |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |