RxJava Schedulers <二十八>
Schedulers
observeOn和subscribeOn方法將Scheduler作為引數。顧名思義,Scheduler是一種用來安排執行各個操作的工具。將如何呼叫操作的細節取決於所使用的Scheduler的實現。您可以建立自己的Scheduler實現,但大多數時候您會發現RxJava已經為您提供了一組針對常見情況的Scheduler。您可以從Scheduler上的工廠方法獲取現有實現。
現有的Scheduler如下:
- immediate 同步執行Scheduler操作。
- trampoline 佇列在當前執行緒完成之後執行。
- newThread 為每個計劃的工作單元建立一個新執行緒。
- computation 用於cpu工作
- io 用於io工作
- test 用於測試和除錯
在當前實現中,computation和io排程器實際上不是單一實現。這種分離的關鍵是要有不同的例項,同時也記錄你的意圖。
許多Rx operators都在內部使用schedules。您到目前為止看到的Observable運算子,所有非同步運算子都有過載排程程式。您可以指定每個operator使用的schedules。
Advanced features of schedulers
用於Rx schedule的方法和實現不是特定於Rx的。實際上,它們比較標準,可以在沒有任何Rx程式碼的情況下使用。除了設計自定義非同步操作符之外,通常不必直接使用schedule。在自定義運算子中使用schedule不僅方便,而且還允許非同步運算子變得可測試。
createWorker()部分有點趣的,它返回一個Scheduler.Worker。worker接受操作並在單個執行緒上按順序執行它們。在某種程度上,worker本身就是一個schedule,但我們不會將其稱為schedule以避免混淆。
Scheduling an action
然後,該操作將排隊等待在分配了該worker的執行緒上執行。
正如您對schedule所期望的那樣,您還可以使用以下方法安排一次或重複執行的操作:
我們可以在這裡看到,執行的延遲是從排程時刻開始計算的。指定的時間不是任務之間的強制睡眠時間。如果有工作準備好執行,worker人可以同時工作。
Canceling work
Scheduler.Worker繼承啦Subscription。在worker上呼叫unsubscribe方法將導致佇列被清空並且所有待處理的任務都被取消。我們可以通過修改前面的例子來看到。
第二個任務永遠不會被執行,因為它之前的任務取消了一切。正在執行的操作將被中斷。在下一個示例中,我們將建立一個睡眠時間為2000毫秒的任務。在它開始執行後500ms我們取消了對工人的所有工作。這會導致InterruptedException
正如看到的那樣,schedule返回Subscription。您可以通過在安排時建立的Subscription取消單個任務。
ImmediateScheduler
ImmediateScheduler根本不做任何排程。scheduler只是同步執行操作,並在操作完成時返回。巢狀的排程請求將導致以遞迴方式執行操作。
輸出:
TrampolineScheduler
TrampolineScheduler的worker也是同步的,但不會巢狀任務。相反,它從初始任務開始,執行時排程的任何任務將在當前任務完成後排隊等候。
輸出:
NewThreadScheduler
NewThreadScheduler建立每個都有自己的執行緒的工作者。每個計劃任務都將在與該特定工作者相對應的執行緒上執行。
輸出:
下屆再續!
原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%204%20-%20Concurrency/1.%20Scheduling%20and%20threading.md