1. 程式人生 > 其它 >RxJS switchMap, mergeMap, concatMap,exhaustMap 的比較

RxJS switchMap, mergeMap, concatMap,exhaustMap 的比較

原文:Comprehensive Guide to Higher-Order RxJs Mapping Operators: switchMap, mergeMap, concatMap (and exhaustMap)

我們日常發現的一些最常用的 RxJs 操作符是 RxJs 高階對映操作符:switchMap、mergeMap、concatMap 和exhaustMap。

例如,我們程式中的大部分網路呼叫都將使用這些運算子之一完成,因此熟悉它們對於編寫幾乎所有反應式程式至關重要。

知道在給定情況下使用哪個運算子(以及為什麼)可能有點令人困惑,我們經常想知道這些運算子是如何真正工作的,以及為什麼它們會這樣命名。

這些運算子可能看起來不相關,但我們真的很想一口氣學習它們,因為選擇錯誤的運算子可能會意外地導致我們程式中的微妙問題。

Why are the mapping operators a bit confusing?

這樣做是有原因的:為了理解這些操作符,我們首先需要了解每個內部使用的 Observable 組合策略。

與其試圖自己理解switchMap,不如先了解什麼是Observable切換; 我們需要先學習 Observable 連線等,而不是直接深入 concatMap。

這就是我們在這篇文章中要做的事情,我們將按邏輯順序學習 concat、merge、switch 和exhaust 策略及其對應的對映運算子:concatMap、mergeMap、switchMap 和exhaustMap。

我們將結合使用 marble 圖和一些實際示例(包括執行程式碼)來解釋這些概念。

最後,您將確切地知道這些對映運算子中的每一個是如何工作的,何時使用,為什麼使用,以及它們名稱的原因。

The RxJs Map Operator

讓我們從頭開始,介紹這些對映運算子的一般作用。

正如運算子的名稱所暗示的那樣,他們正在做某種對映:但究竟是什麼被映射了? 我們先來看看 RxJs Map 操作符的彈珠圖:

How the base Map Operator works

使用 map 運算子,我們可以獲取輸入流(值為 1、2、3),並從中建立派生的對映輸出流(值為 10、20、30)。

底部輸出流的值是通過獲取輸入流的值並將它們應用到一個函式來獲得的:這個函式只是將這些值乘以 10。

所以 map 操作符就是對映輸入 observable 的值。 以下是我們如何使用它來處理 HTTP 請求的示例:

const http$ : Observable<Course[]> = this.http.get('/api/courses');

http$
    .pipe(
        tap(() => console.log('HTTP request executed')),
        map(res => Object.values(res['payload']))
    )
    .subscribe(
        courses => console.log("courses", courses)
    );

在這個例子中,我們正在建立一個 HTTP observable 來進行後端呼叫,我們正在訂閱它。 observable 將發出後端 HTTP 響應的值,它是一個 JSON 物件。

在這種情況下,HTTP 響應將資料包裝在有效負載屬性中,因此為了獲取資料,我們應用了 RxJs 對映運算子。 然後對映函式將對映 JSON 響應負載並提取負載屬性的值。

既然我們已經回顧了基本對映的工作原理,現在讓我們來談談高階對映。

What is Higher-Order Observable Mapping?

在高階對映中,我們不是將像 1 這樣的普通值對映到另一個像 10 這樣的值,而是將一個值對映到一個 Observable 中!

結果是一個高階的 Observable。 它只是一個 Observable,但它的值本身也是 Observable,我們可以單獨訂閱。

這聽起來可能有些牽強,但實際上,這種型別的對映一直在發生。 讓我們舉一個這種型別對映的實際例子。 假設例如,我們有一個 Angular Reactive Form,它通過 Observable 隨時間發出有效的表單值:

@Component({
    selector: 'course-dialog',
    templateUrl: './course-dialog.component.html'
})
export class CourseDialogComponent implements AfterViewInit {

    form: FormGroup;
    course:Course;

    @ViewChild('saveButton') saveButton: ElementRef;

    constructor(
        private fb: FormBuilder,
        private dialogRef: MatDialogRef<CourseDialogComponent>,
        @Inject(MAT_DIALOG_DATA) course:Course) {

        this.course = course;

        this.form = fb.group({
            description: [course.description, 
                          Validators.required],
            category: [course.category, Validators.required],
            releasedAt: [moment(), Validators.required],
            longDescription: [course.longDescription,
                              Validators.required]
        });
    }
}

