首页 > 解决方案 > 如何使用 RxJS 正确链接请求

问题描述

我只是想学习 Angular 和 RxJS(Observables),但我在以正确的方式链接请求时遇到了一些问题。

场景如下:

  1. 我运行请求 A 并获得一个包含对象数组的 JSON 对象。
  2. 对于这些对象中的每一个,我都需要执行请求 B,该请求 B 又返回一个带有对象数组的 JSON 对象。
  3. 对于来自请求 B 的响应中的每个对象,我需要执行返回一个简单 JSON 对象的请求 C。
  4. 现在我必须存储从请求 C 收到的所有对象并用信息扩展它们(信息来自请求 D、E 和 F 的答案)。
  5. 只有当所有请求都完成后,我才想显示我的应用程序的 UI,在此之前只有一个加载微调器应该是可见的。

我当前的解决方案为我提供了所有必要的信息,但我找不到等到所有请求完成的方法。它看起来像这样:

private fetchData(): void {
  this.backendServie.getObjectA().pipe(
    flatMap((objects: IObjectA[]) => {
      // return objects the get them one by one
      return objects
    }),
    flatMap((object: IObjectA) => {
      return this.backendService.getObjectB(object.id)
    }),
    flatMap((objects: IObjectB[]) => {
      // return objects the get them one by one
      return objects
    }),
    flatMap((object: IObjectB) => {
      return this.backendService.getObjectC(object.id)
    }),
    flatMap((object: IObjectC) => {
      // convert object to make additional fields available
      const extendedObject: IObjectCExtended = object as IObjectCExtended;
      this.backendService.getAdditionalInformationD(object.id).subscribe((info: string) >= {
        extendedObject.additionalInfoD = info;
      });
      this.backendService.getAdditionalInformationE(object.id).subscribe((info: string) >= {
        extendedObject.additionalInfoE = info;
      });
      this.backendService.getAdditionalInformationF(object.id).subscribe((info: string) >= {
        extendedObject.additionalInfoF = info;
      })
      return extendedObject;
    })
  ).subscribe((extendedObject: IObejectCExtended) => {
    this.objects.push(extendedObject);
  })
}

不幸的是,这种链接请求的方式感觉是错误的,正如我所说,我无法找到一种方法来检查所有请求是否都已完成并且 UI 只是一点一点地填满。

我希望我的问题可以理解并提前感谢您的帮助。

此致

标签: angularrxjsobservable

解决方案


如前所述,这种设计效率很低。因此,最好更改您的后端 - 正如评论中已经提到的那样。

如果您无法更改后端,则可以使用switchMap和的组合对请求进行分组forkJoin

一次精简所有条目

您可以为要调用的每个 API 一次简化所有条目。所以基本上:

  1. 获取所有对象As
  2. 获取每个 objectA 的所有 objectB
  3. 获取每个objectB的所有objectC
  4. 获取每个对象的所有添加C
  5. 每次添加都扩展所有 objectC
