首页 > 解决方案 > 如果可观察停止之一发出事件,为什么 Observable.race 不起作用?

问题描述

如果互联网连接丢失,我想在 webapp 中实现 websocket 重新连接。为了检测互联网丢失,我使用乒乓方法,这意味着我从客户端发送 ping 消息,服务器返回我的 pong 消息。

当 webapp 加载时,我发送 init ping 消息并开始在套接字上监听某种回复:

this.websocket.onmessage = (evt) => {
      try {

        const websocketPayload: any = JSON.parse(evt.data);

        if (websocketPayload.pong !== undefined && websocketPayload.pong == 1) {

          this.pingPong$.next('pong');
        }

这意味着互联网连接看起来不错,我们可以继续。我也有以下代码:

 Observable.race(
      Observable.of('timeout').delay(5000).repeat(),
      this.pingPong$
    ).subscribe((data) => {
      console.log("[ping-pong]:", data);
      if (data == 'pong') {
        Observable.interval(5000).take(1).subscribe(() => {
          console.log("[ping-pong]:sending ping")
          this.send({ping:1})
        });
      } else if (data == 'timeout'){
        // show reconnect screen and start reconnect
        console.error("It looks like websocket connection lost");
      }
    });

但!当 this.pingPong$ 主题停止发出事件时 - .next() 不会发生,因为当我手动断开连接时我们无法得到响应 - 我认为在 Observable.race 中这个 observable 将被发出

Observable.of('timeout').delay(5000).repeat()

但是如果this.pingPong$停止发射,我的订阅永远不会发生。

为什么 ?

谢谢

标签: rxjsobservableemit

解决方案


race选择并继续订阅第一个发出的 Observable。

因此,如果您this.pingPong$开始发出然后停止它没有任何区别,因为race一直订阅this.pingPong$. 其他 Observables 不再重要。您可能希望从中发出一个值this.pingPong$并重复整个过程。例如像下面这样:

Observable.race(
    Observable.of('timeout').delay(5000).repeat(),
    this.pingPong$
  )
  .pipe(
    take(1), // complete the chain immediately
    repeat() // resubscribe after take(1) completes the chain
  )
  .subscribe(...);

显然,这主要取决于你想做什么,但我希望你明白这一点。


推荐阅读