首页 > 解决方案 > 如何在不关闭底层 Web 套接字的情况下将 `firstValueFrom` 与 `WebSocketSubject` 一起使用?

问题描述

我正在使用WebSocketSubject,并且我经常想阻止执行,直到给定事件到达,这就是我使用 的原因firstValueFrom,如下面的代码:

let websocket = new WebSocketSubject<any>(url);
let firstMessage = await firstValueFrom(websocket.pipe(filter(m => true));

我只有一个问题,那就是它在解决承诺时firstValueFrom调用websocket.unsubscribe(),但是在一个WebSocketSubject具有关闭底层 Web 套接字的效果上,我想保持打开状态!

目前,我想到了几种可能的出路:

简而言之,我怀疑我遗漏了一些基本的东西(例如,适当的OperatorFunction),这将使我能够做到这一点,以便unsubscribe调用 byfirstValueFrom不会导致底层 Web 套接字被关闭。

标签: websocketrxjs

解决方案


本质上,您希望始终拥有订阅,以便套接字连接保持打开状态。我认为这不是firstValueFrom这项工作的合适工具。我认为创建显式订阅更简单。

如果目的是在应用程序的整个生命周期内保持打开状态,只需在应用程序启动时订阅。

由于您想过滤掉前几个排放,直到满足某些条件,您可以使用skipWhile

const websocket = new WebSocketSubject<any>(url);
const messages = websocket.pipe(skipWhile(m => m !== 'my special event'));

websocket.subscribe(); // keep socket open


// listen
messages.subscribe(m => console.log('message received:', m);

// send
websocket.next('hello server');

可能值得在 rxjs websocket 周围创建一个轻量级包装类,以处理保持连接打开并过滤掉前几个事件:

class MyWebsocket {
  private websocket = new WebSocketSubject<any>(this.url);
  public messages = websocket.pipe(skipWhile(m => m !== 'my special event'));

  constructor(private url) {
    this.websocket.subscribe(); // keep socket open
  }
 
  public sendMessage(message: any) {
    this.websocket.sendMessage(message);
  }
}
const websocket = new MyWebsocket(url);

// listen
websocket.messages.subscribe(m => console.log('message received:', m);

// send
websocket.sendMessage('hello server');

推荐阅读