首页 > 解决方案 > 当前一个流接收到第一个值时,RxJS 合并

问题描述

我想要的简短解释:

我有一组可观察对象,我希望所有这些对象的结果都来自同一个流。最初我希望激活第一个 observable,并且一旦 observable 接收到它的第一个值,我希望激活下一个 observable。

对我想要的东西的详细解释:

我正在寻找的解决方案介于 RxJS 创建运算符“merge”和“concat”之间。

我有一系列可观察的。随着时间的推移,每个可观测数据都会有几个排放。我想将这些 observables 排成队列,这样一开始只有第一个 observables 被激活(这类似于“concat”的工作方式)

然后,一旦第一个 observable 收到它的第一个值,我希望第二个 observable 被激活。一旦第二个 observable 收到它的第一个值,我希望第三个 observable 被激活,依此类推。(这与“concat”不同。“concat”等待前一个 observable 完成,但在我的用例中,我想等待前一个 observable 接收到它的第一个值)

只有一个结果流会从所有激活的 observable 中发出值(这类似于“合并”的工作方式)

我不认为有一个特定的 RxJS 运算符可以解决这个问题,但我希望可以通过混合多个运算符找到解决方案。

标签: javascriptrxjs

解决方案


级联合并

这应该按照您的描述进行,每次从前一个流中看到第一个值时,它只是递归地合并一个新流。

function cascadeMerge<T>(...observables: Observable<T>[]): Observable<T>{
  if(observables.length < 1) return EMPTY;
  return observables[0].pipe(
    mergeMap((v,i) => {
      if(i === 0 && observables.length > 1){
        return concat(
          of(v),
          cascadeMerge(...observables.slice(1))
        )
      }
      return of(v);
    })
  );
}

这样做的好处是:

merge(
  a$,
  a$.pipe(first(), switchMapTo(b$))
)

就是你subscribe每个流只来一次。您不必担心热与冷 observables 或者您是否正在多播。

cascadeMerge(a$, b$, c$).subscribe(console.log);

推荐阅读