首页 > 解决方案 > RxJS:如何触发 2 个并行请求,其中一个是可选的,超时

问题描述

我有一个案例,我必须同时触发 2 个 GET 请求:

  1. 其中第一个是强制性的。我必须等待响应,如果弹出一些错误,我有一个异常处理,我也可以取消我的 2. 请求。
  2. 第二个是可选的。如果出现错误,我可以忽略这种情况。我想等待最大值。此调用需要 5 秒“更多”,如果需要更长时间,我想取消请求(我知道我无法取消触发的请求,而只是忽略返回的值/或返回的错误)。因此,如果 1. 通话可能需要 20 秒。2. 通话可能会等待最多。25 秒。如果 1. 通话只需要 1 秒。2. 通话不能等待超过 6 秒。等等

我如何用 rxjs 实现这个?

我知道我可以压缩多个请求,但到目前为止我看到的所有示例都只有 1 个错误处理块,但在这里我需要区分两种错误情况。

提前致谢

标签: angularrxjsrxjs6

解决方案


我有更多的解决方法而不是解决方案。您的要求是触发并行请求,并根据第一个请求响应取消第二个请求。

可以使用并行请求完成,forkJoin但所有可观察对象一起解决,

merge()也会触发并行请求,但任何响应都可以按任何顺序出现。使用 merge() 我们将无法识别哪个响应来自哪个 Observable。如果您可以自由修改返回的 observable 并添加一个标志来指示 Observable 索引,那么您可以使用一些额外的标志来实现它,代码如下所示:

export class AppComponent  {
  name = 'Angular';
  obsOne = of('First Obs').pipe(map((res) => {
    return {
      firstObs: true,
      result: res
    }
  }))
  obsTwo = of('Second Obs').pipe(delay(6000))

  secondObsReturned = false
  timerHandle
  obsSubcription: Subscription;

  ngOnInit() {
    this.obsSubcription = merge(this.obsOne, this.obsTwo).subscribe((data) => {

      // you can add all this logic in pipe(map()) instead of handling in subscribe

      console.log(`data returned`, data)
      // some appropriate checks here
      if (typeof data === 'object' && data.hasOwnProperty('firstObs')) {
        if (!this.secondObsReturned) {
          // can use rxjs timer here
        this.timerHandle = setTimeout(() => {
          console.log('Delayed more than 5 seconds');
          this.obsSubcription.unsubscribe();
        }, 5000)
        }
      }
      else {
        // this is the second onservable (which may have come early)
        this.secondObsReturned = true;
      }
    })
  }
}

在此处查看示例:https ://stackblitz.com/edit/angular-s6wkk2


编辑

所以,我在想一些方法来避免改变返回的 Observable 并且我想出了CombineLatest. combine latest 的事情是,它第一次会在两个 Observable 中等待一个值,之后即使任何 Observable 解析,它也会发出。

要使用它,还有一个约束。例如,您需要知道 Observables 永远不会返回的特定值,例如false,如果您知道 Observables 永远不会返回false(或任何默认值),那么您可以使用 BehaviorSubjects 和 combineLatest。使用永远无法返回的值初始化 BehaviorSubjects。

您将需要点击 observable 为主题添加值。

// give appropriate types
subjectOne = <any> new BehaviorSubject(false); // will contain value of the first observable
subjectTwo = <any> new BehaviorSubject(false); // will contain value of the second observable
takeUntilSub = new Subject(); // use this to stop the subscriptions

obsOne = of('First Obs')
  .pipe(
    tap((value) => {
      this.subjectOne.next(value);
    }),
    catchError((e) => {
      // if an Error occurs in first then you don't want to proceeed at all
      // add an error in the subjectOne, this will stop the combineLatest stream.
      this.subjectOne.error('Observable one errored')
      return throwError;(e)
    })
  )

obsTwo = of('Second Obs')
  .pipe(
    delay(6000),
    tap((value) => {
      this.subjectTwo.next(value);
    }),
    catchError((e) => {
      // if you want to continue the stream, you need to handle the error and return a success.
      // no need to populate the subject coz you don't care about this error
      return of(e)
    })
  )

secondObsReturned = false
timerHandle;

ngOnInit() {

  // calling the actual Observables here.
  merge(this.obsOne, this.obsTwo).pipe(takeUntil(this.takeUntilSub)).subscribe()

  // this will be called once for the very first time giving values as false for both of them (or the emitted initial values)
  // after that when any one of them resolves, flow will come here
  combineLatest(this.subjectOne, this.subjectTwo).pipe(takeUntil(this.takeUntilSub)).subscribe(([dataFromObsOne, dataFromObsTwo]) => {

    console.log(`data received: ${dataFromObsOne} and ${dataFromObsTwo}`)

    if (dataFromObsTwo !== false) {
      // second observable was resolved
      this.secondObsReturned = true;
      if (this.timerHandle) {
        clearTimeout(this.timerHandle);
      }
    }

    if (dataFromObsOne !== false) {
      // first observable resoved
      if (!this.secondObsReturned) {
        // if second obs hasn't already been resolved then start a timer.
        this.timerHandle = setTimeout(() => {
          console.log('Delayed more than 5 seconds');
          this.takeUntilSub.next(true);   // stop all subscriptions
        }, 5000)
      }
    }
  })
}

在此处查看示例:代码链接


推荐阅读