首页 > 解决方案 > 如何正确创建和解析 Observable 的 Observable (RxJS)

问题描述

在我的 Angular 应用程序中,我有多个相互依赖的服务。所以我为他们创建了一个依赖关系图。这是一个简短的示例:

let accountInitialization$: Observable<void>;
let productInitialization$: Observable<void>;

const dependenciesMap = {
  accountService: [],
  productService: [
    accountInitialization$ // There could be one or more dependencies
  ]
}

accountInitialization$ = this.getDependencies(dependenciesMap.accountService)
  .pipe(
    mergeMap(_ => {
      return this.accountService.initialize();
    })
  );

productInitialization$ = this.getDependencies(dependenciesMap.productService)
  .pipe(
    mergeMap(_ => {
      return this.productService.initialize();
    })
  );

accountInitialization$.subscribe(() => {
  this.progressUpdate.next(StartUpProgressUpdate.AccountsInitialized);
});

productInitialization$.subscribe(() => {
  this.progressUpdate.next(StartUpProgressUpdate.ProductsInitialized);
});

forkJoin([accountInitialization$, productInitialization$]).subscribe(
  () => {
    // do some work
  },
  error => {
    console.log(error); // This is where I see the error specified below
  }
);

所以 Product Service 依赖于 Account Service 的初始化。getDependencies 函数如下所示:

private getDependencies(dependencies: Observable<void>[]) {
if (dependencies.length) {
  return forkJoin(dependencies);
}
return EMPTY;

}

我想在这个例子中实现的是,在初始化 Product Service 之前,确保首先初始化 Account Service。现在我收到这个错误

错误类型错误:您在预期流的位置提供了“未定义”。您可以提供 Observable、Promise、Array 或 Iterable。

看起来这些.initialize()函数从未被调用...

我不确定我在这里做错了什么。任何帮助将不胜感激,谢谢。

标签: angulartypescriptrxjs

解决方案


而不是使用forkJoin,你必须使用switchMap。我不知道如何构造它,但像这样:

import { switchMap } from 'rxjs/operators';
...
productInitialization$ = this.accountInitialization$.pipe(
  switchMap(accountInitializtion => {
    console.log('account initialization finished with: ', accountInitialization);
    // switch to this observable once complete
    return this.productService.initialize();
  });
)

对您而言,可能会有所不同,但请查看concatMap, switchMap,mergeMap以从一个Observable到另一个。


推荐阅读