RxJava threading<二十七>
因為Rx針對非同步系統設計,並且Rx也自然支援多執行緒,所以新的Rx開發人員有時會假設Rx預設是多執行緒的。在其他任何事情之前,重要的是澄清Rx預設是單執行緒的。
除非另有說明,否則每次呼叫onNext / onError / onCompleted都會同步執行整個運算子鏈,包括最終subscriber的操作。我們可以看到比以下示例:
輸出:
我們在這裡看到,我們從3個不同的執行緒中呼叫了onNext。每次訂閱者的操作都在第一個onNext呼叫來自的同一個執行緒上執行。無論我們將多少個連結在一起,情況也是如此。除非我們另有請求,否則該值將同步。
subscribeOn 和 observeOn
subscribeOn和observeOn允許您控制訂閱的呼叫和通知的接收(什麼執行緒將在您的觀察者上呼叫onNext / onError / onCompleted)。
在Rx中,您不直接處理執行緒。而是將它們包裝在名為Scheduler的策略中。我們稍後會看到更多。
subscribeOn
使用subscribeOn,您可以決定執行Observable.create的Scheduler。即使你不是自己呼叫create,也有內在的等價。請考慮以下示例
輸出:
我們在這裡看到,不僅一切都在同一個執行緒上執行,它實際上是順序的:在完成訂閱)observable(包括執行create的lambda引數的主體)之前,subscribe不會解除阻塞。在lambda中對onNext的呼叫會執行整個運算子鏈,一直到println。實際上,訂閱已建立的observable是阻塞的。
無論您要求什麼,一些可觀察者都會建立自己的執行緒。例如,Observable.interval是非同步的。在這種情況下,subscribeOn將指定執行建立資源的函式的執行緒,這通常是沒有用的。它使您無法控制將租用的資源。
輸出:
observeOn
observeOn控制管道的另一側。值的建立和傳送將正常工作,但是您的觀察器的操作將在排程程式策略指定的不同執行緒上呼叫。
輸出:
與subscribeOn不同,observeOn的效果不會跳轉到管道的開頭。它只是改變了它之後的運算子的執行緒。您可以將其視為攔截事件並更改鏈的其餘部分的執行緒。這是一個例子:
輸出;
我們可以在這裡看到事件從呼叫onNext的執行緒開始並保持在該執行緒上,直到它們遇到observeOn運算子。之後,他們繼續新執行緒。這樣,您可以將不同的執行緒策略分配給Rx管道的不同部分。
unsubscribeOn
正如我們所看到的,一些可觀察物件依賴於訂閱租用的資源,並在訂閱結束時釋放。通常,釋放資源很便宜。在特殊情況下,您需要取消訂閱操作以不阻止或特定地發生在特殊執行緒上,您可以指定將使用unsubscribeOn執行這些操作的排程程式。
輸出:
using方法執行3個函式,一個租用資源,一個使用它,另一個釋放它。使用unsubscribeOn,我們隻影響了釋放資源的功能。
下節再續!
原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%204%20-%20Concurrency/1.%20Scheduling%20and%20threading.md