private fetchData(): void {
  // get your initial object
  this.backendServie.getObjectA().pipe(

    // switch to forkJoin, which waits until all inner observables complete and returns them as an array
    switchMap((objects: IObjectA[]) => forkJoin(
       // map all your objects to an array of observables which will then be waited on by forkJoin
       objects.map(obj => this.backendService.getObjectB(obj.id))
    )),

    // Again, switch to another forkJoin
    switchMap((objects: IObjectB[]) => forkJoin(
      // Again, map all your objects to observables that forkJoin will collect and wait on
       objects.map(obj => this.backendService.getObjectC(obj.id))
    )),

    // switch to another forkJoin that retrieves all your extensions for every object
    switchMap((objects: IObjectC[]) => forkJoin(
       // map each object to a forkJoin that retrieves the extensions for that object and map it to the extended object
       objects.map(obj => forkJoin([
         this.backendService.getAdditionalInformationD(obj.id),
         this.backendService.getAdditionalInformationE(obj.id),
         this.backendService.getAdditionalInformationF(obj.id)
       ]).pipe(
         map(([infoD, infoE, infoF]) => ({
            ...obj,
            additionalInfoD: infoD,
            additionalInfoE: infoE,
            additionalInfoF: infoF
         }))
       )

      /* Alternatively, you can use the dictionary syntax to shorten this
       objects.map(obj => forkJoin({
         additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
         additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
         additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
       }).pipe(map(additionalInfo => ({...obj, ...additionalInfo })))
      */
    ))

  // You're now getting a list of the extended objects
  ).subscribe((extendedObjects: IObejectCExtended[]) => {
    this.objects = extendedObjects;
  });
}

为了使代码更清晰,您可以将不同的部分分组到不同的函数中。这可能看起来像这样。

private fetchData(): void {
  this.fetchExtendedObjects()
    .subscribe(extendedObjects => this.objects = extendedObjects);
}

private fetchExtendedObjects():Observable<IObjectCExtended[]> {
  return this.backendServie.getObjectA().pipe(
    switchMap(objectsA => this.getAllObjectsB(objectsA)),
    switchMap(objectsB => this.getAllObjectsC(objectsB)),
    switchMap(objectsC => this.extendAllObjectsC(objectsC))
  )
}

private getAllObjectsB(objects: IObjectA[]):Observable<IObjectB[]> {
  return forkJoin(objects.map(obj => this.backendService.getObjectB(obj.id)));
}

private getAllObjectsC(objects: IObjectB[]):Observable<IObjectC[]> {
  return forkJoin(objects.map(obj => this.backendService.getObjectC(obj.id)));
}

private extendAllObjectsC(objects: IObjectC[]):Observable<IObjectCExtended[]> {
  return forkJoin(objects.map(obj => this.extendObjectC(obj)));
}

private extendObjectC(object: IObjectC):Observable<IObejectCExtended> {
  objects.map(obj => forkJoin({
    additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
    additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
    additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
  }).pipe(map(additionalInfo => ({...obj, ...additionalInfo })))
}

分别精简每个条目

作为对上述内容的优化,您可以单独精简每个对象,这会给您带来小的性能提升。总体而言,这应该不会产生太大影响,因为您仍然必须等待所有请求完成,但是如果您的某些 API 对某些对象的速度较慢,这可能会有所帮助。

这基本上意味着:

  1. 获取所有对象As
    1. 对于每个 objectA 并行获取 objectB
    2. 对于每个 objectB 并行获取 objectC
    3. 对于每个 objectC 获取所有扩展(对于每个 objectC 并行)
    4. 对于每个 objectC,使用所有扩展扩展 objectC(对于每个 objectC 并行)
  2. 将所有扩展的 objectC 组合成一个数组
private fetchData(): void {
  // get your initial object
  this.backendServie.getObjectA().pipe(

    switchMap((objects: IObjectA[]) => forkJoin(
      // Switch each objectA to an observable that retrieves objectB
      // In contrast to the first version, this is done for each objectA separately

      objects.map(obj => this.backendService.getObjectB(obj.id).pipe(

         // Switch each objectB to an observable that retrieves objectC
         switchMap((object: IObjectB) => this.backendService.getObjectC(obj.id)),

         // Switch each objectC to an observable that retrieves all of the
         // extensions an combines them with objectC
         switchMap((object: IObjectC) => 
           // Retrieves all extensions and provides them as a dictionary
           forkJoin({
             additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
             additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
             additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
           }).pipe(
             // combine the original objectC with all the extensions
             map(additionalInfo => ({...object, ...additionalInfo}))
           )
         )
      ))
    ))
  // You're now getting a list of the extended objects
  ).subscribe((extendedObjects: IObejectCExtended[]) => {
    this.objects = extendedObjects;
  });
}

同样,为了使其更具可读性,您可以将各个部分分成函数:

private fetchData(): void {
  this.fetchExtendedObjects().subscribe(objects => this.objects = objects);
}

private fetchExtendedObjects():Observable<IObjectCExtended[]> {
  return this.backendServie.getObjectA().pipe(
    switchMap((objects: IObjectA[]) => this.getExtendedObjectsC(objects))
  );
}

private getExtendedObjectsC(objects: IObjectA[]):Observable<IObjectCExtended[]> {
  return forkJoin(objects.map(obj => this.getExtendedObjectC(obj)));
}

private getExtendedObjectC(objectA: IObjectA):Observable<IObjectCExtended> {
  return this.backendService.getObjectB(objectA.id).pipe(
    switchMap((object: IObjectB) => this.backendService.getObjectC(obj.id)),
    switchMap((object: IObjectC) => this.extendObjectC(object))
  );
}

private extendObjectC(object: IObjectC):Observable<IObejectCExtended> {
  objects.map(obj => forkJoin({
    additionalInfoD: this.backendService.getAdditionalInformationD(obj.id),
    additionalInfoE: this.backendService.getAdditionalInformationE(obj.id),
    additionalInfoF: this.backendService.getAdditionalInformationF(obj.id)
  }).pipe(map(additionalInfo => ({...obj, ...additionalInfo })))
}

推荐阅读