首页 > 解决方案 > RxJS - 按顺序执行 observables

问题描述

我正在尝试按顺序运行几个可观察对象,同时保存它们的值,但是我很难找到正确的方法来做到这一点。

我正在尝试做的一个抽象轮廓是创建一个“FruitBasket”,首先创建苹果,然后按顺序创建梨。

createApples: Observable<Apple[]> {
  // creates apples...
}

createPears: Observable<Pear[]> {
  // creates pears...
}

createFruitBasket(apples: Apple[], pears: Pear[]): Observable<FruitBasket>
{
  // create a fruit basket
}

我希望在尝试创建梨之前完成苹果的创建,并且应该在尝试创建水果篮之前完成梨的创建。

使用 async/await 表示法,期望的结果是:

makeABasket() {
  let apples = await createApples();
  let pears = await createPears();
  let fruitBasket = await createFruit(apples, pears);
}

作为第一次尝试,我提出了以下建议:

makeABasket() {
  let apples: Apple[] = [];
  let pears: Pear[] = [];
  let basket: FruitBasket;

  this.createApples().subscribe((newApples) => {
    apples = apples.concat(newApples);

    this.createPears().subscribe((newPears) => {
      pears = pears.concat(newPears);

      this.createFruitBasket(apples, pears).subscribe((newBasket) => {
        basket = newBasket;
      });
    });
  });
}

我知道有一种更好的方法可以做到这一点,使用 RxJS 运算符,但我似乎无法找到正确的方法。请注意,我不想使用forkJoin,因为我希望创建函数按顺序运行,而不是并行运行。

在此先感谢您的任何建议。

标签: rxjsobservable

解决方案


要并行运行,您需要concat组合功能。它的文档:https ://www.learnrxjs.io/learn-rxjs/operators/combination/concat

简单地做

concat( // <- creates sequence.
  this.createApples(), // <- first one.
  this.createPears(), // <- once apple has been completed executes this one.
).pipe(
  scan((result, item) => [...result, item], []), // <- accumulating tuple
  skip(1), // <- apples aren't enough, skipping 1st emit.
  concatMap(([apples, pears]) => this.createFruitBasket(apples, pears)), // <- now we can set the basket.
).subscribe();

推荐阅读