Reactive Form 提供了一個 Observable this.form.valueChanges,它在使用者與表單互動時發出最新的表單值。 這將是我們的源 Observable。

我們想要做的是在這些值隨著時間的推移發出時至少儲存其中一些值,以實現表單草稿預儲存功能。 這樣,隨著使用者填寫表單,資料會逐漸儲存,從而避免由於意外重新載入而丟失整個表單資料。

Why Higher-Order Observables?

為了實現表單草稿儲存功能,我們需要獲取表單值,然後建立第二個執行後端儲存的 HTTP observable,然後訂閱它。

我們可以嘗試手動完成所有這些,但是我們會陷入巢狀的訂閱反模式:

this.form.valueChanges
    .subscribe(
       formValue => {
      
           const httpPost$ = 
                 this.http.put(`/api/course/${courseId}`, formValue);

           httpPost$.subscribe(
               res => ... handle successful save ...
               err => ... handle save error ...
           );

       }        
    );

正如我們所見,這會導致我們的程式碼很快在多個級別巢狀,這是我們在使用 RxJs 時首先要避免的問題之一。

讓我們稱這個新的 httpPost$ Observable 為內部 Observable,因為它是在內部巢狀程式碼塊中建立的。

Avoiding nested subscriptions

我們希望以更方便的方式完成所有這些過程:我們希望獲取表單值,並將其對映到儲存 Observable 中。 這將有效地建立一個高階 Observable,其中每個值對應一個儲存請求。

然後我們希望透明地訂閱這些網路 Observable 中的每一個,並且一次性直接接收網路響應,以避免任何巢狀。

如果我們有某種更高階的 RxJs 對映運算子,我們就可以做到這一切! 那為什麼我們需要四個不同的操作符呢?

為了理解這一點,想象一下如果 valueChanges observable 快速連續發出多個表單值並且儲存操作需要一些時間來完成,會發生什麼情況:

  • 我們應該等待一個儲存請求完成後再進行另一次儲存嗎?
  • 我們應該並行進行多次儲存嗎?
  • 我們應該取消正在進行的儲存並開始新的儲存嗎?
  • 當一個已經在進行中時,我們應該忽略新的儲存嘗試嗎?

在探索這些用例中的每一個之前,讓我們回到上面的巢狀訂閱程式碼。

在巢狀訂閱示例中,我們實際上是並行觸發儲存操作,這不是我們想要的,因為沒有強有力的保證後端將按順序處理儲存,並且最後一個有效的表單值確實是儲存在 後端。

讓我們看看如何確保僅在上一次儲存完成後才完成儲存請求。

Understanding Observable Concatenation

為了實現順序儲存,我們將引入 Observable 連線的新概念。 在此程式碼示例中,我們使用 concat() RxJs 函式連線兩個示例 observable:

const series1$ = of('a', 'b');

const series2$ = of('x', 'y');

const result$ = concat(series1$, series2$);

result$.subscribe(console.log);

在使用 of 建立函式建立了兩個 Observables series1$ 和 series2$ 之後,我們建立了第三個 result$ Observable,它是串聯 series1$ 和 series2$ 的結果。

這是該程式的控制檯輸出,顯示了結果 Observable 發出的值:

a
b
x
y

如我們所見,這些值是將 series1$ 的值與 series2$ 的值連線在一起的結果。 但這裡有一個問題:這個例子能工作的原因是因為這些 Observable 正在完成!!

of() 函式將建立 Observables,它發出傳遞給 of() 的值,然後在發出所有值後完成 Observables。

Observable Concatenation Marble Diagram

你注意到第一個 Observable 的值 b 後面的豎線了嗎?這標誌著第一個具有值 a 和 b (series1$) 的 Observable 完成的時間點。

