之前对Rxjs的concatMap、switchMap、exhaustMap、mergeMap这四个映射操作符的理解有欠缺,以下是对这四个操作符的深入理解。首先在理解这四个操作符之前一定要理解的概念就是源数据
和映射Observable
,我们可以把这两部分想象成流水线工厂的原材料
和加工机器
,一定要独立的理解这两部分,这四个操作符不同的地方就是在加工机器
这一部分使用了不同的逻辑,以此来产出不同的结果
对于一下代码,interval(2000)
就相当于外部数据源,mergeMap((val) => of(val).pipe(delay(5000)))
就是这个加工机器
,最终会将内部多个Ob合并为一个Ob输出。在实际使用和理解时,我们可以不要过多关注最后合并的Ob,这只是原理,只需要理解源数据和源数据到达时操作符内部是怎么处理的即可。
1 | const source = interval(2000); |

## 1.mergeMap mergeMap操作符应该是这四个操作符里比较好理解的,简单来说就是只要当外部数据源到达时,会立即映射Observable并订阅输出,这些内部的Observable互相独立,也就是`并发处理`。
例如以下代码:(interval只是用来模拟源数据)
1 | //外部源Observable每2秒输出一个值 |

2.concatMap
concatMap操作符在数据源到达时同样会立即映射Observable,但不同点在于,这些Observable会按照映射的先后顺序串行处理
例如同样的逻辑:
1 | //外部源Observable每2秒输出一个值 |

通过这个操作符我们一定要理解的一点就是,操作符内部的这些Observable映射(create)和订阅(subscribe)一定是分开的。同时对于concatMap来说,如果源数据产生的速度快于concatMap的处理速度,那么这些待订阅的Observable会被挤压在内存里,造成内存压力。
3.switchMap
在讲switchMap时,需要我们引入一个新的概念,就是操作符的状态机制,例如switchMap
,exhaustMap
内部会维护一个状态信息来追踪当前执行的Observable进行到了哪一步,这个状态变量只有在Observable的回调里才会发生改变,同时用来通知操作符当前活跃的Observable和状态。只有当前Observable执行了complete回调活error回调后,Observable才会释放锁,彻底转为不活跃状态。
所以说对于switchMap来说,会通过追踪当前活跃的Observable和状态,来判断当前Observable是否已经结束,如果此时接收到了外部数据源信息,同时当前Observable处于活跃状态,则会取消当前的Observable并重新映射并订阅新的Observable。
例:
1 | //通过merge操作符创建两个并发进行的数据源,分别在第1秒和第3秒发射数据 |

这里需要额外注意是,如果我们将switchMap((val) => of(val).pipe(delay(5000)))的5秒延迟改为2秒延迟,虽然在代码上第二个数据1
也是在第3秒到达,而第一个数据对应的Ob1
也是在第3秒输出,实际上最后的结果并不受影响。
1 | //通过merge操作符创建两个并发进行的数据源,分别在第1秒和第3秒发射数据 |
源数据of(1).pipe(delay(3000))
在第0秒时已经安排了第3秒执行的定时器回调任务。而Ob1
是在第1秒安排的2秒后也就是同样是第3秒执行的定时器回调任务.但是由于JS事件循环机制的原因,源数据因为更早的加入到了任务队列,所以拥有更高的优先级。
所以在第3秒时,源数据更早的到达,而此时第一个源数据对应的Observable由于还没执行完毕,所以当前switchMap判断此时仍处于活跃状态,固会取消Ob1
,重新构建并订阅新的Observable。所以最终会在第5秒输出结果1
4.exhaustMap
exhaustMap于switchMap的区别在于,若当前Observable处于活跃阶段时,exhaustMap接收到了新的源数据,新到达的值会被忽略(不构建新的Observable),直到当前Observable结束,同时只构建不活跃状态时接收到的第一个源数据(这里需要特别注意,exhaustMap不会记录并保存处于活跃状态时收到的数据,只有当前Observable结束后,收到的第一个源数据才会被重新构建并订阅)。
举个简单的例子,exhaustMap就好比一个加工机械臂,一次只能拿取并处理一个零件,而在处理期间,因为流水线在源源不断的往前进行,期间到达的新零件会被忽略掉,直到机械臂处理好零件可以再次工作时,会抓起到达的第一个零件。
例:
1 | //源值在经过exhaustMap时会先延迟1s输出,再延迟后输出值时正好源Observable的下一个值抵达,此时会被忽略掉 |

5.总结
操作符 | 相同点 | 不同点 | 缺点 |
---|---|---|---|
concatMap | 内部仅有一个活跃的Observable | 串行处理 | 内存压力 |
exhaustMap | 内部仅有一个活跃的Observable | 新的源数据到达时,若当前有活跃且未结束的Observable,忽略掉该值,直到Observable结束,再次构建和订阅新到达的值 | |
switchMap | 内部仅有一个活跃的Observable | 新的源数据到达时,会取消掉当前活跃的Observable,并构建和订阅新的Observable | |
mergeMap | 内部有多个(并发进行)活跃的Observable(默认不限制并发数量) |