RxJS 一些有趣的 operate 操作符
前言 🔗
RxJS
一些有趣的 operate
操作符
之前我们写了关于 RxJS
中的几个概念,Observable
, Subject
, pipe
, Scheduler
等
而 RxJS
之所以很方便,很大程度上取决于它封装好的一些 operate
操作符
前文的 pipe
管道也属于操作符的一种
正文 🔗
RxJS
中,个人理解其实就是两大类的操作符,一类是创建类操作符,一类就是管道类
不过文档中给它分类的更加详细
There are operators for different purposes, and they may be categorized as: creation, transformation, filtering, joining, multicasting, error handling, utility, etc. In the following list you will find all the operators organized in categories.
创建类操作符 🔗
在 RxJS
中,创建类的操作符其实不多,有几个比较常见的
fromEvent 🔗
fromEvent
使得我们能够进行 dom
元素事件的绑定
import { fromEvent } from "rxjs";
// 给 window 绑定点击事件
fromEvent(window, "click").subscribe((e) => console.log(e));
这样子每次我们点击就可以输出一次值,该值为事件 event
对象
如果我们想要解绑该事件
我们可以调用返回的 Subscription
的 unsubscribe
方法
import { fromEvent } from "rxjs";
const subscription = fromEvent(window, "click").subscribe((e) => console.log(e));
// 解绑
subscription.unsubscribe();
interval 🔗
setInterval
的一个包装,使得我们可以启用一个定时器
import { interval } from "rxjs";
// 每 1s 执行一次
interval(1000).subscribe((val) => console.log(val));
每次会输出该次对应的索引(从 0
开始)
取消该定时器,同样调用 Subscription
的 unsubscribe
方法
import { interval } from "rxjs";
const subscription = interval(1000).subscribe((val) => console.log(val));
// 取消定时器
subscription.unsubscribe();
of 🔗
把传入的每个值都经过 Observable
发送出来
import { of } from "rxjs";
of(1, 2, 3, 4).subscribe((val) => console.log(val));
传入 1, 2, 3, 4
,那么输出就是 1, 2, 3, 4
了
range 🔗
生成一个序列,然后经由 Observable
发送出来
import { range } from "rxjs";
range(2, 5).subscribe((val) => console.log(val));
注意,这里的参数不是起始和结束,而是起始和个数
即输出不是 2, 3, 4, 5
而是 2, 3, 4, 5, 6
generate 🔗
能够像 for
那样生成值
import { generate } from "rxjs";
generate({
initialState: 1,
condition: (v) => v < 5,
iterate: (v) => v + 1,
}).subscribe((val) => console.log(val));
上面的代码表示初始值为 1
, 结束条件为值小于 5
, 然后每次都会自增 1
和下面的代码等价
new Observable((subscriber) => {
for (let i = 1; i < 5; i++) {
subscriber.next(i);
}
}).subscribe((val) => console.log(val));
这样子我们就能从 1
输出到 4
merge 🔗
合并多个 Observable
为单个 Observable
,然后一一输出对应的值
import { fromEvent, interval, map, merge } from "rxjs";
const click$ = fromEvent(window, "click");
const interval$ = interval(1000).pipe(map((val) => `定时器:${val}`));
merge(click$, interval$).subscribe((val) => console.log(val));
经过 merge
之后,新的 Observable
会在每次点击和间隔 1s
输出一次对应的值
forkJoin 🔗
合并多个 Observable
为单个 Observable
,收集每个 Observable
完成前的一个值,然后输出
import { forkJoin, fromEvent, interval, map, take } from "rxjs";
const click$ = fromEvent(window, "click").pipe(take(2));
const interval$ = interval(1000).pipe(
map((val) => `定时器:${val}`),
take(3)
);
forkJoin([click$, interval$]).subscribe((val) => console.log(val));
在点击两次且等待超过三秒之后,会输出一个数组,这个数组包含二者的值
concat 🔗
合并多个 Observable
为单个 Observable
,和 merge
不同的是,concat
会在第一个 Observable
完成之后再订阅第二个 Observable
,以此类推
import { concat, fromEvent, interval, map, take } from "rxjs";
const click$ = fromEvent(window, "click").pipe(take(2));
const interval$ = interval(1000).pipe(
map((val) => `定时器:${val}`),
take(3)
);
concat(click$, interval$).subscribe((val) => console.log(val));
这里我们必须点击两次之后,定时器才会启动,如果点击事件不使用 take(2)
限定次数的话,那么定时器永远不会开始
因为点击事件永远无法完成
race 🔗
合并多个 Observable
为单个 Observable
,只会输出一个发出值的 Observable
import { fromEvent, interval, map, race, take } from "rxjs";
const click$ = fromEvent(window, "click").pipe(take(2));
const interval$ = interval(1000).pipe(
map((val) => `定时器:${val}`),
take(3)
);
race(click$, interval$).subscribe((val) => console.log(val));
此时如果立马进行点击,那么定时器就不会输出,如果等到定时器输出第一个值,那么点击事件就不会生效
定时器“赢”
点击事件“赢”
zip 🔗
合并多个 Observable
为单个 Observable
,每个 Observable
发出的每个值会组成一个数组作为新的值发出
import { fromEvent, interval, map, take, zip } from "rxjs";
const click$ = fromEvent(window, "click").pipe(take(2));
const interval$ = interval(1000).pipe(
map((val) => `定时器:${val}`),
take(3)
);
zip(click$, interval$).subscribe((val) => console.log(val));
每个点击和每次定时器执行作为值发送出去,新值数量由这些 Observable
发出的值的最少的个数来确定
在这里点击事件只会发生两次,所以新的 Observable
只会发出两个值
定时器的第三个值无法被匹配,故不会发送到新的 Observable
对象中
一旦某个传入的 Observable
完成,其他传入的 Observable
会被 unsubscribe
,所以不用担心资源释放问题
combineLatest 🔗
合并多个 Observable
为单个 Observable
,与 zip
类似,但是 combineLatest
会取最近的一次值组成一个新值然后发出
import { combineLatest, fromEvent, interval, map, take, zip } from "rxjs";
const click$ = fromEvent(window, "click").pipe(take(2));
const interval$ = interval(1000).pipe(
map((val) => `定时器:${val}`),
take(3)
);
combineLatest([click$, interval$]).subscribe((val) => console.log(val));
这里假如我们等三秒之后在点击,那么输出的值是 [event, '定时器:2']
而 zip
这里会输出 [event, '定时器:0']
除了上面这些常用的, RxJS
还封装了 ajax
,不过前端一般都是用 axios
来进行 http
请求
所以感觉用处不大
还有一些比较少见的创建操作符,比如 defer
, timer
, iif
等,都不难,看一下文档一下子就懂了
管道操作符 🔗
在之前的文章 {% post_link RxJS-使用之-pipe-管道 RxJS 使用之 pipe 管道 %} 中,我们已经列举了一些常见的管道操作符了
比如 map
, filter
, first
, last
等
这些一般都比较容易理解
现在我们来列举一些高级一点的管道
takeUntil 🔗
传入一个 Observable
,在这个 Observable
完成时,源 Observable
则完成
import { fromEvent, interval, map, take, takeUntil } from "rxjs";
const interval$ = interval(1000).pipe(
map((val) => `定时器:${val}`),
take(3)
);
const click$ = fromEvent(window, "click").pipe(takeUntil(interval$));
click$.subscribe((e) => console.log(e));
这里如果我们等 3s
之后再点击,那么点击事件不会生效,因为定时器已经完成了
在 3s
内的点击事件都是可以发出的
之前我们在 Subject
那一章讲过这个管道,在 antd
( angular
版本) 中非常常见
在需要对全局可观察对象订阅的地方,加上 pipe(takeUntil(destroy$))
,然后在 ngOnDestroy
钩子中完成 destroy$
即可比较优雅的管理这些订阅过程
distinct 🔗
对于源 Observable
发送的一系列的值,会经过去重之后再发送出来
对于基本类型来说,使用起来非常简单
import { distinct, of} from "rxjs";
of(1, 2, 3, 3, 4, 4)
.pipe(distinct())
.subscribe((val) => console.log(val));
这里源 Observable
发出 1, 2, 3, 3, 4, 4
经过去重之后会输出 1, 2, 3, 4
对于引用类型,可以通过一个函数来提取唯一的部分进行比较,一般都是以对象的某个属性来去重,比如唯一 id
等
当然如果不传,那么就是按照引用的地址进行比较,即 obj1 === obj2
这种形式
import { distinct, of } from "rxjs";
const s1 = {
id: 1,
name: "lwf",
};
const s2 = {
id: 2,
name: "lwx",
};
const s3 = {
id: 2,
name: "lwx",
};
of(s1, s2, s3)
.pipe(
// 这里用 id 作为唯一标识进行去重
distinct((s) => s.id)
)
.subscribe((val) => console.log(val));
由于 s2
和 s3
的 id
是一样的,所以最终只会输出 s1
和 s2
distinctUntilChanged 🔗
这个就比较有意思了,从名字上看,也是去重,但是去重的模式不一样
这个管道的去重指和前一个值进行比较,如果相同,那么该值就不会被输出
import { distinctUntilChanged, of } from "rxjs";
of(1, 1, 2, 1, 1, 2)
.pipe(distinctUntilChanged())
.subscribe((val) => console.log(val));
如果我们使用 distinct
,上面的代码输出为 1, 2
而 distinctUntilChanged
会输出 1, 2, 1, 2
当然这个管道也支持传入函数来确定唯一的值
比如,现在我们希望在窗口水平 resize
的时候进行一些操作,可以这么写
import { distinctUntilChanged, fromEvent, map } from "rxjs";
fromEvent(window, "resize")
.pipe(
// 拿到每一次 resize 的窗口大小
map(() => ({
width: window.innerWidth,
height: window.innerHeight,
})),
// 在 width 发生改变时
distinctUntilChanged((pre, cur) => pre.width === cur.width)
)
.subscribe((val) => console.log("width change"));
效果如下:
当然,如果你想换成垂直 resize
,只要更改为 distinctUntilChanged((pre, cur) => pre.height === cur.height)
即可
可以想象一下如果我们不使用 RxJS
的话,这段逻辑写起来还是挺复杂的,而且免不得得产生一些局部变量(记录 pre
值)
retry 🔗
在 Observable
发生错误的时候可以重新订阅
import { Observable, retry } from "rxjs";
new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.error(new Error("error"));
})
.pipe(retry(2))
.subscribe({
next(val) {
console.log(val);
},
error(e) {
console.error(e);
},
});
在这里我们使用 retry(2)
表示如果源 Observable
发生了错误,那么重新订阅,最大次数为 2
次
也就是两次后,如果成功了,那无事发生,如果还是报错,那么错误就会通过 error
回调抛出
可以看到前两次的 error
并没有被抛出,只有最后一次的 error
被抛出
这个管道对于 http
请求还是挺有用的,直接就可以设置重拾次数
elementAt 🔗
直接取某个索引的元素,很像 js
中通过 []
取值
import { elementAt, range } from "rxjs";
range(1, 5)
.pipe(elementAt(2))
.subscribe((val) => console.log(val));
取索引 2
的值,对于 1, 2, 3, 4, 5
,也就是数字 3
效果如下:
single 🔗
只有在匹配到一个值的时候才会输出,其他情况下,没有值匹配或者匹配了多个值,则会报错
import { of, single } from "rxjs";
// 匹配成功
of(1, 2, 3, 4)
.pipe(single((v) => v === 1))
.subscribe((val) => console.log(val));
// 匹配多个
of(1, 2, 3, 4)
.pipe(single((v) => v === 5))
.subscribe((val) => console.log(val));
// 未匹配
of(1, 1, 3, 4)
.pipe(single((v) => v === 1))
.subscribe((val) => console.log(val));
效果如下:
combineLatestAll, concatAll, mergeAll 🔗
和 combineLatest
的效果一样,不过 combineLatestAll
作为管道主要用于高阶 Observable
import { fromEvent, map, interval, take, combineLatestAll } from "rxjs";
const clicks = fromEvent(window, "click");
const higherOrder = clicks.pipe(
map(() => interval(0).pipe(take(2))),
take(2)
);
higherOrder.pipe(combineLatestAll()).subscribe((x) => console.log(x));
这里每次点击产生一个定时器,每个定时器只输出两个值,只响应两次点击事件
意味着会有两个定时器,然后通过 combineLatestAll
结合两个定时器,完全可以看成如下代码
import { combineLatest } from "rxjs";
combineLatest([interval(0).pipe(take(2)), interval(0).pipe(take(2))]).subscribe(
(val) => console.log(val)
);
效果如下:
除了 combineLatestAll
,其他的一些创建类操作符也有其对应的管道操作符
concat
对应concatAll
merge
对应mergeAll
exhaustAll 🔗
用于高阶 Observable
,当订阅第一个 Observable
,接下来在这个 Observable
完成前就订阅的 Observable
被抛弃
感觉很绕,我们可以写个例子
import { fromEvent, interval, take, map, exhaustAll } from "rxjs";
fromEvent(document, "click")
.pipe(
map(() => interval(1000).pipe(take(3))),
exhaustAll()
)
.subscribe((x) => console.log(x));
这里我们使用 map
让生成的 Observable
成为高阶的,然后使用 exhaustAll
当我们点击的时候,会输出 0, 1, 2
,如果在 0, 1, 2
输出还没完成的时候,继续点击,那么这个 Observable
是会被抛弃的
也就是只有等到 0, 1, 2
输出完成,我们再次点击,那么才会继续输出 0, 1, 2
效果如下:
switchAll 🔗
用于高阶 Observable
,这个和 exhaustAll
相反,如果在前一个 Observable
还未完成时,得到了下一个 Observable
那么当前订阅的 Observable
会被完成,然后去订阅这个新的 Observable
import { fromEvent, interval, take, map, tap, switchAll } from "rxjs";
fromEvent(document, "click")
.pipe(
map(() => interval(1000).pipe(take(3))),
switchAll()
)
.subscribe((x) => console.log(x));
这个例子和 exhaustAll
的一样,只不过我们使用 switchAll
来合并 Observable
在输出 0, 1, 2
输出未完成之前,继续点击的话,那么会重新输出 0, 1, 2
效果如下:
后记 🔗
RxJS
的系列基本就结束了,接下来学习 Angular
,在 RxJS
方面应该就问题不大了