讓我們按照時間表逐步分解這裡發生的事情:

  • 兩個 Observables series1$ 和 series2$ 被傳遞給 concat() 函式
  • concat() 然後將訂閱第一個 Observable series1$,但不會訂閱第二個 Observable series2$(這對於理解串聯至關重要)
  • source1$ 發出值 a,該值立即反映在輸出 result$ Observable 中
  • 注意 source2$ Observable 還沒有發出值,因為它還沒有被訂閱
  • 然後 source1$ 將發出 b 值,該值反映在輸出中
  • 然後 source1$ 將完成,只有在此之後 concat() 現在訂閱 source2$
  • 然後 source2$ 值將開始反映在輸出中,直到 source2$ 完成
  • 當 source2$ 完成時, result$ Observable 也將完成
  • 請注意,我們可以將任意數量的 Observable 傳遞給 concat(),而不僅僅是本示例中的兩個

The key point about Observable Concatenation

正如我們所看到的,Observable 連線就是關於 Observable 的完成! 我們取第一個 Observable 並使用它的值,等待它完成,然後我們使用下一個 Observable,依此類推,直到所有 Observable 完成。

回到我們的高階 Observable 對映示例,讓我們看看串聯的概念如何幫助我們。

Using Observable Concatenation to implement sequential saves

正如我們所見,為了確保我們的表單值按順序儲存,我們需要獲取每個表單值並將其對映到 httpPost$ Observable。

然後我們需要訂閱它,但我們希望在訂閱下一個 httpPost$ Observable 之前完成儲存。

In order to ensure sequentiality, we need to concatenate the multiple httpPost$ Observables together!

然後我們將訂閱每個 httpPost$ 並按順序處理每個請求的結果。 最後,我們需要的是一個混合了以下內容的運算子:

  • 一個高階對映操作(獲取表單值並將其轉換為 httpPost$ Observable)

  • 使用 concat() 操作,將多個 httpPost$ Observables 連線在一起以確保在前一個正在進行的儲存首先完成之前不會進行下一個 HTTP 儲存。

我們需要的是恰當命名的 RxJs concatMap Operator,它混合了高階對映和 Observable 連線。

The RxJs concatMap Operator

程式碼如下:

this.form.valueChanges
    .pipe(
        concatMap(formValue => this.http.put(`/api/course/${courseId}`, 
                                             formValue))
    )
    .subscribe(
       saveResult =>  ... handle successful save ...,
        err => ... handle save error ...      
    );

正如我們所見,使用像 concatMap 這樣的高階對映運算子的第一個好處是現在我們不再有巢狀訂閱。

通過使用 concatMap,現在所有表單值都將按順序傳送到後端,如 Chrome DevTools Network 選項卡中所示:

Breaking down the concatMap network log diagram

正如我們所見,只有在上一次儲存完成後才會啟動一個儲存 HTTP 請求。 以下是 concatMap 運算子如何確保請求始終按順序發生:

  • concatMap 正在獲取每個表單值並將其轉換為儲存的 HTTP Observable,稱為內部 Observable

  • concatMap 然後訂閱內部 Observable 並將其輸出傳送到結果 Observable 第二個表單值可能比在後端儲存前一個表單值更快

  • 如果發生這種情況,新的表單值將不會立即對映到 HTTP 請求

  • 相反, concatMap 將等待先前的 HTTP Observable 完成,然後將新值對映到 HTTP Observable,訂閱它並因此觸發下一次儲存

Observable Merging

將 Observable 串聯應用於一系列 HTTP 儲存操作似乎是確保儲存按預期順序發生的好方法。

但是在其他情況下,我們希望並行執行,而不需要等待前一個內部 Observable 完成。

為此,我們有合併 Observable 組合策略! 與 concat 不同,Merge 不會在訂閱下一個 Observable 之前等待 Observable 完成。

相反,merge 同時訂閱每個合併的 Observable,然後隨著多個值隨著時間的推移到達,它將每個源 Observable 的值輸出到組合結果 Observable。

Practical Merge Example

為了明確合併不依賴於完成,讓我們合併兩個從未完成的 Observables,因為它們是 interval Observables:

const series1$ = interval(1000).pipe(map(val => val*10));

const series2$ = interval(1000).pipe(map(val => val*100));

const result$ = merge(series1$, series2$);

result$.subscribe(console.log);

使用 interval() 建立的 Observable 將每隔一秒發出值 0、1、2 等,並且永遠不會完成。

請注意,我們對這些區間 Observable 應用了幾個 map 運算子,只是為了更容易在控制檯輸出中區分它們。

以下是控制檯中可見的前幾個值:

0
0
10
100
20
200
30
300

Merging and Observable Completion

