1. 程式人生 > >RxJava threading<二十七>

RxJava threading<二十七>

因為Rx針對非同步系統設計,並且Rx也自然支援多執行緒,所以新的Rx開發人員有時會假設Rx預設是多執行緒的。在其他任何事情之前,重要的是澄清Rx預設是單執行緒的。

除非另有說明,否則每次呼叫onNext / onError / onCompleted都會同步執行整個運算子鏈,包括最終subscriber的操作。我們可以看到比以下示例:

輸出:

我們在這裡看到,我們從3個不同的執行緒中呼叫了onNext。每次訂閱者的操作都在第一個onNext呼叫來自的同一個執行緒上執行。無論我們將多少個連結在一起,情況也是如此。除非我們另有請求,否則該值將同步。

subscribeOn 和 observeOn

subscribeOn和observeOn允許您控制訂閱的呼叫和通知的接收(什麼執行緒將在您的觀察者上呼叫onNext / onError / onCompleted)。

在Rx中,您不直接處理執行緒。而是將它們包裝在名為Scheduler的策略中。我們稍後會看到更多。

subscribeOn

使用subscribeOn,您可以決定執行Observable.create的Scheduler。即使你不是自己呼叫create,也有內在的等價。請考慮以下示例

輸出:

我們在這裡看到,不僅一切都在同一個執行緒上執行,它實際上是順序的:在完成訂閱)observable(包括執行create的lambda引數的主體)之前,subscribe不會解除阻塞。在lambda中對onNext的呼叫會執行整個運算子鏈,一直到println。實際上,訂閱已建立的observable是阻塞的。

無論您要求什麼,一些可觀察者都會建立自己的執行緒。例如,Observable.interval是非同步的。在這種情況下,subscribeOn將指定執行建立資源的函式的執行緒,這通常是沒有用的。它使您無法控制將租用的資源。

輸出:

observeOn

observeOn控制管道的另一側。值的建立和傳送將正常工作,但是您的觀察器的操作將在排程程式策略指定的不同執行緒上呼叫。

輸出:

與subscribeOn不同,observeOn的效果不會跳轉到管道的開頭。它只是改變了它之後的運算子的執行緒。您可以將其視為攔截事件並更改鏈的其餘部分的執行緒。這是一個例子:

輸出;

我們可以在這裡看到事件從呼叫onNext的執行緒開始並保持在該執行緒上,直到它們遇到observeOn運算子。之後,他們繼續新執行緒。這樣,您可以將不同的執行緒策略分配給Rx管道的不同部分。

unsubscribeOn

正如我們所看到的,一些可觀察物件依賴於訂閱租用的資源,並在訂閱結束時釋放。通常,釋放資源很便宜。在特殊情況下,您需要取消訂閱操作以不阻止或特定地發生在特殊執行緒上,您可以指定將使用unsubscribeOn執行這些操作的排程程式。

輸出:

using方法執行3個函式,一個租用資源,一個使用它,另一個釋放它。使用unsubscribeOn,我們隻影響了釋放資源的功能。

下節再續!

原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%204%20-%20Concurrency/1.%20Scheduling%20and%20threading.md