1. 程式人生 > 其它 >RxJS CombineLatest operator 的一個具體使用例子

RxJS CombineLatest operator 的一個具體使用例子

CombineLatest 的使用場景:

This operator is best used when you have multiple, long-lived observables that rely on each other for some calculation or determination.

當有多個長時間存活的 Observable,且依賴彼此,共同完成某些計算邏輯時,適合使用 CombineLatest.

When any observable emits a value, emit the last emitted value from each.

為了學習 CombineLatest 的用法,我寫了下面這個小程式:

import { Component, OnInit, Inject } from '@angular/core';
import { fromEvent, combineLatest } from 'rxjs';
import { mapTo, startWith, scan, tap, map } from 'rxjs/operators';
import { DOCUMENT } from '@angular/common';

@Component({
  selector: 'app-combine-latest',
  templateUrl: './combine-latest.component.html'
})
export class CombineLatestComponent implements OnInit {
  readonly document: Document;

  constructor(
    // https://github.com/angular/angular/issues/20351
    @Inject(DOCUMENT) document: any) {
      this.document = document as Document;
    }

  redTotal:HTMLElement;
  blackTotal: HTMLElement;
  total:HTMLElement;  
  test:HTMLElement;

  ngOnInit(): void {
    this.redTotal = this.document.getElementById('red-total'); 
    this.blackTotal = this.document.getElementById('black-total');
    this.total = this.document.getElementById('total');
    this.test = this.document.getElementById('test');

    combineLatest(this.addOneClick$('red'), 
    
    this.addOneClick$('black')).subscribe(([red, black]: any) => {
      this.redTotal.innerHTML = red;
      this.blackTotal.innerHTML = black;
      this.total.innerHTML = red + black;
    });

    fromEvent(this.test, 'click').pipe(map( event => event.timeStamp), mapTo(1)).subscribe((event) => console.log(event));
  }

  addOneClick$ = id =>
  fromEvent(this.document.getElementById(id), 'click').pipe(
    // map every click to 1
    mapTo(1),
    // keep a running total
    scan((acc, curr) => acc + curr, 0),
    startWith(0)
  );
}

效果:

  • 點選 Red 按鈕,則 Red 計數器 和 Total 計數器 加 1
  • 點選 Black 按鈕,則 Black 計數器 和 Total 計數器 加 1

combine 輸入引數:兩個 Observable:

我這個例子裡,只執行下面這行語句,其他 IF 分支都沒有進去:

return fromArray(observables, scheduler).lift(new CombineLatestOperator(resultSelector))

首先執行 fromArray:輸入是 Array,包含兩個元素:

fromArray: 返回一個新的 Observable,輸入是subscribeToArray(input).

關於 subscribeToArray 的邏輯分析,參考我這篇文章:Rxjs 裡 subscribeToArray 工具函式的詳細分析.

下一步例項化 CombineLatestOperator:

執行 lift 操作,建立新的 Observable 物件:

應用程式呼叫 Observable 物件的 subscribe 方法:

閉包裡包含的兩個 Observable,分別 for red 和 black 按鈕:

順著 Observable 的 source 屬性和 _subscribe, 能找到 該 Observable pipe 裡傳遞的所有操作:

首先使用 array 的第一個元素作為引數,呼叫 subscriber 函式:

先執行 mapTo(1) 邏輯:

Maptosubscriber 的 destination 指向 Scansubscriber:

scan.js 的內部實現:

accumulator 就是應用程式自定義的函式:

其中 acc 就是 scan.js 裡的 seed,而 curr 即是當前值。

當前這輪迭代的結果存入 Scan Operator 的 seed 欄位裡,作為下一次迭代的輸入:

這種 Operator 的實現都有套路:

  1. export 的 function,就是傳入 Observable.pipe 裡的程式碼:
  1. operator 實現的 call 函式
  2. ScanSubscriber 繼承了 Subscriber,重新實現了 _next 方法。不同的 subscriber,差異就體現在這些 _next 方法上。

點選 red 或者 black 按鈕後,兩個 Observable 的初始值:0,0:

這個0,0 是怎麼生成的?
兩個空的物件:NONE

是在這裡插入的:

這個 merge map 應該是框架自動生成的:

第一個元素已經從空物件轉換成了0:

這行語句執行完之後,就變成兩個 0 了:

使用 slice API 將陣列複製一份:

由此可見,combineLatest Operator 本身不維護狀態,而是等待維護狀態的 scan 的輸入:

更多Jerry的原創文章,盡在:"汪子熙":