正如我們所見,合併的源 Observable 的值在發出時立即顯示在結果 Observable 中。 如果合併的 Observable 之一完成,merge 將繼續發出其他 Observable 隨著時間到達的值。

請注意,如果源 Observables 完成,合併仍會以相同的方式工作。

The Merge Marble Diagram

看另一個例子:

正如我們所見,合併的源 Observables 的值立即顯示在輸出中。 直到所有合併的 Observable 完成後,結果 Observable 才會完成。

現在我們瞭解了合併策略,讓我們看看它如何在高階 Observable 對映的上下文中使用。

回到我們之前的表單草稿儲存示例,很明顯在這種情況下我們需要 concatMap 而不是 mergeMap,因為我們不希望儲存並行發生。

讓我們看看如果我們不小心選擇了 mergeMap 會發生什麼:


this.form.valueChanges
    .pipe(
        mergeMap(formValue => 
                 this.http.put(`/api/course/${courseId}`, 
                               formValue))
    )
    .subscribe(
       saveResult =>  ... handle successful save ...,
        err => ... handle save error ...      
    );

現在假設使用者與表單互動並開始相當快地輸入資料。 在這種情況下,我們現在會在網路日誌中看到多個並行執行的儲存請求:

正如我們所看到的,請求是並行發生的,在這種情況下是一個錯誤! 在高負載下,這些請求可能會被亂序處理。

Observable Switching

現在我們來談談另一個 Observable 組合策略:切換。 切換的概念更接近於合併而不是串聯,因為我們不等待任何 Observable 終止。

但是在切換時,與合併不同,如果一個新的 Observable 開始發出值,我們將在訂閱新的 Observable 之前取消訂閱之前的 Observable。

Observable 切換就是為了確保未使用的 Observables 的取消訂閱邏輯被觸發,從而可以釋放資源!

Switch Marble Diagram

注意對角線,這些不是偶然的! 在切換策略的情況下,在圖中表示高階 Observable 很重要,它是影象的頂行。

這個高階 Observable 發出的值本身就是 Observable。

對角線從高階 Observable 頂線分叉的那一刻,是 Observable 值被 switch 發出和訂閱的那一刻。

Breaking down the switch Marble Diagram

這是這張圖中發生的事情:

  • 高階 Observable 發出它的第一個內部 Observable (a-b-c-d),它被訂閱(通過 switch 策略實現)

  • 第一個內部 Observable (a-b-c-d) 發出值 a 和 b,它們立即反映在輸出中

  • 但隨後第二個內部 Observable (e-f-g) 被髮射,這觸發了第一個內部 Observable (a-b-c-d) 的取消訂閱,這是切換的關鍵部分

  • 然後第二個內部 Observable (e-f-g) 開始發出新值,這些值反映在輸出中

  • 但請注意,第一個內部 Observable (a-b-c-d) 同時仍在發出新值 c 和 d

  • 然而,這些後來的值沒有反映在輸出中,那是因為我們同時取消了第一個內部 Observable (a-b-c-d) 的訂閱

我們現在可以理解為什麼必須以這種不尋常的方式繪製圖表,用對角線:這是因為我們需要在每個內部 Observable 被訂閱(或取消訂閱)時直觀地表示,這發生在對角線從源高階 Observable。

The RxJs switchMap Operator

然後讓我們採用切換策略並將其應用於高階對映。 假設我們有一個普通的輸入流,它發出值 1、3 和 5。

然後我們將每個值對映到一個 Observable,就像我們在 concatMap 和 mergeMap 的情況下所做的那樣,並獲得一個更高階的 Observable。

如果我們現在在發出的內部 Observable 之間切換,而不是連線或合併它們,我們最終會得到 switchMap 運算子:

Breaking down the switchMap Marble Diagram

這是該運算子的工作原理:

  • 源 observable 發出值 1、3 和 5
  • 然後通過應用對映函式將這些值轉換為 Observable
  • 對映的內部 Observable 被 switchMap 訂閱
  • 當內部 Observables 發出一個值時,該值會立即反映在輸出中
  • 但是如果在前一個 Observable 有機會完成之前發出了像 5 這樣的新值,則前一個內部 Observable (30-30-30) 將被取消訂閱,並且它的值將不再反映在輸出中
  • 注意上圖中紅色的 30-30-30 內部 Observable:最後 30 個值沒有發出,因為 30-30-30 內部 Observable 被取消訂閱

