1. 程式人生 > >RxJava Schedulers <二十八>

RxJava Schedulers <二十八>

Schedulers

observeOn和subscribeOn方法將Scheduler作為引數。顧名思義,Scheduler是一種用來安排執行各個操作的工具。將如何呼叫操作的細節取決於所使用的Scheduler的實現。您可以建立自己的Scheduler實現,但大多數時候您會發現RxJava已經為您提供了一組針對常見情況的Scheduler。您可以從Scheduler上的工廠方法獲取現有實現。

現有的Scheduler如下:

  • immediate 同步執行Scheduler操作。
  • trampoline 佇列在當前執行緒完成之後執行。
  • newThread 為每個計劃的工作單元建立一個新執行緒。
  • computation 用於cpu工作
  • io 用於io工作
  • test 用於測試和除錯

在當前實現中,computation和io排程器實際上不是單一實現。這種分離的關鍵是要有不同的例項,同時也記錄你的意圖。

許多Rx operators都在內部使用schedules。您到目前為止看到的Observable運算子,所有非同步運算子都有過載排程程式。您可以指定每個operator使用的schedules。

Advanced features of schedulers

用於Rx schedule的方法和實現不是特定於Rx的。實際上,它們比較標準,可以在沒有任何Rx程式碼的情況下使用。除了設計自定義非同步操作符之外,通常不必直接使用schedule。在自定義運算子中使用schedule不僅方便,而且還允許非同步運算子變得可測試。

createWorker()部分有點趣的,它返回一個Scheduler.Worker。worker接受操作並在單個執行緒上按順序執行它們。在某種程度上,worker本身就是一個schedule,但我們不會將其稱為schedule以避免混淆。

Scheduling an action

然後,該操作將排隊等待在分配了該worker的執行緒上執行。

正如您對schedule所期望的那樣,您還可以使用以下方法安排一次或重複執行的操作:

我們可以在這裡看到,執行的延遲是從排程時刻開始計算的。指定的時間不是任務之間的強制睡眠時間。如果有工作準備好執行,worker人可以同時工作。

Canceling work

Scheduler.Worker繼承啦Subscription。在worker上呼叫unsubscribe方法將導致佇列被清空並且所有待處理的任務都被取消。我們可以通過修改前面的例子來看到。

第二個任務永遠不會被執行,因為它之前的任務取消了一切。正在執行的操作將被中斷。在下一個示例中,我們將建立一個睡眠時間為2000毫秒的任務。在它開始執行後500ms我們取消了對工人的所有工作。這會導致InterruptedException

正如看到的那樣,schedule返回Subscription。您可以通過在安排時建立的Subscription取消單個任務。

ImmediateScheduler

ImmediateScheduler根本不做任何排程。scheduler只是同步執行操作,並在操作完成時返回。巢狀的排程請求將導致以遞迴方式執行操作。

輸出:

TrampolineScheduler

TrampolineScheduler的worker也是同步的,但不會巢狀任務。相反,它從初始任務開始,執行時排程的任何任務將在當前任務完成後排隊等候。

輸出:

NewThreadScheduler

NewThreadScheduler建立每個都有自己的執行緒的工作者。每個計劃任務都將在與該特定工作者相對應的執行緒上執行。

輸出:

下屆再續!

原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%204%20-%20Concurrency/1.%20Scheduling%20and%20threading.md