RxJS快速入门

内容导航

目录
  • 内容导航
  • RxJS是什么
  • RxJS的主要成员
    • Observable (可观察对象)

      • 创建 Observable
      • 订阅 Observables
      • 执行 Observables
      • 清理 Observable 执行
    • Observer (观察者)
    • Subscription (订阅)
    • Subject (主体)
      • 多播的 Observables
      • BehaviorSubject
      • ReplaySubject
      • AsyncSubject
    • Scheduler (调度器)
      • 调度器类型
    • Pipeable(操作符)
      • 常用的操作符
      • 创建操作符
      • 连接创建操作符
      • 转换操作符
      • 过滤操作符
      • 组合操作符
      • 多播操作符
      • 错误处理操作符
      • 工具操作符
      • 条件和布尔操作符
      • 数学和聚合操作符

RxJS是什么

RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。

可以把 RxJS 当做是用来处理事件的 Lodash

ReactiveX 结合了 观察者模式迭代器模式使用集合的函数式编程,以满足以一种理想方式来管理事件序列所需要的一切。

RxJS的主要成员

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他。

Observable (可观察对象)

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。Observables 是多个值的惰性推送集合。

  • of():用于创建简单的Observable,该Observable只发出给定的参数,在发送完这些参数后发出完成通知.
  • from():从一个数组、类数组对象、promise、迭代器对象或者类Observable对象创建一个Observable.
  • fromEvent(),:把event转换成Observable.
  • range():在指定起始值返回指定数量数字.
  • interval():基于给定时间间隔发出数字序列。返回一个发出无限自增的序列整数,可以选择固定的时间间隔进行发送。
  • timer():创建一个Observable,该Observable在初始延时之后开始发送并且在每个时间周期后发出自增的数字

创建 Observable

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';

const Observable1 = new Observable(subscriber => {
    try{
          subscriber.next(1);
          subscriber.next(2);
          subscriber.next(3);
          setTimeout(() => {
            subscriber.next(4);
            subscriber.complete();
          }, 1000);
        } catch (err) {
        subscriber.error(err);//传递一个错误对象,如果捕捉到异常的话。
    }
    });
    const Observable2 = from([
      { name: 'Dave', age: 34, salary: 2000 },
      { name: 'Nick', age: 37, salary: 32000 },
      { name: 'Howie', age: 40, salary: 26000 },
      { name: 'Brian', age: 40, salary: 30000 },
      { name: 'Kevin', age: 47, salary: 24000 },
    ]);
const Observable3 = of("Dave","Nick");//把所有参数组合到数组,逐个提供给消费者
const Observable4 = range(1,10);
const Observable5 = interval(3000);//从零开始每3000毫秒自增并提供给消费者
const Observable6 = timer(3000,1000);//等待3000毫秒后,从零开始每1000毫秒自增并提供给消费者

订阅 Observables

因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。

当调用了 observable.subscribe ,观察者会被附加到新创建的 Observable 执行中。这个调用还返回一个对象,即 Subscription (订阅):

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
const observable1 = range(1,10);
    observable1.subscribe(
          num => {
            console.log(num);
          },
          err => console.log(err),
          () => console.log("Streaming is over.")
        );

执行 Observables

Observable 执行可以传递三种类型的值:

  • "Next" 通知: 发送一个值,比如数字、字符串、对象,等等。
  • "Error" 通知: 发送一个 JavaScript 错误 或 异常。
  • "Complete" 通知: 不再发送任何值。

"Next" 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';

const observable = new Observable(subscriber => {
    try{
          subscriber.next(1);
          subscriber.next(2);
          subscriber.next(3);
          subscriber.complete();
          subscriber.next(4); // 因为违反规约,所以不会发送
        } catch (err) {
        subscriber.error(err);//传递一个错误对象,如果捕捉到异常的话。
    }
    });

清理 Observable 执行

因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源

当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
    const observable = new Observable(subscriber => {
      let intervalID = setInterval(() => {
        subscriber.next('hi');
      }, 1000);
      // 提供取消和清理 interval 资源的方法
      return function unsubscribe() {
        clearInterval(intervalID);
      };
    });
    let subscription = observable.subscribe(x => console.log(x));
    subscription.unsubscribe();