如我們所見,Observable 切換就是確保我們從未使用的 Observable 觸發取消訂閱邏輯。 現在讓我們看看 switchMap 的執行情況!

Search TypeAhead - switchMap Operator Example

switchMap 的一個非常常見的用例是搜尋 Typeahead。 首先讓我們定義源 Observable,其值本身將觸發搜尋請求。

這個源 Observable 將發出值,這些值是使用者在輸入中鍵入的搜尋文字:

const searchText$: Observable<string> = 
      fromEvent<any>(this.input.nativeElement, 'keyup')
    .pipe(
        map(event => event.target.value),
        startWith('')
    )
    .subscribe(console.log);

此源 Observable 連結到使用者鍵入其搜尋的輸入文字欄位。 當用戶輸入單詞“Hello World”作為搜尋時,這些是 searchText$ 發出的值:

Debouncing and removing duplicates from a Typeahead

請注意重複值,要麼是由於使用兩個單詞之間的空格,要麼是使用 Shift 鍵將字母 H 和 W 大寫。

為了避免將所有這些值作為單獨的搜尋請求傳送到後端,讓我們使用 debounceTime 運算子等待使用者輸入穩定:

const searchText$: Observable<string> = 
      fromEvent<any>(this.input.nativeElement, 'keyup')
    .pipe(
        map(event => event.target.value),
        startWith(''),
        debounceTime(400)
    )
    .subscribe(console.log);

使用此運算子,如果使用者以正常速度鍵入,則 searchText$ 的輸出中現在只有一個值:Hello World

這已經比我們之前的要好得多,現在只有在穩定至少 400 毫秒時才會發出值!

但是如果使用者在考慮搜尋時輸入緩慢,以至於兩個值之間需要超過 400 毫秒,那麼搜尋流可能如下所示:

此外,使用者可以鍵入一個值,按退格鍵並再次鍵入,這可能會導致重複的搜尋值。 我們可以通過新增 distinctUntilChanged 操作符來防止重複搜尋的發生。

Cancelling obsolete searches in a Typeahead

但更重要的是,我們需要一種方法來取消以前的搜尋,因為新的搜尋開始了。

我們在這裡要做的是將每個搜尋字串轉換為後端搜尋請求並訂閱它,並在兩個連續的搜尋請求之間應用切換策略,如果觸發新的搜尋,則取消之前的搜尋。

這正是 switchMap 運算子將要做的! 這是使用它的 Typeahead 邏輯的最終實現:

const searchText$: Observable<string> = 
      fromEvent<any>(this.input.nativeElement, 'keyup')
    .pipe(
        map(event => event.target.value),
        startWith(''),
        debounceTime(400),
        distinctUntilChanged()
    ); 

const lessons$: Observable<Lesson[]> = searchText$
    .pipe(
        switchMap(search => this.loadLessons(search))        
    )
    .subscribe();

function loadLessons(search:string): Observable<Lesson[]> {
    
    const params = new HttpParams().set('search', search);
   
    return this.http.get(`/api/lessons/${coursesId}`, {params});
}

switchMap Demo with a Typeahead

現在讓我們看看 switchMap 操作符的作用! 如果使用者在搜尋欄上輸入,然後猶豫並輸入其他內容,我們通常會在網路日誌中看到以下內容:

正如我們所看到的,之前的一些搜尋在進行時已被取消,這很棒,因為這將釋放可用於其他事情的伺服器資源。

The Exhaust Strategy

switchMap 操作符是預輸入場景的理想選擇,但在其他情況下,我們想要做的是忽略源 Observable 中的新值,直到前一個值被完全處理。

例如,假設我們正在觸發後端儲存請求以響應單擊儲存按鈕。 我們可能首先嚐試使用 concatMap 運算子來實現這一點,以確保儲存操作按順序發生:

fromEvent(this.saveButton.nativeElement, 'click')
    .pipe(
        concatMap(() => this.saveCourse(this.form.value))
    )
    .subscribe();

這確保儲存按順序完成,但是如果使用者多次單擊儲存按鈕會發生什麼? 以下是我們將在網路日誌中看到的內容:

