首页 > 解决方案 > Angular Rxjs 可观察链与 ngrx 存储

问题描述

我正在使用 Angular 7 和 ngrx 商店。商店包含应用程序状态,我在 OnInit 上的应用程序组件中订阅它。存储中有几个变量,它们是可互换的(用按钮交换)。

这是我在组件中的示例代码。

this.appState.getGasStationSushi().pipe(switchMap((sushi) => {
  this.gsSushi = sushi;
  return this.appState.getStarbucksSushi();
}), switchMap((sushi) => {
  this.sbSushi = sushi;
  return this.service.compare(this.gsSushi, this.sbSushi);
}).subscribe((result)=>{
  console.log(result);
}));

通过在视图中单击按钮,用户可以更改这两个sushi值,这会导致最后一次订阅调用两次,这是有道理的(RxJS)。我可以删除switchMap并写一些类似的东西

-- gas station sushi susbcription 
   -- star bucks sushi susbcription 
      -- compare

我不是这个的超级粉丝,我相信一定有一个rxjs/operator。有人可以提出建议吗?

此外,尝试过forkjoin,但使用 ngrx 商店似乎需要致电firstlast类似以下内容。这里是上面语句forkjoinWithstore的参考

const $sushiObs = [
  this.appState.getGasStationSushi().pipe(first()),
  this.appState.getStarbucksSushi().pipe(first())
];
forkjoin($sushiObs).subscribe((result) => {
  console.log(result);
});

如果我使用上述模式,订阅会第一次触发,但之后根本不会触发。

标签: angulartypescriptrxjs

解决方案


首先,这是一个关于 stackblitz的工作示例。

我没有使用 store,而是创建了一个SushiState返回 observables 的模拟类。

class SushiState {
  private _gas = new BehaviorSubject(1);
  private _starbucks = new BehaviorSubject(1);

  public get gas() {
    return this._gas.asObservable();
  }
  public get starbucks() {
    return this._gas.asObservable();
  }

  public increaseSushi(n = 1) {
    this._gas.next(this._gas.value + n);
    this._starbucks.next(this._starbucks.value + n);
  }

  public static compareSushi(g: number, s: number): string {
    return `gas is ${g}, starbucks is ${s}`;
  }
}

至于组件,这里是代码。

export class AppComponent implements OnInit {
  state: SushiState;

  gasSushi: Observable<number>;
  sbSushi: Observable<number>;

  combined: string;
  combinedTimes = 0;
  zipped: string;
  zippedTimes = 0;

  ngOnInit() {
    this.state = new SushiState;
    this.gasSushi = this.state.gas;
    this.sbSushi = this.state.gas;

    const combined = combineLatest(
      this.gasSushi,
      this.sbSushi,
    ).pipe(
      tap((sushi) => {
        console.log('combined', sushi);
        this.combinedTimes++;
      }),
      map((sushi) => SushiState.compareSushi(sushi[0], sushi[1])),
    );
    combined.subscribe(result => this.combined = result);

    const zipped = zip(
      this.gasSushi,
      this.sbSushi,
    ).pipe(
      tap((sushi) => {
        console.log('zipped', sushi);
        this.zippedTimes++;
      }),
      map((sushi) => SushiState.compareSushi(sushi[0], sushi[1])),
    );
    zipped.subscribe(result => this.zipped = result);
  }

  increaseSushi() {
    this.state.increaseSushi();
  }

}

如果您在 stackblitz 上运行它并检查控制台,您将看到以下行为:

控制台输出

如果我们使用组合的 latest ,我们分别组合 observables 并且只关心最新状态,导致 2 次调用console.log.

我们可以改为使用zip,它会等待两个可观察对象都发出,然后再产生输出。这看起来非常适合我们的“增加两者”按钮,但有一个问题:如果starbucksSushi单独增加(可能来自应用程序的不同部分),zipped版本将等待加油站寿司也得到更新。

要提出第三种解决方案,您可以使用combineLatest组合寿司计数器,然后使用debounceTime运算符在发出输出之前等待一些毫秒数。

const debounced = zip(
  this.gasSushi,
  this.sbSushi,
).pipe(
  tap((sushi) => {
    console.log('debounced', sushi);
    this.debouncedTimes++;
  }),
  map((sushi) => SushiState.compareSushi(sushi[0], sushi[1])),
  debounceTime(100),
);
debounced.subscribe(result => this.debounced = result);

这将对所有来源的变化做出反应,但不会比100ms.

最后,你必须这样做的原因first()

forkJoin在完成后加入可观察对象(只能发生一次,因此它不适合连续流)并且更适合“类似承诺”的工作,例如 HTTP 调用、流程完成等。顺便说一下,如果你只从流中获取一个元素,生成的流在单次发射后完成。

附言

我建议使用async管道来处理 observables(就像我对属性所做的那样

gasSushi: Observable<number>;
sbSushi: Observable<number>;

然后在模板内

<div>
  <h3>starbucks sushi</h3>
  <p>{{sbSushi | async}}</p>
</div>

代替

result => this.zipped = result

我在这个例子中都使用了,所以你可以比较它们。根据我的经验,一旦你停止提前转换“去观察”它们并让async管道完成它的工作,使用可观察对象会变得容易得多。

最重要的是,如果你subscribe在组件的某个地方使用,你应该unsubscribe在组件被销毁时使用——这一点都不难,但是如果我们从不显式订阅,并允许async管道进行订阅,它也会处理销毁我们 :)


推荐阅读