首页 > 解决方案 > 基于前一个 Observable 的节流

问题描述

我有以下设置,每 3 秒会向服务器发出一个新的 HTTP 请求。

  getData(param1: string): Observable<any> {
    return timer(0, 3000).pipe(
      switchMap(() => this.http.get(param1))
    );
  }

如果给定的请求花费超过 3 秒,switchMap()(我认为)将取消它并启动一个新请求。

现在,我想这样做,如果一个请求花费的时间超过 3 秒,它会等待它完成,然后再触发另一个请求。就上下文而言,这个想法是,如果请求存在性能问题,我的前端不会被卡住并过早取消请求。

我有点得到这个与以下工作:

currentObs: Observable<any>;

getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    throttle(_ => this.currentObs),
    switchMap(() => {
      this.currentObs = this.http.get(param1)
      return this.currentObs;
    })
  );
}

这将跟踪currentObs当前 HTTP 请求的可观察对象。然后它将它传递给一个throttle()方法,以便timer()忽略通常提示的新请求中的值,直到请求 ( currentObs) 完成。

这似乎可行,但有点尴尬,因为我需要将一些状态保留在pipe(). 这也有点令人困惑,因为限制是基于它之后发生的事件。我一直在寻找一种将结果传递switchMap()到的方法,throttle()但首先我没有找到,其次,这不会导致throttle()在管道的错误一侧吗?

有没有更简洁的方法来使用 RxJS 实现这一点?

编辑:

随着@Mrk Sef 对更优雅解决方案的回答和@kvetis 对处理错误的警告,我最终得到了以下管道,它将发出请求,成功后等待 3 秒,然后发出另一个请求。如果请求失败,它将等待 3 秒并发出另一个请求。然后从顶部开始。

getData(param1: string): Observable<any> {
  return this.http.get(param1).pipe(
    repeatWhen(s => s.pipe(
      delay(3000)
    )),
    retryWhen(s => s.pipe(
      delay(3000)
    ))
  );
}

标签: rxjsangular-httpclientrxjs-observablesrxjs-pipeable-operators

解决方案


您的解决方案是一个非常优雅的解决方案。你可以让你的手更脏,走出 observables 的世界,把状态保持在一个简单的回调中。但我会说你正确地解决了这个问题。

只是要小心,如果请求失败,那么整个计时器就会失败。如果您想继续下一个请求,即使前一个请求失败,您也需要在两者中恢复switchMapcurrentObs由于throttle需要接收一个值以使管道继续,您不应该只恢复到EMTPY. 让我们发出 null。

getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    throttle(_ => this.currentObs),
    switchMap(() => {
      this.currentObs = this.http.get(param1).pipe(
          catchError(e => {
              console.error(e);
              return of(null); // so the throttle continues with next value
      return this.currentObs;
    }),
    filter(identity) // use identity from RxJS so we filter out the null
  );
}

一般来说,您要达到的目标称为背压。你可以谷歌“RxJS 背压”并想出不同的技术。在大多数情况下,如果没有外部 Observable 将信息反馈给源 Observable,您将无法实现您想要的。


推荐阅读