rxjs Observable of 操作符的單步除錯分析
看這段最簡單的程式碼:
import { Observable, of } from 'rxjs';
const observable = of(1, 2, 3);
observable.subscribe((message) => {
console.log(message);
});
輸出:
輸入的 1,2,3 被當成陣列處理,觸發 fromArray 函式呼叫:
因為不存在 scheduler 呼叫,所以進入 subscribeToArray 分支:
subscribeToArray 的實現在一個 subscribeToArray.ts
檔案裡,這個檔案的名稱空間如下:internal/util
注意,上圖第 8 行的 for 迴圈,顯然在 of 函式呼叫裡不會執行,而是 of 返回的 Observable 被 subscribe 時才執行。subscribeToArray 的邏輯就是,一個簡單的 for 迴圈,迴圈體內依次呼叫 subscriber 的 next 方法,最後呼叫 complete 方法。
subscribeToArray 返回的函式,儲存在 Observable 建構函式的 _subscribe 屬性內。
然後針對這個 of 返回的 Observable 例項,呼叫 subscribe 方法。
of Observable 例項的 _subscribe 方法,指向的就是剛剛 subscribeToArray 返回的函式:
直到 Observable 被 subscribe,這個函式體才得以執行。在函式體的 for 迴圈內,逐一呼叫 subscriber 的 next 方法:
subscriber 並不是應用開發人員建立的,而是 rxjs 框架內部維護和使用的。subscriber 有一個屬性 destination,指向 Safesubscriber,這個 safesubscriber 的 _next 屬性,指向的就是應用開發人員維護的回撥函式。
這段函式先後執行順序如下圖圖例所示:
subscribe 除了傳入一個單一的回撥函式之後,還支援 error 和 complete 處理。
看下面的例子:
import { Observable, of } from 'rxjs'; const observable = of(1, 2, 3); observable.subscribe( (message) => { console.log(message); }, () => {}, () => { console.log('complete'); } );
complete 方法和 next 方法的執行邏輯類似,唯一區別是在 for 迴圈體執行完畢之後觸發:
由此可見,of 建立的 Observable 是 cold Observable.
如果是一個週期性發射資料的 Observable,我們還可以使用 unsubscribe 對其取消訂閱。看下面的程式碼:
import { interval } from 'rxjs';
const observable = interval(1000);
const subscription = observable.subscribe((x) => console.log(x));
setTimeout(() => {
subscription.unsubscribe();
}, 4500);
輸出:
該 Observable 在輸出 4 個整數後停止發射值。