1. 程式人生 > >Rxjava執行緒操作

Rxjava執行緒操作

(一):Rxjava中的執行緒

Rxjava是一個非同步執行緒庫,非同步是它的一大特色。預設情況下rxjava全部的執行是順序同步的,但是在實際開發中,我們更希望的是Observable發射資料,操作加工資料是在後臺處理的,而響應是在當前執行緒實現的,這裡會涉及到使用多執行緒來操作rxjava,我們可以使用rxjava的排程器(Scheduler)來實現。

(二):Scheduler

Scheduler是rxjava中對執行緒控制器的一個抽象,rxjava內建了多個Scheduler,它基本滿足絕大多數使用場景。

Scheduler 作用
single 使用定長為1的執行緒池,重複利用這個執行緒
newThread 每次都啟用新執行緒,並在新執行緒中執行
computation 使用固定長度的執行緒池,大小為CPU的核數,適用於CPU密集型計算
io 適合I/O操作(讀寫檔案,讀寫資料庫,網路訪問等)
trampoline 直接在當前執行緒執行,如果當前執行緒有其他任務正在執行,會先暫停其他任務
Schedulers.from 將java.util.concurrent.Executor轉換成一個排程器例項,即可以自定義一個Executor來作為排程器

Schedulers

Schedulers是rxjava中的一個靜態工廠類,通過Schedulers可以獲得多種不同的Scheduler
Schedulers.io(), Schedulers.newThead(); Schedulers.computation(), Schedulers.single(),Schedulers.trampoline()
還可以自定義Scheduler,支援使用Executor來作為排程器

//Schedulers自定義scheduler原始碼
@NonNull
public static Scheduler from(#NonNull Executor executor){
    return enw ExecutorScheduler(executor);
}

(三):執行緒排程

預設情況Observable與Observer處於同一個執行緒,可以使用SubscribeOn()和observeOn()來指定執行的執行緒
1:subscribeOn
它接受一個Scheduler引數,用來指定對資料傳送,資料處理執行在特定的執行緒排程器Scheduler上。多次執行subscribeOn,只有一個起作用。
2:observeOn
接受一個Scheduler引數,用來指定下游訂閱者操作執行在特定的執行緒排程器上。

Observable.just("song")
    .subscribeOn(Schedulers.io())  //指定發射資料發生在io特性的執行緒排程器上,也就是發射是在Io新執行緒中
    .observeOn(Schedulers.newThread()) //指定訂閱者是執行在新的執行緒上。不是主執行緒
    .subscribe(s -> System.out.println(s));