RxJS CombineLatest operator 的一个具体使用例子

CombineLatest 的使用场景:

This operator is best used when you have multiple, long-lived observables that rely on each other for some calculation or determination.

当有多个长时间存活的 Observable,且依赖彼此,共同完成某些计算逻辑时,适合使用 CombineLatest.

When any observable emits a value, emit the last emitted value from each.

为了学习 CombineLatest 的用法,我写了下面这个小程序:

import { Component, OnInit, Inject } from '@angular/core';
import { fromEvent, combineLatest } from 'rxjs';
import { mapTo, startWith, scan, tap, map } from 'rxjs/operators';
import { DOCUMENT } from '@angular/common';

@Component({
  selector: 'app-combine-latest',
  templateUrl: './combine-latest.component.html'
})
export class CombineLatestComponent implements OnInit {
  readonly document: Document;

  constructor(
    // https://github.com/angular/angular/issues/20351
    @Inject(DOCUMENT) document: any) {
      this.document = document as Document;
    }

  redTotal:HTMLElement;
  blackTotal: HTMLElement;
  total:HTMLElement;
  test:HTMLElement;

  ngOnInit(): void {
    this.redTotal = this.document.getElementById('red-total');
    this.blackTotal = this.document.getElementById('black-total');
    this.total = this.document.getElementById('total');
    this.test = this.document.getElementById('test');

    combineLatest(this.addOneClick$('red'), 

    this.addOneClick$('black')).subscribe(([red, black]: any) => {
      this.redTotal.innerHTML = red;
      this.blackTotal.innerHTML = black;
      this.total.innerHTML = red + black;
    });

    fromEvent(this.test, 'click').pipe(map( event => event.timeStamp), mapTo(1)).subscribe((event) => console.log(event));
  }

  addOneClick$ = id =>
  fromEvent(this.document.getElementById(id), 'click').pipe(
    // map every click to 1
    mapTo(1),
    // keep a running total
    scan((acc, curr) => acc + curr, 0),
    startWith(0)
  );
}

效果:

  • 点击 Red 按钮,则 Red 计数器 和 Total 计数器 加 1

  • 点击 Black 按钮,则 Black 计数器 和 Total 计数器 加 1

combine 输入参数:两个 Observable:

我这个例子里,只执行下面这行语句,其他 IF 分支都没有进去:

return fromArray(observables, scheduler).lift(new CombineLatestOperator(resultSelector))

首先执行 fromArray:输入是 Array,包含两个元素:

fromArray: 返回一个新的 Observable,输入是subscribeToArray(input).

关于 subscribeToArray 的逻辑分析,参考我这篇文章:Rxjs 里 subscribeToArray 工具函数的详细分析.

下一步实例化 CombineLatestOperator:

执行 lift 操作,创建新的 Observable 对象:

应用程序调用 Observable 对象的 subscribe 方法:

闭包里包含的两个 Observable,分别 for red 和 black 按钮:

顺着 Observable 的 source 属性和 _subscribe, 能找到 该 Observable pipe 里传递的所有操作:

首先使用 array 的第一个元素作为参数,调用 subscriber 函数:

先执行 mapTo(1) 逻辑:

Maptosubscriber 的 destination 指向 Scansubscriber:

scan.js 的内部实现:

accumulator 就是应用程序自定义的函数:

其中 acc 就是 scan.js 里的 seed,而 curr 即是当前值。

当前这轮迭代的结果存入 Scan Operator 的 seed 字段里,作为下一次迭代的输入:

这种 Operator 的实现都有套路:

  1. export 的 function,就是传入 Observable.pipe 里的代码:

  1. operator 实现的 call 函数

  2. ScanSubscriber 继承了 Subscriber,重新实现了 _next 方法。不同的 subscriber,差异就体现在这些 _next 方法上。

点击 red 或者 black 按钮后,两个 Observable 的初始值:0,0:

这个0,0 是怎么生成的? 两个空的对象:NONE

是在这里插入的:

这个 merge map 应该是框架自动生成的:

第一个元素已经从空对象转换成了0:

这行语句执行完之后,就变成两个 0 了:

使用 slice API 将数组复制一份:

由此可见,combineLatest Operator 本身不维护状态,而是等待维护状态的 scan 的输入:

(0)

相关推荐