首页 > 解决方案 > RxJS forkJoin 管道可观察对象

问题描述

我需要对服务器进行多次调用以保存一些数据,并且每次后续调用都需要来自上一次调用结果的一些数据。尝试使用 forkJoin,但事件顺序没有意义(至少对我而言)。我认为问题出在 .pipe() 调用上,我试图在其中修改下一次调用的输入数据。

所以我有两个问题:

  1. 为什么输出不是我预期的
  2. 有没有办法使用 forkJoin 来完成这项工作(我意识到有很多其他方法可以解决这个问题,所以并不是真的在寻找替代解决方案)

这是一些示例代码或StackBlitz


  let data: { [key: string]: any } = {};

  forkJoin(
      this.saveFirst(data).pipe(
        tap(_ => console.log('saveFirst pipe after')),
        tap(result => data.id = result.id)
      ),
      this.saveSecond(data).pipe(
        tap(_ => console.log('saveSecond pipe after')),
        tap(result => data.name = result.name)
      ),
  ).subscribe(result => console.log('done: data - ', JSON.stringify(data)));

...

  private saveFirst(data: { [key: string]: any }): Observable<any> {
      console.log('saveFirst: start');
      console.log('saveFirst: data - ', JSON.stringify(data));

      // replaced call to server with "of({ id: 1 })" for simplicity of example
      return of({ id: 1 }).pipe(tap(_ => console.log('saveFirst: end')));
  }

  private saveSecond(data: { [key: string]: any }): Observable<any> {
      console.log('saveSecond: start');
      console.log('saveSecond: data - ', JSON.stringify(data));

      // replaced call to server with "of({ name: 'test' })" for simplicity of example
      return of({ name: 'test' }).pipe(tap(_ => console.log('saveSecond: end')));;
  }

我期待以下输出:

saveFirst: start
saveFirst: data -  {}
saveFirst: end
saveFirst pipe after

saveSecond: start
saveSecond: data - {}
saveSecond: end
saveSecond pipe after

done: data -  {"id":1,"name":"test"}

但是得到了这个:

saveFirst: start
saveFirst: data -  {}
saveSecond: start
saveSecond: data -  {}

saveFirst: end
saveFirst pipe after
saveSecond: end
saveSecond pipe after

done: data -  {"id":1,"name":"test"}

标签: angulartypescriptrxjs

解决方案


在这种情况下,您需要使用mergeMap / switchMap 。

this.saveFirst(data).pipe(
          tap(_ => this.actions.push('saveFirst pipe after')),
          tap(result => data.id = result.id),
          switchMap((res)=>{
            return this.saveSecond(data).pipe(
          tap(_ => this.actions.push('saveSecond pipe after')),
          tap(result => data.name = result.name)
        );
          })).subscribe(result => this.actions.push('done: data - ' + JSON.stringify(data)));

上面的代码会产生你需要的结果。当我们想要发出多个请求并且只关心最终结果时使用forkJoin 。

分叉的Stackblitz 。


推荐阅读