1. 程式人生 > 其它 >rxjs Observable of 操作符的單步除錯分析

rxjs Observable of 操作符的單步除錯分析

看這段最簡單的程式碼:

import { Observable, of } from 'rxjs';

const observable = of(1, 2, 3);

observable.subscribe((message) => {
  console.log(message);
}); 

輸出:

輸入的 1,2,3 被當成陣列處理,觸發 fromArray 函式呼叫:

因為不存在 scheduler 呼叫,所以進入 subscribeToArray 分支:

subscribeToArray 的實現在一個 subscribeToArray.ts 檔案裡,這個檔案的名稱空間如下:internal/util

注意,上圖第 8 行的 for 迴圈,顯然在 of 函式呼叫裡不會執行,而是 of 返回的 Observable 被 subscribe 時才執行。subscribeToArray 的邏輯就是,一個簡單的 for 迴圈,迴圈體內依次呼叫 subscriber 的 next 方法,最後呼叫 complete 方法。

subscribeToArray 返回的函式,儲存在 Observable 建構函式的 _subscribe 屬性內。

然後針對這個 of 返回的 Observable 例項,呼叫 subscribe 方法。

of Observable 例項的 _subscribe 方法,指向的就是剛剛 subscribeToArray 返回的函式:

直到 Observable 被 subscribe,這個函式體才得以執行。在函式體的 for 迴圈內,逐一呼叫 subscriber 的 next 方法:

subscriber 並不是應用開發人員建立的,而是 rxjs 框架內部維護和使用的。subscriber 有一個屬性 destination,指向 Safesubscriber,這個 safesubscriber 的 _next 屬性,指向的就是應用開發人員維護的回撥函式。

這段函式先後執行順序如下圖圖例所示:

subscribe 除了傳入一個單一的回撥函式之後,還支援 error 和 complete 處理。

看下面的例子:

import { Observable, of } from 'rxjs';

const observable = of(1, 2, 3);

observable.subscribe(
  (message) => {
    console.log(message);
  },
  () => {},
  () => {
    console.log('complete');
  }
);

complete 方法和 next 方法的執行邏輯類似,唯一區別是在 for 迴圈體執行完畢之後觸發:

由此可見,of 建立的 Observable 是 cold Observable.
如果是一個週期性發射資料的 Observable,我們還可以使用 unsubscribe 對其取消訂閱。看下面的程式碼:

import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe((x) => console.log(x));
setTimeout(() => {
  subscription.unsubscribe();
}, 4500);

輸出:

該 Observable 在輸出 4 個整數後停止發射值。