Observer (观察者)

观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:nexterrorcomplete 。下面的示例是一个典型的观察者对象:

观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。

observable.subscribe(
    next: x => console.log('Observer got a next value: ' + x),
    error: err => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification')
);

Subscription (订阅)

Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe,它不需要任何参数,只是用来清理由 Subscription 占用的资源。在上一个版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理对象)。

Subscription 基本上只有一个 unsubscribe() 函数,这个函数用来释放资源或去取消 Observable 执行。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
var observable1 = interval(1000);
    var subscription1 = observable1.subscribe(x => console.log(x));
    // 稍后:
    // 这会取消正在进行中的 Observable 执行
    // Observable 执行是通过使用观察者调用 subscribe 方法启动的
    subscription1.unsubscribe();

    var observable2 = interval(400);
    var observable3 = interval(300);
    var subscription2 = observable2.subscribe(x => console.log('first: ' + x));
    var childSubscription = observable3.subscribe(x => console.log('second: ' + x));
    subscription2.add(childSubscription);
    setTimeout(() => {
      // subscription 和 childSubscription 都会取消订阅
      subscription2.unsubscribe();
    }, 1000);

Subject (主体)

RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。

还有一些特殊类型的 Subject:BehaviorSubjectReplaySubjectAsyncSubject

每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。

在 Subject 的内部,subscribe 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener 的工作方式。

每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)error(e)complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。

import { Subject,from } from 'rxjs';
//我们为 Subject 添加了两个观察者,然后给 Subject 提供一些值
 var subject1 = new Subject();
    subject1.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    subject1.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    subject1.next(1);
    subject1.next(2);
    //因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe 方法
    var subject2 =new Subject();
    subject2.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    subject2.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    var observable = from([1, 2, 3]);
    observable.subscribe(subject2); // 你可以提供一个 Subject 进行订阅

多播的 Observables

“多播 Observable” 通过 Subject 来发送通知,这个 Subject 可能有多个订阅者,然而普通的 “单播 Observable” 只发送通知给单个观察者。

多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。

在底层,这就是 multicast 操作符的工作原理:观察者订阅一个基础的 Subject,然后 Subject 订阅源 Observable 。

import { Subject } from 'rxjs/internal/Subject';
import { take, multicast } from 'rxjs/operators';
    const source = timer(1000, 2500).pipe(take(5));
    const subject = new Subject();
    subject.subscribe({
      next: (v) => console.log('observerC: ' + v)
    });
    subject.subscribe({
      next: (v) => console.log('observerD: ' + v)
    });
    const multicasted = source.pipe(multicast(subject));
    multicasted.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    multicasted.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
source.subscribe(subject);

BehaviorSubject

Subject 的其中一个变体就是 BehaviorSubject,它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。

BehaviorSubjects 适合用来表示“随时间推移的值”。举例来说,生日的流是一个 Subject,但年龄的流应该是一个 BehaviorSubject 。

import { BehaviorSubject } from 'rxjs';
//BehaviorSubject 使用值0进行初始化,当第一个观察者订阅时会得到0。第二个观察者订阅时会得到值2,尽管它是在值2发送之后订阅的。
const subject = new BehaviorSubject(0); // 0是初始值
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });

    subject.next(1);
    subject.next(2);

    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });

    subject.next(3);

ReplaySubject

ReplaySubject 类似于 BehaviorSubject,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。

ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。

除了缓冲数量,你还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 为新的订阅者缓冲最后3个值
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    subject.next(5);

//我们缓存数量100,但 window time 参数只设置了120毫秒
    const subject = new ReplaySubject(100, 120 /* windowTime */);

    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    let i = 1;
    setInterval(() => subject.next(i++), 200);
    setTimeout(() => {
      subject.subscribe({
        next: (v) => console.log('observerB: ' + v)
      });
    }, 1000);

AsyncSubject

AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

AsyncSubject 和 last() 操作符类似,因为它也是等待 complete 通知,以发送一个单个值。

import { AsyncSubject } from 'rxjs';
   const subject = new AsyncSubject();

    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });

    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);

    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });

    subject.next(5);
    subject.complete();

Scheduler (调度器)

