之前对Rxjs的concatMap、switchMap、exhaustMap、mergeMap这四个映射操作符的理解有欠缺,以下是对这四个操作符的深入理解。首先在理解这四个操作符之前一定要理解的概念就是源数据映射Observable,我们可以把这两部分想象成流水线工厂的原材料加工机器,一定要独立的理解这两部分,这四个操作符不同的地方就是在加工机器这一部分使用了不同的逻辑,以此来产出不同的结果

对于一下代码,interval(2000)就相当于外部数据源,mergeMap((val) => of(val).pipe(delay(5000)))就是这个加工机器,最终会将内部多个Ob合并为一个Ob输出。在实际使用和理解时,我们可以不要过多关注最后合并的Ob,这只是原理,只需要理解源数据和源数据到达时操作符内部是怎么处理的即可。

1
2
3
4
5
6
const source = interval(2000);
source
.pipe(mergeMap((val) => of(val).pipe(delay(5000))))
.subscribe((r) => {
console.log(r);
});

## 1.mergeMap mergeMap操作符应该是这四个操作符里比较好理解的,简单来说就是只要当外部数据源到达时,会立即映射Observable并订阅输出,这些内部的Observable互相独立,也就是`并发处理`

例如以下代码:(interval只是用来模拟源数据)

1
2
3
4
5
6
7
8
9
10
//外部源Observable每2秒输出一个值
const source = interval(2000);
source
.pipe(mergeMap((val) => of(val).pipe(delay(5000)))) //因为mergeMap需要构建一个新的Ob,所以内部必须返回一个新的Ob,
.subscribe((r) => {
console.log(r);
});

//7秒后输出第一个值,之后每2秒继续输出
//7000ms-> 0 -9000ms-> 1 -11000ms-> 2 -13000ms-> 3

2.concatMap

concatMap操作符在数据源到达时同样会立即映射Observable,但不同点在于,这些Observable会按照映射的先后顺序串行处理

例如同样的逻辑:

1
2
3
4
5
6
7
8
9
10
//外部源Observable每2秒输出一个值
const source = interval(2000);
source
.pipe(concatMap((val) => of(val).pipe(delay(5000))))
.subscribe((r) => {
console.log(r);
});

//7秒后输出第一个值,之后每5秒继续输出
//7000ms-> 0 -12000ms-> 1 -17000ms-> 2 -22000ms-> 3

通过这个操作符我们一定要理解的一点就是,操作符内部的这些Observable映射(create)和订阅(subscribe)一定是分开的。同时对于concatMap来说,如果源数据产生的速度快于concatMap的处理速度,那么这些待订阅的Observable会被挤压在内存里,造成内存压力。

3.switchMap

在讲switchMap时,需要我们引入一个新的概念,就是操作符的状态机制,例如switchMap,exhaustMap内部会维护一个状态信息来追踪当前执行的Observable进行到了哪一步,这个状态变量只有在Observable的回调里才会发生改变,同时用来通知操作符当前活跃的Observable和状态。只有当前Observable执行了complete回调活error回调后,Observable才会释放锁,彻底转为不活跃状态。

所以说对于switchMap来说,会通过追踪当前活跃的Observable和状态,来判断当前Observable是否已经结束,如果此时接收到了外部数据源信息,同时当前Observable处于活跃状态,则会取消当前的Observable并重新映射并订阅新的Observable

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//通过merge操作符创建两个并发进行的数据源,分别在第1秒和第3秒发射数据
const source = merge(
of(0).pipe(delay(1000)),
of(1).pipe(delay(3000))
)

source
.pipe(switchMap((val) => of(val).pipe(delay(5000))))
.subscribe((r) => {
console.log(r);
});

//8秒后输出值
//8000ms-> 1

这里需要额外注意是,如果我们将switchMap((val) => of(val).pipe(delay(5000)))的5秒延迟改为2秒延迟,虽然在代码上第二个数据1也是在第3秒到达,而第一个数据对应的Ob1也是在第3秒输出,实际上最后的结果并不受影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//通过merge操作符创建两个并发进行的数据源,分别在第1秒和第3秒发射数据
const source = merge(
of(0).pipe(delay(1000)),
of(1).pipe(delay(3000))
)

source
.pipe(switchMap((val) => of(val).pipe(delay(2000))))
.subscribe((r) => {
console.log(r);
});

//5秒后输出值
//5000ms-> 1

源数据of(1).pipe(delay(3000))在第0秒时已经安排了第3秒执行的定时器回调任务。而Ob1是在第1秒安排的2秒后也就是同样是第3秒执行的定时器回调任务.但是由于JS事件循环机制的原因,源数据因为更早的加入到了任务队列,所以拥有更高的优先级。

所以在第3秒时,源数据更早的到达,而此时第一个源数据对应的Observable由于还没执行完毕,所以当前switchMap判断此时仍处于活跃状态,固会取消Ob1,重新构建并订阅新的Observable。所以最终会在第5秒输出结果1

4.exhaustMap

exhaustMap于switchMap的区别在于,若当前Observable处于活跃阶段时,exhaustMap接收到了新的源数据,新到达的值会被忽略(不构建新的Observable),直到当前Observable结束,同时只构建不活跃状态时接收到的第一个源数据(这里需要特别注意,exhaustMap不会记录并保存处于活跃状态时收到的数据,只有当前Observable结束后,收到的第一个源数据才会被重新构建并订阅)。

举个简单的例子,exhaustMap就好比一个加工机械臂,一次只能拿取并处理一个零件,而在处理期间,因为流水线在源源不断的往前进行,期间到达的新零件会被忽略掉,直到机械臂处理好零件可以再次工作时,会抓起到达的第一个零件。

例:

1
2
3
4
5
6
7
8
//源值在经过exhaustMap时会先延迟1s输出,再延迟后输出值时正好源Observable的下一个值抵达,此时会被忽略掉
const source = interval(1000);
source
.pipe(exhaustMap((val) => of(val).pipe(delay(1000))))
.subscribe((r) => {
console.log(r);
});
//2000ms-> 0 -4000ms->2 -6000ms->4
这里同样要注意,由于JS时间循环机制,再第2秒,4秒时...,虽然代码层面源数据和当前活跃Observable执行的时间一致,但是由于任务的入列时间不同,源数据拥有更高的优先级。

5.总结

操作符 相同点 不同点 缺点
concatMap 内部仅有一个活跃的Observable 串行处理 内存压力
exhaustMap 内部仅有一个活跃的Observable 新的源数据到达时,若当前有活跃且未结束的Observable,忽略掉该值,直到Observable结束,再次构建和订阅新到达的值
switchMap 内部仅有一个活跃的Observable 新的源数据到达时,会取消掉当前活跃的Observable,并构建和订阅新的Observable
mergeMap 内部有多个(并发进行)活跃的Observable(默认不限制并发数量)