首页 > 解决方案 > RxJS:WebSocket 重新连接处理

问题描述

我创建了一个服务来处理 WebSocket 连接以发送和接收消息,我正在处理一些可能发生断开连接的情况以及如何从中恢复,但我错过了浏览器本身的情况(所以to speak) 下线时,它会在再次navigator.onLine发出true值时自动重试。

到目前为止,通过使用retryWhen运算符,我能够创建一个新会话并重新启动它(如果服务器端出现问题),直到达到最大重试次数。但是,通过使用navigator.onLine(尽管它不可​​靠,我知道,但它适用于我们正在使用的浏览器),当客户端离线时,我可以重新启动重新连接过程。

我可以观察到这个在线检查器:

private onlineCheck: Observable<boolean>;

this.onlineCheck = merge(
      of(navigator.onLine),
      fromEvent(window, 'online').pipe(mapTo(true)),
      fromEvent(window, 'offline').pipe(mapTo(false)));

这就是我创建 WebSocket 的方式(并在需要时处理重新连接):

public connect(): void {
  // no more than one connection simultaneously
  if (!(this.socket == null)) {
    this.socket.complete();
  }

  this.socket = new WebSocketSubject(this.config);
  this.socket
    .pipe(
      retryWhen((errors) => {
        return errors.pipe(
          tap((error) => { console.log('Error: ', error); }),
          concatMap((e, i) =>
            iif(
              () => i < this.reconnectAttempts,
              of(e).pipe(delay(this.reconnectInterval)),
              throwError('Max amount of retries reached')
            )));
      }))
    .subscribe(
      (message: MessageEvent) => { this.onMessage(message); },
      (error: Event) => { this.onError(error); },
      () => { console.log('completed'); });
  }

如果客户端离线(并且达到最大连接重试次数)并且现在又回来了,我如何将onlineCheck订阅与运营商结合起来以重试?retryWhen如果这不是最好的方法,您能否建议另一种方法?

这是一个完整的 StackBlitz 示例(检查控制台日志以获得更好的反馈):https ://stackblitz.com/edit/angular-f4oa3y

标签: angularwebsocketrxjs

解决方案


在你的iif真实情况下,使用这个 observable 而不是of(e).pipe(delay(this.reconnectInterval))

timer(this.reconnectInterval) // wait for reconnect interval
  .pipe(
    concatMap(() => onlineCheck),
    first(Boolean) // now wait for online to be true
  )

这将发出一个 observable,在 reconnectInterval和onlineCheck 为 true后将产生一个值


推荐阅读