Rxjava執行緒操作
阿新 • • 發佈:2019-01-06
(一):Rxjava中的執行緒
Rxjava是一個非同步執行緒庫,非同步是它的一大特色。預設情況下rxjava全部的執行是順序同步的,但是在實際開發中,我們更希望的是Observable發射資料,操作加工資料是在後臺處理的,而響應是在當前執行緒實現的,這裡會涉及到使用多執行緒來操作rxjava,我們可以使用rxjava的排程器(Scheduler)來實現。
(二):Scheduler
Scheduler是rxjava中對執行緒控制器的一個抽象,rxjava內建了多個Scheduler,它基本滿足絕大多數使用場景。
Scheduler | 作用 |
---|---|
single | 使用定長為1的執行緒池,重複利用這個執行緒 |
newThread | 每次都啟用新執行緒,並在新執行緒中執行 |
computation | 使用固定長度的執行緒池,大小為CPU的核數,適用於CPU密集型計算 |
io | 適合I/O操作(讀寫檔案,讀寫資料庫,網路訪問等) |
trampoline | 直接在當前執行緒執行,如果當前執行緒有其他任務正在執行,會先暫停其他任務 |
Schedulers.from | 將java.util.concurrent.Executor轉換成一個排程器例項,即可以自定義一個Executor來作為排程器 |
Schedulers
Schedulers是rxjava中的一個靜態工廠類,通過Schedulers可以獲得多種不同的Scheduler
Schedulers.io(), Schedulers.newThead(); Schedulers.computation(), Schedulers.single(),Schedulers.trampoline()
還可以自定義Scheduler,支援使用Executor來作為排程器
//Schedulers自定義scheduler原始碼
@NonNull
public static Scheduler from(#NonNull Executor executor){
return enw ExecutorScheduler(executor);
}
(三):執行緒排程
預設情況Observable與Observer處於同一個執行緒,可以使用SubscribeOn()和observeOn()來指定執行的執行緒
1:subscribeOn
它接受一個Scheduler引數,用來指定對資料傳送,資料處理執行在特定的執行緒排程器Scheduler上。多次執行subscribeOn,只有一個起作用。
2:observeOn
接受一個Scheduler引數,用來指定下游訂閱者操作執行在特定的執行緒排程器上。
Observable.just("song")
.subscribeOn(Schedulers.io()) //指定發射資料發生在io特性的執行緒排程器上,也就是發射是在Io新執行緒中
.observeOn(Schedulers.newThread()) //指定訂閱者是執行在新的執行緒上。不是主執行緒
.subscribe(s -> System.out.println(s));