正如我們所見,每次點選都會觸發自己的儲存:如果我們點選 20 次,我們會得到 20 次儲存! 在這種情況下,我們想要的不僅僅是確保按順序進行儲存。

我們還希望能夠忽略點選,但前提是儲存已經在進行中。 排氣 Observable 組合策略將允許我們做到這一點。

Exhaust Marble Diagram

就像以前一樣,我們在第一行有一個更高階的 Observable,它的值本身就是 Observable,從第一行分叉出來。這是這張圖中發生的事情:

  • 就像 switch 的情況一樣,exhaust 訂閱第一個內部 Observable (a-b-c) 像往常一樣,值 a、b 和 c 會立即反映在輸出中
  • 然後發出第二個內部 Observable (d-e-f),而第一個 Observable (a-b-c) 仍在進行中
  • 第二個 Observable 被排放策略丟棄,並且不會被訂閱(這是排放的關鍵部分) 只有在第一個 Observable (a-b-c) 完成後,排氣策略才會訂閱新的 Observable
  • 當第三個 Observable (g-h-i) 發出時,第一個 Observable (a-b-c) 已經完成,所以第三個 Observable 不會被丟棄,會被訂閱
  • 然後,第三個 Observable 的值 g-h-i 將顯示在結果 Observable 的輸出中,與輸出中不存在的值 d-e-f 不同

就像 concat、merge 和 switch 的情況一樣,我們現在可以在高階對映的上下文中應用 exhaust 策略。

The RxJs exhaustMap Operator

現在讓我們看看exhaustMap 操作符的彈珠圖。 讓我們記住,與上圖的第一行不同,源 Observable 1-3-5 發出的值不是 Observable。

相反,這些值可以是例如滑鼠點選:

所以這是在排放地圖圖的情況下發生的事情:

  • 發出值 1,並建立內部 Observable 10-10-10
  • Observable 10-10-10 發出所有值並在源 Observable 中發出值 3 之前完成,因此所有 10-10-10 值在輸出中發出
  • 在輸入中發出一個新值 3,觸發一個新的 30-30-30 內部 Observable
  • 但是現在,雖然 30-30-30 仍在執行,但我們在源 Observable 中得到了一個新值 5
  • 這個值 5 被排氣策略丟棄,這意味著從未建立 50-50-50 Observable,因此 50-50-50 值從未出現在輸出中

A Practical Example for exhaustMap

現在讓我們將這個新的exhaustMap Operator 應用到我們的儲存按鈕場景中:

fromEvent(this.saveButton.nativeElement, 'click')
    .pipe(
        exhaustMap(() => this.saveCourse(this.form.value))
    )
    .subscribe();

如果我們現在點選儲存,假設連續 5 次,我們將獲得以下網路日誌:

正如我們所看到的,我們在儲存請求仍在進行時所做的點選被忽略了,正如預期的那樣!

請注意,如果我們連續點選例如 20 次,最終正在進行的儲存請求將完成,然後第二個儲存請求將開始。

How to choose the right mapping Operator?

concatMap、mergeMap、switchMap 和exhaustMap 的行為相似,因為它們都是高階對映運算子。

但它在許多微妙的方面也如此不同,以至於實際上沒有一個運算子可以安全地指向預設值。

相反,我們可以簡單地根據用例選擇合適的運算子:

  • 如果我們需要在等待完成的同時按順序做事情,那麼 concatMap 是正確的選擇

  • 對於並行處理,mergeMap 是最好的選擇

  • 如果我們需要取消邏輯,switchMap 是要走的路

  • 為了在當前的 Observables 仍在進行時忽略新的 Observables,exhaustMap 就是這樣做的

總結

正如我們所見,RxJ 的高階對映運算子對於在響應式程式設計中執行一些非常常見的操作(例如網路呼叫)至關重要。

為了真正理解這些對映操作符及其名稱,我們首先需要重點了解底層的Observable組合策略concat、merge、switch和exhaust。

我們還需要意識到有一個更高階的對映操作正在發生,其中值被轉換成分離的 Observables,並且這些 Observables 被對映運算子本身以隱藏的方式訂閱。

選擇正確的運算元就是選擇正確的內部 Observable 組合策略。 選擇錯誤的運算子通常不會導致程式立即損壞,但隨著時間的推移可能會導致一些難以解決的問題。

更多Jerry的原創文章,盡在:"汪子熙":