首页 > 解决方案 > RXJS 在管道内组合多个可观察对象

问题描述

我有一个返回一定数量 id 的 API 调用。这些 id 中的每一个都用于进行新的 api 调用。这些 API 调用的结果需要组合成一个对象。

起初,我在第一个 api 调用的 .pipe(map) 运算符中使用了一个循环。在这个循环中,我进行了第二次 api 调用,并且在每个调用中的 .pipe(map) 运算符中,我将在我的角度组件中编辑一个变量。

这不是很漂亮,我实际上想知道这是否是线程安全的。我知道 javascript 是单线程的,但是让多个异步进程弄乱同一个全局变量似乎不太安全。

之后,我只是将第二个 api 调用返回的 observable 存储在一个数组中,方法是遍历 apiCall1 返回的 Id,并使用 forkJoin 相应地订阅和处理每个结果(参见示例)。

然而,这不是很漂亮,我想知道是否有一个我可以在我的管道中使用的操作员?

所以代替(伪代码):

  .pipe(
      map(ids=> {

        let observables = []
        for (const id of ids) {
         observables.push(this.service.getSomeStuff(id));
        }

        forkJoin(...observables).subscribe(dataArray) => {
          for (data of dataArray) {
            //Do something
          }
        });

      }),
      takeWhile(() => this.componentActive),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    )
    .subscribe();

是否有一个操作员使它像这样:

  .pipe(
      map(ids=> {

        let observables = []
        for (const id of ids) {
         observables.push(this.service.getSomeStuff(id));
        }

      return observables
      }),
      forkJoin(dataArray => {
          for (data of dataArray) {
            //Do something
          }
        });
      takeWhile(() => this.componentActive),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    )
    .subscribe();

标签: angularrxjsrxjs-pipeable-operators

解决方案


这是你可以做的:

sourceObservable$.pipe(
  // depends on your need here you can use mergeMap as well
  switchMap(ids => {
    const observables = ids.map(i => this.service.getSomeStuff(id));
    return forkJoin(observables);
  }),
  tap(joined => {
    // joined will be an array of values of the observables in the same
    // order as you pushed in forkJoin
    for (data of joined) {
      // do something
    }
  }),
  takeWhile(() => this.componentActive),
  catchError(error => {
    console.log(error);
    return throwError(error);
  })
)
.subscribe();

推荐阅读