websocket - 如何在不关闭底层 Web 套接字的情况下将 `firstValueFrom` 与 `WebSocketSubject` 一起使用?
问题描述
我正在使用WebSocketSubject
,并且我经常想阻止执行,直到给定事件到达,这就是我使用 的原因firstValueFrom
,如下面的代码:
let websocket = new WebSocketSubject<any>(url);
let firstMessage = await firstValueFrom(websocket.pipe(filter(m => true));
我只有一个问题,那就是它在解决承诺时firstValueFrom
调用websocket.unsubscribe()
,但是在一个WebSocketSubject
具有关闭底层 Web 套接字的效果上,我想保持打开状态!
目前,我想到了几种可能的出路:
- 写一个等价的
firstValueFrom
东西不会取消订阅。
反驳论点:我不希望重新实现一个几乎完美的函数,除了一个小问题; - 使用另一个
Subject
将订阅的WebSocketSubject
,我将firstValueFrom
在该主题上使用。
反驳论点:在使用方面,我发现拥有两个Subject
对象可能会造成混淆,并且必须知道使用哪个对象(例如websocket.next
,用于向上游发送消息,仅websocketProxy
用于接收消息,永远不要在两者之间混淆!); multiplex
用于创建临时Observable
对象,然后将其关闭而不会出现firstValueFrom
问题。
反驳论点:因为在这种情况下我实际上并没有多路复用,所以我宁愿不使用那种方法,它的签名和用法对于我的用例来说似乎是多余的。
简而言之,我怀疑我遗漏了一些基本的东西(例如,适当的OperatorFunction
),这将使我能够做到这一点,以便unsubscribe
调用 byfirstValueFrom
不会导致底层 Web 套接字被关闭。
解决方案
本质上,您希望始终拥有订阅,以便套接字连接保持打开状态。我认为这不是firstValueFrom
这项工作的合适工具。我认为创建显式订阅更简单。
如果目的是在应用程序的整个生命周期内保持打开状态,只需在应用程序启动时订阅。
由于您想过滤掉前几个排放,直到满足某些条件,您可以使用skipWhile
:
const websocket = new WebSocketSubject<any>(url);
const messages = websocket.pipe(skipWhile(m => m !== 'my special event'));
websocket.subscribe(); // keep socket open
// listen
messages.subscribe(m => console.log('message received:', m);
// send
websocket.next('hello server');
可能值得在 rxjs websocket 周围创建一个轻量级包装类,以处理保持连接打开并过滤掉前几个事件:
class MyWebsocket {
private websocket = new WebSocketSubject<any>(this.url);
public messages = websocket.pipe(skipWhile(m => m !== 'my special event'));
constructor(private url) {
this.websocket.subscribe(); // keep socket open
}
public sendMessage(message: any) {
this.websocket.sendMessage(message);
}
}
const websocket = new MyWebsocket(url);
// listen
websocket.messages.subscribe(m => console.log('message received:', m);
// send
websocket.sendMessage('hello server');