首页 > 解决方案 > 将 rxjs 5 重构为 rxjs 6 代码 - retryWhen

问题描述

我已经用 rxjs 5 表示法编写了这个服务代码:

      constructor(private _authService: AuthService) {
    this._ws$ = WebSocketSubject.create<any>({
      url: WS_URL,
      protocol: this._authService.token
    });

    this._ws$.retryWhen(errors => {
        // switchMap to retrieve the source error
        return errors.switchMap(sourceErr => {
            console.log('Retry WS.', sourceErr);
            return Observable.timer(1000).map(() => Observable.of(true));
          }
        );
      }
    ).subscribe(
      msg => {
        if ('channel_name' in msg) {
          console.log('Channel name', msg.channel_name);
          this._channelName = msg.channel_name;
          this._authService.channelName = msg.channel_name;
        }
        this.subject$.next(msg);
        console.log(msg);
      },
      err => {
        console.log(err);
        this.subject$.error(err);
      },
      () => this.subject$.complete()
    );
  }

我正在尝试将其重构为 rxjs 6 有效代码,现在我得到了这个:

constructor(private _authService: AuthService) {
this._ws$ = WebSocketSubject.create({
  url: WS_URL,
  protocol: this._authService.token
});

retryWhen(() => {
    // switchMap to retrieve the source error
    return switchMap(() => {
        return timer(1000).pipe(map(() => of(true)));
      }
    );
  }
).subscribe(
  msg => {
    if ('channel_name' in msg) {
      this._channelName = msg.channel_name;
      this._authService.channelName = msg.channel_name;
    }
    this.subject$.next(msg);
  },
  err => {
    this.subject$.error(err);
  },
  () => this.subject$.complete()
);

但我收到以下错误:

TS2345: Argument of type '() => OperatorFunction<{}, Observable<boolean>>' is not assignable to parameter of type '(errors: Observable<any>) => Observable<any>'.
    Type 'OperatorFunction<{}, Observable<boolean>>' is not assignable to type 'Observable<any>'.
    Property '_isScalar' is missing in type 'OperatorFunction<{}, Observable<boolean>>'.

我不知道在这里使用 retryWhen 函数的最佳方法是什么,以及如何在其中使用 WebSocketSubject。

标签: javascriptangulartypescriptrxjs

解决方案


Rxjs 6 中上述代码的实现如下所示

 this._ws$.pipe(
  retryWhen(errors =>
    errors.pipe(
      tap(val => console.log('Retry WS.', val)),
      delay(1000)
    )
  )).subscribe( msg => {
    if ('channel_name' in msg) {
      this._channelName = msg.channel_name;
      this._authService.channelName = msg.channel_name;
    }
    this.subject$.next(msg);
  },err => {
    this.subject$.error(err);
  },() => this.subject$.complete()
  );

有关使用 Rxjs 6 时重试的更多信息,您可以参考这里
https://www.learnrxjs.io/operators/error_handling/retrywhen.html


推荐阅读