RxJS快速入门
内容导航
- 内容导航
- RxJS是什么
- RxJS的主要成员
- Observable (可观察对象)
- 创建 Observable
- 订阅 Observables
- 执行 Observables
- 清理 Observable 执行
- Observer (观察者)
- Subscription (订阅)
- Subject (主体)
- 多播的 Observables
- BehaviorSubject
- ReplaySubject
- AsyncSubject
- Scheduler (调度器)
- 调度器类型
- Pipeable(操作符)
- 常用的操作符
- 创建操作符
- 连接创建操作符
- 转换操作符
- 过滤操作符
- 组合操作符
- 多播操作符
- 错误处理操作符
- 工具操作符
- 条件和布尔操作符
- 数学和聚合操作符
- Observable (可观察对象)
RxJS是什么
RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。
可以把 RxJS 当做是用来处理事件的 Lodash 。
ReactiveX 结合了 观察者模式、迭代器模式 和 使用集合的函数式编程,以满足以一种理想方式来管理事件序列所需要的一切。
RxJS的主要成员
- Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
- Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
- Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
- Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像
map
、filter
、concat
、flatMap
等这样的操作符来处理集合。 - Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
- Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如
setTimeout
或requestAnimationFrame
或其他。
Observable (可观察对象)
RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。Observables 是多个值的惰性推送集合。
- of():用于创建简单的Observable,该Observable只发出给定的参数,在发送完这些参数后发出完成通知.
- from():从一个数组、类数组对象、promise、迭代器对象或者类Observable对象创建一个Observable.
- fromEvent(),:把event转换成Observable.
- range():在指定起始值返回指定数量数字.
- interval():基于给定时间间隔发出数字序列。返回一个发出无限自增的序列整数,可以选择固定的时间间隔进行发送。
- timer():创建一个Observable,该Observable在初始延时之后开始发送并且在每个时间周期后发出自增的数字
创建 Observable
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';
const Observable1 = new Observable(subscriber => {
try{
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
} catch (err) {
subscriber.error(err);//传递一个错误对象,如果捕捉到异常的话。
}
});
const Observable2 = from([
{ name: 'Dave', age: 34, salary: 2000 },
{ name: 'Nick', age: 37, salary: 32000 },
{ name: 'Howie', age: 40, salary: 26000 },
{ name: 'Brian', age: 40, salary: 30000 },
{ name: 'Kevin', age: 47, salary: 24000 },
]);
const Observable3 = of("Dave","Nick");//把所有参数组合到数组,逐个提供给消费者
const Observable4 = range(1,10);
const Observable5 = interval(3000);//从零开始每3000毫秒自增并提供给消费者
const Observable6 = timer(3000,1000);//等待3000毫秒后,从零开始每1000毫秒自增并提供给消费者
订阅 Observables
因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。
当调用了 observable.subscribe
,观察者会被附加到新创建的 Observable 执行中。这个调用还返回一个对象,即 Subscription
(订阅):
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
const observable1 = range(1,10);
observable1.subscribe(
num => {
console.log(num);
},
err => console.log(err),
() => console.log("Streaming is over.")
);
执行 Observables
Observable 执行可以传递三种类型的值:
- "Next" 通知: 发送一个值,比如数字、字符串、对象,等等。
- "Error" 通知: 发送一个 JavaScript 错误 或 异常。
- "Complete" 通知: 不再发送任何值。
"Next" 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';
const observable = new Observable(subscriber => {
try{
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // 因为违反规约,所以不会发送
} catch (err) {
subscriber.error(err);//传递一个错误对象,如果捕捉到异常的话。
}
});
清理 Observable 执行
因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源
当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用
unsubscribe()
方法就可以取消执行。
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
const observable = new Observable(subscriber => {
let intervalID = setInterval(() => {
subscriber.next('hi');
}, 1000);
// 提供取消和清理 interval 资源的方法
return function unsubscribe() {
clearInterval(intervalID);
};
});
let subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();
Observer (观察者)
观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next
、error
和 complete
。下面的示例是一个典型的观察者对象:
观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。
observable.subscribe(
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification')
);
Subscription (订阅)
Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe
,它不需要任何参数,只是用来清理由 Subscription 占用的资源。在上一个版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理对象)。
Subscription 基本上只有一个
unsubscribe()
函数,这个函数用来释放资源或去取消 Observable 执行。
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
var observable1 = interval(1000);
var subscription1 = observable1.subscribe(x => console.log(x));
// 稍后:
// 这会取消正在进行中的 Observable 执行
// Observable 执行是通过使用观察者调用 subscribe 方法启动的
subscription1.unsubscribe();
var observable2 = interval(400);
var observable3 = interval(300);
var subscription2 = observable2.subscribe(x => console.log('first: ' + x));
var childSubscription = observable3.subscribe(x => console.log('second: ' + x));
subscription2.add(childSubscription);
setTimeout(() => {
// subscription 和 childSubscription 都会取消订阅
subscription2.unsubscribe();
}, 1000);
Subject (主体)
RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。
Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。
还有一些特殊类型的 Subject:
BehaviorSubject
、ReplaySubject
和AsyncSubject
。
每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe
方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。
在 Subject 的内部,subscribe
不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener
的工作方式。
每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)
、error(e)
和 complete()
。要给 Subject 提供新值,只要调用 next(theValue)
,它会将值多播给已注册监听该 Subject 的观察者们。
import { Subject,from } from 'rxjs';
//我们为 Subject 添加了两个观察者,然后给 Subject 提供一些值
var subject1 = new Subject();
subject1.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject1.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject1.next(1);
subject1.next(2);
//因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe 方法
var subject2 =new Subject();
subject2.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject2.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = from([1, 2, 3]);
observable.subscribe(subject2); // 你可以提供一个 Subject 进行订阅
多播的 Observables
“多播 Observable” 通过 Subject 来发送通知,这个 Subject 可能有多个订阅者,然而普通的 “单播 Observable” 只发送通知给单个观察者。
多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。
在底层,这就是 multicast
操作符的工作原理:观察者订阅一个基础的 Subject,然后 Subject 订阅源 Observable 。
import { Subject } from 'rxjs/internal/Subject';
import { take, multicast } from 'rxjs/operators';
const source = timer(1000, 2500).pipe(take(5));
const subject = new Subject();
subject.subscribe({
next: (v) => console.log('observerC: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerD: ' + v)
});
const multicasted = source.pipe(multicast(subject));
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
source.subscribe(subject);
BehaviorSubject
Subject 的其中一个变体就是 BehaviorSubject
,它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject
那接收到“当前值”。
BehaviorSubjects 适合用来表示“随时间推移的值”。举例来说,生日的流是一个 Subject,但年龄的流应该是一个 BehaviorSubject 。
import { BehaviorSubject } from 'rxjs';
//BehaviorSubject 使用值0进行初始化,当第一个观察者订阅时会得到0。第二个观察者订阅时会得到值2,尽管它是在值2发送之后订阅的。
const subject = new BehaviorSubject(0); // 0是初始值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
ReplaySubject
ReplaySubject
类似于 BehaviorSubject
,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。
ReplaySubject
记录 Observable 执行中的多个值并将其回放给新的订阅者。除了缓冲数量,你还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 为新的订阅者缓冲最后3个值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
//我们缓存数量100,但 window time 参数只设置了120毫秒
const subject = new ReplaySubject(100, 120 /* windowTime */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
AsyncSubject
AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()
),它才会将执行的最后一个值发送给观察者。
AsyncSubject 和 last() 操作符类似,因为它也是等待
complete
通知,以发送一个单个值。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
Scheduler (调度器)
调度器控制着何时启动 subscription 和何时发送通知。它由三部分组成:
- 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
- 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
- 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法
now()
提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。
调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者。
import { asyncScheduler, Observable } from 'rxjs';
//我们使用普通的 Observable ,它同步地发出值`1`、`2`、`3`,并使用操作符 `observeOn` 来指定 `async` 调度器发送这些值。
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
})
.pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
//你会发现"just after subscribe"在"got value..."之前就出现了
//just before subscribe
//just after subscribe
//got value 1
//got value 2
//got value 3
//done
调度器类型
async
调度器是 RxJS 提供的内置调度器中的一个。可以通过使用 Scheduler
对象的静态属性创建并返回其中的每种类型的调度器。
调度器 | 目的 |
---|---|
null |
不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。 |
queueScheduler |
当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。 |
asapScheduler |
微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。 |
asyncScheduler |
使用 setInterval 的调度。用于基于时间的操作符。 |
animationFrameScheduler |
计划将在下一次浏览器内容重新绘制之前发生的任务。 可用于创建流畅的浏览器动画。 |
Pipeable(操作符)
操作符就是函数,管道操作符本质上是一个纯函数,它将一个Observable作为输入并生成另一个Observable作为输出。订阅输出Observable也将订阅输入Observable。 操作符有两种:
管道操作符是一个将Observable作为其输入并返回另一个Observable的函数。这是一个纯粹的操作:以前的Observable保持不变。
管道操作符是可以使用语法
observableInstance.pipe(operator())
传递给Observable的类型。 这些包括filter()
和mergeMap()
。 调用时,它们不会更改现有的Observable实例。 相反,它们返回一个新的Observable,其订阅逻辑基于第一个Observable。创建运算符是另一种运算符,可以称为独立函数来创建新的Observable。例如:
of(1,2,3)
创建一个observable ,该对象将依次发射1、2和3。创建运算符将在后面的部分中详细讨论。
obs.pipe(
op1(),
op2(),
op3(),
op3(),
)
常用的操作符
finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>
:
返回原始Observable,但在Observable完成或发生错误终止时将调用指定的函数。
创建操作符
- ajax
- bindCallback
- bindNodeCallback
- defer
- empty
- from
- fromEvent
- fromEventPattern
- generate
- interval
- of
- range
- throwError
- timer
- iif
连接创建操作符
These are Observable creation operators that also have join functionality -- emitting values of multiple source Observables.
转换操作符
- buffer
- bufferCount
- bufferTime
- bufferToggle
- bufferWhen
- concatMap
- concatMapTo
- exhaust
- exhaustMap
- expand
- groupBy
- map
- mapTo
- mergeMap
- mergeMapTo
- mergeScan
- pairwise
- partition
- pluck
- scan
- switchMap
- switchMapTo
- window
- windowCount
- windowTime
- windowToggle
- windowWhen
过滤操作符
- audit
- auditTime
- debounce
- debounceTime
- distinct
- distinctKey
- distinctUntilChanged
- distinctUntilKeyChanged
- elementAt
- filter
- first
- ignoreElements
- last
- sample
- sampleTime
- single
- skip
- skipLast
- skipUntil
- skipWhile
- take
- takeLast
- takeUntil
- takeWhile
- throttle
- throttleTime
组合操作符
Also see the Join Creation Operators section above.
多播操作符
错误处理操作符
工具操作符
- tap
- delay
- delayWhen
- dematerialize
- materialize
- observeOn
- subscribeOn
- timeInterval
- timestamp
- timeout
- timeoutWith
- toArray