调度器控制着何时启动 subscription 和何时发送通知。它由三部分组成:

  • 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
  • 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
  • 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。

调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者。

import { asyncScheduler, Observable } from 'rxjs';
//我们使用普通的 Observable ,它同步地发出值`1`、`2`、`3`,并使用操作符 `observeOn` 来指定 `async` 调度器发送这些值。
const observable = new Observable(subscriber => {
      subscriber.next(1);
      subscriber.next(2);
      subscriber.next(3);
      subscriber.complete();
    })
      .pipe(
        observeOn(asyncScheduler)
      );

    console.log('just before subscribe');
    observable.subscribe({
      next: x => console.log('got value ' + x),
      error: err => console.error('something wrong occurred: ' + err),
      complete: () => console.log('done'),
    });
    console.log('just after subscribe');
    //你会发现"just after subscribe"在"got value..."之前就出现了
    //just before subscribe
    //just after subscribe
    //got value 1
    //got value 2
    //got value 3
    //done

调度器类型

async 调度器是 RxJS 提供的内置调度器中的一个。可以通过使用 Scheduler 对象的静态属性创建并返回其中的每种类型的调度器。

调度器 目的
null 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。
queueScheduler 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。
asapScheduler 微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。
asyncScheduler 使用 setInterval 的调度。用于基于时间的操作符。
animationFrameScheduler 计划将在下一次浏览器内容重新绘制之前发生的任务。 可用于创建流畅的浏览器动画。

Pipeable(操作符)

操作符就是函数,管道操作符本质上是一个纯函数,它将一个Observable作为输入并生成另一个Observable作为输出。订阅输出Observable也将订阅输入Observable。 操作符有两种:

管道操作符是一个将Observable作为其输入并返回另一个Observable的函数。这是一个纯粹的操作:以前的Observable保持不变。

  1. 管道操作符是可以使用语法observableInstance.pipe(operator())传递给Observable的类型。 这些包括filter()mergeMap()。 调用时,它们不会更改现有的Observable实例。 相反,它们返回一个新的Observable,其订阅逻辑基于第一个Observable。

  2. 创建运算符是另一种运算符,可以称为独立函数来创建新的Observable。例如:of(1,2,3)创建一个observable ,该对象将依次发射1、2和3。创建运算符将在后面的部分中详细讨论。

obs.pipe(
  op1(),
  op2(),
  op3(),
  op3(),
)

常用的操作符

finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>:

返回原始Observable,但在Observable完成或发生错误终止时将调用指定的函数。

创建操作符

连接创建操作符

These are Observable creation operators that also have join functionality -- emitting values of multiple source Observables.

转换操作符

过滤操作符

组合操作符

Also see the Join Creation Operators section above.

多播操作符

错误处理操作符

工具操作符

条件和布尔操作符

数学和聚合操作符

(0)

