RxJava 併發之執行緒排程
由於 Rx 目標是用在非同步系統上並且 Rx 支援多執行緒處理,所以很多 Rx 開發者認為預設情況下 Rx 就是多執行緒的。 其實實際情況不是這樣的,Rx 預設是單執行緒的。
除非你明確的指定執行緒,否則所有 onNext/onError/onCompleted 以及各個操作函式的呼叫都是在同一個執行緒中完成的。例如下面的示例:
final BehaviorSubject<Integer> subject = BehaviorSubject.create();
subject.subscribe(i -> {
System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
int[] i = {1}; // naughty side-effects for examples only ;)
Runnable r = () -> {
synchronized(i) {
System.out.println("onNext(" + i[0] + ") on " + Thread.currentThread().getId());
subject.onNext(i[0]++);
}
};
r.run(); // Execute on main thread
new Thread(r).start();
new Thread(r).start();
結果:
onNext(1) on 1
Received 1 on 1
onNext(2) on 11
Received 2 on 11
onNext(3) on 12
Received 3 on 12
上面在三個執行緒中分別呼叫 subject 的onNext 函式。和 Runnable 中的執行緒是同一個執行緒。不管用多少個操作函式串聯呼叫,結果都是同一個執行緒。
subscribeOn 和 observeOn
subscribeOn 和 observeOn 分別用來控制 subscription 的呼叫執行緒和 接受事件通知(Observer 的 onNext/onError/onCompleted 函式)的執行緒。
public final Observable<T> observeOn(Scheduler scheduler)
public final Observable<T> subscribeOn(Scheduler scheduler)
在Rx 中你並不直接和 執行緒 打交道,而是通過 Scheduler 來處理多執行緒。
subscribeOn
subscribeOn 用來指定 Observable.create 中的程式碼在那個 Scheduler 中執行。即使你沒有呼叫 create 函式,但是內部也有一個 create 實現。例如:
System.out.println("Main: " + Thread.currentThread().getId());
Observable.create(o -> {
System.out.println("Created on " + Thread.currentThread().getId());
o.onNext(1);
o.onNext(2);
o.onCompleted();
})
//.subscribeOn(Schedulers.newThread())
.subscribe(i -> {
System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
System.out.println("Finished main: " + Thread.currentThread().getId());
結果:
Main: 1
Created on 1
Received 1 on 1
Received 2 on 1
Finished main: 1
可以看到上面的程式碼是在同一個執行緒中執行,並且是按循序執行的。subscribe 執行完後(包括create 函式裡面的 Lambda 表示式的程式碼)才繼續執行後面的程式碼。
如果你把上面的註釋掉的程式碼 .subscribeOn(Schedulers.newThread()) 啟用,這結果是這樣的:
Main: 1
Finished main: 1
Created on 11
Received 1 on 11
Received 2 on 11
這樣 create 裡面的 Lambda 表示式程式碼將會在 Schedulers.newThread() 返回的執行緒中執行。subscribe 不再是阻塞的了。後面的程式碼可以立即執行,而不用等待 subscribe 返回。
有些 Observable 內部會使用它們自己建立的執行緒。例如 Observable.interval 就是非同步的。這種情況下,無需指定新的執行緒。
System.out.println("Main: " + Thread.currentThread().getId());
Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe(i -> {
System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
System.out.println("Finished main: " + Thread.currentThread().getId());
結果:
Main: 1
Finished main: 1
Received 0 on 11
Received 1 on 11
Received 2 on 11
observeOn
observeOn 控制資料流的另外一端。你的 observer 如何收到事件。也就是在那個執行緒中回撥 observer 的 onNext/onError/onCompleted 函式。
Observable.create(o -> {
System.out.println("Created on " + Thread.currentThread().getId());
o.onNext(1);
o.onNext(2);
o.onCompleted();
})
.observeOn(Schedulers.newThread())
.subscribe(i ->
System.out.println("Received " + i + " on " + Thread.currentThread().getId()));
結果:
Created on 1
Received 1 on 13
Received 2 on 13
observeOn 隻影響呼叫該函式以後的操作函式。你可以認為 observeOn 只是攔截了資料流並且對後續的操作有作用。例如:
Observable.create(o -> {
System.out.println("Created on " + Thread.currentThread().getId());
o.onNext(1);
o.onNext(2);
o.onCompleted();
})
.doOnNext(i ->
System.out.println("Before " + i + " on " + Thread.currentThread().getId()))
.observeOn(Schedulers.newThread())
.doOnNext(i ->
System.out.println("After " + i + " on " + Thread.currentThread().getId()))
.subscribe();
結果:
Created on 1
Before 1 on 1
Before 2 on 1
After 1 on 13
After 2 on 13
可以看到在遇到 observeOn 之前,所有的操作發生在一個執行緒,之後在另外一個執行緒。這樣可以在 Rx 資料流中不同地方設定不同的執行緒。
如果你知道資料流處理在那些情況需要很長時間,則可以通過這個操作來避免阻塞生產者執行緒。 比如在 Android 開發過程中的 UI 執行緒,如果在該執行緒中讀取檔案,可能會導致 UI 卡死(ANR)無響應,通過該函式可以指定讀取檔案在另外一個執行緒中執行。
unsubscribeOn
有些 Observable 會依賴一些資源,當該 Observable 完成後釋放這些資源。如果釋放資源比較耗時的話,可以通過 unsubscribeOn 來指定 釋放資原始碼執行的執行緒。
Observable<Object> source = Observable.using(
() -> {
System.out.println("Subscribed on " + Thread.currentThread().getId());
return Arrays.asList(1,2);
},
(ints) -> {
System.out.println("Producing on " + Thread.currentThread().getId());
return Observable.from(ints);
},
(ints) -> {
System.out.println("Unubscribed on " + Thread.currentThread().getId());
}
);
source
.unsubscribeOn(Schedulers.newThread())
.subscribe(System.out::println);
結果:
Subscribed on 1
Producing on 1
1
2
Unubscribed on 11
Schedulers
observeOn 和 subscribeOn 的引數為一個 Scheduler 物件。Scheduler 是用來協調任務執行的。 RxJava 包含了一些常用的 Scheduler,你也可以自定義 Scheduler。 通過呼叫 Schedulers 的工廠函式來獲取標準的預定義的 Scheduler。
RxJava 內建的 Scheduler 有:
- immediate 同步執行
- trampoline 把任務放到當前執行緒的佇列中,等當前任務執行完了,再繼續執行佇列中的任務
- newThread 對於每個任務建立一個新的執行緒去執行
- computation 計算執行緒,用於需要大量 CPU 計算的任務
- io 用於執行 io 操作的任務
- test 用於測試和除錯
當前 computation 和 io 的實現是類似的,他們兩個主要用來確保呼叫的場景,相當於文件說明,來表明你當前的任務是何種型別的。
大部分的 Rx 操作函式內部都使用了schedulers 。並且大部分的 Observable 操作函式也都有一個使用 Scheduler 引數的過載函式。通過過載函式可以指定該操作函式執行的執行緒。
scheduler 的高階特性
Rx scheduler 的使用場景並沒有限定在 Rx 中,也可以在普通 Java 程式碼中使用。
執行一個任務
Scheduler 有個 createWorker 函式,用來建立一個可以執行的任務(Scheduler.Worker)。然後可以排程該任務:
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
() -> System.out.println("Action"));
上面的任務被分配到其指定的執行緒中了。
還可以重複執行任務,或者只執行一次,也可以推遲任務執行:
Subscription schedule(
Action0 action,
long delayTime,
java.util.concurrent.TimeUnit unit)
Subscription schedulePeriodically(
Action0 action,
long initialDelay,
long period,
java.util.concurrent.TimeUnit unit)
Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
() -> System.out.println(System.currentTimeMillis()-start),
5, TimeUnit.SECONDS);
worker.schedule(
() -> System.out.println(System.currentTimeMillis()-start),
5, TimeUnit.SECONDS);
結果:
5033
5035
上面示例中可以看到,推遲執行是從排程開始的時候計算時間的。
取消任務
Scheduler.Worker 繼承至 Subscription。呼叫 unsubscribe 函式可以取消佇列中的任務:
Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
() -> {
System.out.println(System.currentTimeMillis()-start);
worker.unsubscribe();
},
5, TimeUnit.SECONDS);
worker.schedule(
() -> System.out.println(System.currentTimeMillis()-start),
5, TimeUnit.SECONDS);
結果:
5032
第一個任務中呼叫了 unsubscribe,這樣第二個任務被取消了。下面一個示例演示任務沒有執行完,被取消的情況,會丟擲一個 InterruptedException 異常:
Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
try {
Thread.sleep(2000);
System.out.println("Action completed");
} catch (InterruptedException e) {
System.out.println("Action interrupted");
}
});
Thread.sleep(500);
worker.unsubscribe();
結果:
Action interrupted
schedule 返回的是一個 Subscription 物件,可以在該物件上呼叫取消操作,這樣可以只取消這一個任務,而不是取消所有任務。
RxJava 中現有的 scheduler
ImmediateScheduler
ImmediateScheduler 並沒有做任何執行緒排程。只是同步的執行任務。巢狀呼叫會導致任務被遞迴執行:
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
System.out.println("Start");
worker.schedule(() -> System.out.println("Inner"));
System.out.println("End");
});
結果:
Start
Inner
End
TrampolineScheduler
TrampolineScheduler 也是同步執行,但是不巢狀任務。而是把後來的任務新增到任務佇列中,等前面的任務執行完了 再執行後面的。
Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
System.out.println("Start");
worker.schedule(() -> System.out.println("Inner"));
System.out.println("End");
});
結果:
Start
End
Inner
TrampolineScheduler 把任務安排到第一次執行任務的那個執行緒中執行。這樣,第一次呼叫 schedule 的操作是阻塞的,直到佇列執行完。後續的任務,會在這個執行緒中一個一個的執行,並且後續的呼叫不會阻塞。
NewThreadScheduler
NewThreadScheduler 給每個任務建立一個新的執行緒。
定義一個列印執行緒資訊的輔助函式:
public static void printThread(String message) {
System.out.println(message + " on " + Thread.currentThread().getId());
}
示例:
printThread("Main");
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
printThread("Start");
worker.schedule(() -> printThread("Inner"));
printThread("End");
});
Thread.sleep(500);
worker.schedule(() -> printThread("Again"));
結果:
Main on 1
Start on 11
End on 11
Inner on 11
Again on 11