首页 > 解决方案 > RxJS:即使在源 observable 上使用了 `shareReplay()`,`throwError()` 也会为每个 observable 单独执行

问题描述

我在 observable ( ) 上使用 RxJSshareReplay()运算符在courses$其他两个 observable (beginnerCourses$advancedCourses$) 之间共享 observable 流。它工作正常,单个 API 调用响应在两个可观察对象之间共享成功。

但是,当涉及到错误时,这些 observable 不会共享错误,并且会在浏览器控制台中看到错误被抛出两次。运营商不shareReplay()也分享错误吗?这是预期的行为吗?

const http$ = createHttpObservable('/api/courses');

const courses$ = http$
  .pipe(
    map(res => res['payload'] ),
    shareReplay(),
    catchError(err => {
      return throwError(err);
    })
  );

this.beginnerCourses$ = courses$
  .pipe(
    map(courses => courses
      .filter(course => course.category === 'BEGINNER')));

this.advancedCourses$ = courses$
  .pipe(
    map(courses => courses
      .filter(course => course.category === 'ADVANCED')));

}

标签: rxjsrxjs6rxjs-pipeable-operators

解决方案


我认为这是预期的行为,有点出乎意料的是您遇到了 2 个不同的错误。

shareReplayReplaySubject在数据消费者和数据生产者之间放置 a 。当错误通知到达时,ReplaySubjectin use 将向所有注册订阅者发送相同的错误通知:

error(err: any) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  this.hasError = true;
  this.thrownError = err;
  this.isStopped = true;
  const { observers } = this;
  const len = observers.length;
  const copy = observers.slice();
  for (let i = 0; i < len; i++) {
    copy[i].error(err);
  }
  this.observers.length = 0;
}

资源

但是在使用shareReplay时,如果发生错误,当新订阅者即将订阅时,ReplaySubject正在使用的将被另一个替换。说它正在被替换,这也需要重新订阅 source

所以我认为所有订阅者都应该收到相同的错误通知,只要他们已经是ReplaySubject订阅者列表的一部分。否则,当有新订阅者进来时,源将被重新订阅。


您可以做的是防止接收ReplaySubject错误通知并允许其订阅者按原样接收它,是使用materializedematerialize运算符:

const courses$ = http$
  .pipe(
    materialize(), // Everything as a `next` notification
    map(res => res['payload'] ),
    shareReplay(),
    dematerialize() // Back to the original event
  );

使用这种方法,如果注册的订阅者收到错误通知,它将被取消订阅,这意味着它也将从ReplaySubject订阅者列表中删除。但它ReplaySubject仍然存在,并且不会在后续订阅者中被替换。


另外,我认为这是非常多余的:

catchError(err => throwError(err));

推荐阅读