首页 > 解决方案 > 使用 mergeMap 订阅 observables 数组

问题描述

我有一些可观察对象和 REST 端点,我订阅/调用并获取数据。

我想从可观察对象和 REST 端点获取所有数据,并使用这些创建一个变量以将其发送到 REST 端点

我正在使用forkJoinmergeMap如下:

const sources: Array<Observable<any>> = [
      of(this.dataStore.trips$),
      of(this.dataStore.passengers$),
      of(this.dataStore.user$),
      of(this.dataStore.user$.pipe(
        mergeMap(user => <Observable<any>> this.userService.getUserIdByEmail(user.email))
      )),
      of(this.toursStore.totalInCart$),
      of(this.toursStore.totalInCart$.pipe(
        mergeMap(total => <Observable<any>> this.currencyService.getUSDConversionRate(total)),
      ))
    ];

forkJoin(sources).subscribe(
      observables => {
        const data = observables.map(observer => this.convertObservableToBehaviorSubject(observer, observer.source.value));

        console.log(data);

        booking = {
          booking: {
            created_at: Date.now(),
            trips: data[0].value,
            passengers: data[1].value,
            total: data[4].value,
            user_id: data[3].value,
            total: data[5].value,
          },
          user: data[2].value
        };

        console.log(booking);
      },
      err => console.log('Error:', err)
    );

将 observable 转换为 BehaviorSubject 的方法:

convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
    const subject = new BehaviorSubject(initValue);

    observable.subscribe(
      (x: T) => {
        subject.next(x);
      },
      (err: any) => {
        subject.error(err);
      },
      () => {
        subject.complete();
      },
    );

    return subject;
  }

当我console.log(data)获取所有数据时,但是当我尝试将值分配给变量时,booking我得到未定义的user_idtotal

this.dataStore.user$.pipe(
   mergeMap(user => <Observable<any>> this.userService.getUserIdByEmail(user.email))
)

this.toursStore.totalInCart$.pipe(
        mergeMap(total => <Observable<any>> this.currencyService.getUSDConversionRate(total)),
      )

我期望user_id并且total有数据。

标签: javascriptangulartypescriptrxjs

解决方案


退一步,你可以让这更简单

您不需要用 a 包围您的可观察对象,of(...)因为它们首先已经是可观察对象。of应该用于将对象通过管道传输到可观察对象中。

只需将 forkjoin 与您已经拥有的可观察对象的对象(如字典)一起使用,即。forkJoin({trips: this.datastore.trips$, passengers: this.datastore.passengers$, etc)

这意味着您将立即获得这些值,而不必将它们转换为BehavourSubject.

所以最终结果将是

forkJoin({trips: this.datastore.trips$, passengers: this.datastore.passengers$, etc...})
  .subscribe(data => {

        booking = {
          booking: {
            created_at: Date.now(),
            trips: data.trips,
            passengers: data.passengers,
            ...etc
          }
        };
  })

推荐阅读