相关推荐

  • NgRx 里 first 和 take(1) 操作符的区别

    take(1) vs first() first() 运算符采用可选的 predicate 函数,并在源完成后没有匹配的值时发出错误通知. 下列代码会报错: import { EMPTY, range ...

  • ES6中常用的10个新特性讲解

    ECMAScript 6(ES6) 目前基本成为业界标准,它的普及速度比 ES5 要快很多,主要原因是现代浏览器对 ES6 的支持相当迅速,尤其是 Chrome 和 Firefox 浏览器,已经支持 ...

  • 设计模式(20) 观察者模式

    观察者模式是一种平时接触较多的模式.它主要用于一对多的通知发布机制,当一个对象发生改变时自动通知其他对象,其他对象便做出相应的反应,同时保证了被观察对象与观察对象之间没有直接的依赖. GOF对观察者模 ...

  • 设计模式-行为型-观察者模式

    观察者模式(Observer): 指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新. 观察者模式的角色: 1)抽象目标(Subject):也叫抽象 ...

  • 行为型模式之观察者模式

    在现实世界中,许多对象并不是独立存在的,其中一个对象的行为发生改变可能会导致一个或者多个其他对象的行为也发生改变.例如,某种商品的物价上涨时会导致部分商家高兴,而消费者伤心. 在软件世界也是这样,例如 ...

  • 15 个优雅的 JavaScript 个技巧

    Vue中文社区 2021-10-18 以下文章来源于大迁世界 ,作者前端小智 大迁世界 我要先坚持分享20年,大家来一起见证吧. JavaScript 有很多很酷的特性,大多数初学者和中级开发人员都不 ...

  • 这些 JS 中强大的操作符,总有几个你没听说过

    JS 里的操作符大家每天都在使用,还有一些 ES2020.ES2021 新加的实用操作符,这些共同构成了 JS 灵活的语法生态. 本文除介绍常用的操作符之外,还会介绍 JS 里一些不常用但是很强大的操 ...

  • JS的7种数据类型以及它们的底层数据结构

    年纪大了记性不好(其实是脑子不好使,但又不想承认),有些东西总是容易忘,所以为了便于之后查看干脆记下来,用自己的语言再把一些概念整理一下,都是自己写的,以后再看这些文字也会有亲切感(好像很有道理的亚子 ...

  • RxJs SwitchMapTo 操作符之移花接木

    将每个源值投影到同一个 Observable,该 Observable 在输出 Observable 中使用 switchMap 多次展平. 输入一个 Observable,输出一个 function ...

  • 2.5.3欲开的花头不同画法2,花头的画法,牡丹,国画快速入门

    2.5.3欲开的花头不同画法2,花头的画法,牡丹,国画快速入门

  • 【期权时代】实用篇(五) | 如何快速入门期权?提高学习效率?基础知识、软件、书籍、网站都为你整理好...

    期权小师妹@期权时代 全文共 11179 字  阅读需要 28 分钟 在这个五一假期,小师妹为大家总结了几篇实用的工具贴,一起来回顾一下吧: <实用篇(一) | 期权投资最全的专业术语都在这里了 ...

  • 金口诀月令定神煞,教你快速入门紫薇

    季月定神煞: 1.天赦: ①.口诀:天赦能解一切愁,危中转安喜心头,仕人得赏受提拔,天气忽然转晴天. ②.定位:春戊寅.夏甲午.秋戊申.冬甲子,入课或年.月.日.时上见到. ③.主事:能化解一切祸灾, ...

  • VPLC系列机器视觉运动控制一体机快速入门(十)

    此前,我们依次讲解了软硬件介绍及计数实例.相机的基本使用.基于形状匹配的视觉定位.BLOB有无检测.测量尺寸.机器视觉方案中使用到的标定功能.ZDevelop软件实现识别条形码和二维码,测量点/直线/ ...

  • VPLC系列机器视觉运动控制一体机快速入门(九)

    此前,我们依次讲解了软硬件介绍及计数实例.相机的基本使用.基于形状匹配的视觉定位.BLOB有无检测.测量尺寸.机器视觉方案中使用到的标定功能.ZDevelop软件实现识别条形码和二维码,以及测量点/直 ...

  • VPLC系列机器视觉运动控制一体机快速入门(八)

    此前,我们依次讲解了软硬件介绍及计数实例.相机的基本使用.基于形状匹配的视觉定位.BLOB有无检测.测量尺寸.机器视觉方案中使用到的标定功能以及ZDevelop软件实现识别条形码和二维码的功能. 本期 ...

  • VPLC系列机器视觉运动控制一体机快速入门(七)

    此前,我们依次讲解了软硬件介绍及计数实例.相机的基本使用.基于形状匹配的视觉定位.BLOB有无检测.测量尺寸.机器视觉方案中使用到的标定功能以及使用ZDevelop软件实现坐标标定的方法.本期课程我们 ...

  • VPLC系列机器视觉运动控制一体机快速入门(六)

    此前,我们依次讲解了软硬件介绍及计数实例.相机的基本使用.基于形状匹配的视觉定位.BLOB有无检测以及测量尺寸. 本期课程,正运动技术和大家一起分享和标定有关的详细知识内容. 机器视觉常用的标定功能 ...

  • 快速入门 | 篇一:如何进行运动控制器固件升级?

    zfm文件为控制器固件升级包,根据对应的控制器型号选择对应的固件(不同型号的固件包不一样,确保选择正确的固件包,如需固件升级,请联系厂家).可以使用ZDevelop软件或者zfirmdown工具软件下 ...