rxjs - 如果可观察停止之一发出事件,为什么 Observable.race 不起作用?
问题描述
如果互联网连接丢失,我想在 webapp 中实现 websocket 重新连接。为了检测互联网丢失,我使用乒乓方法,这意味着我从客户端发送 ping 消息,服务器返回我的 pong 消息。
当 webapp 加载时,我发送 init ping 消息并开始在套接字上监听某种回复:
this.websocket.onmessage = (evt) => {
try {
const websocketPayload: any = JSON.parse(evt.data);
if (websocketPayload.pong !== undefined && websocketPayload.pong == 1) {
this.pingPong$.next('pong');
}
这意味着互联网连接看起来不错,我们可以继续。我也有以下代码:
Observable.race(
Observable.of('timeout').delay(5000).repeat(),
this.pingPong$
).subscribe((data) => {
console.log("[ping-pong]:", data);
if (data == 'pong') {
Observable.interval(5000).take(1).subscribe(() => {
console.log("[ping-pong]:sending ping")
this.send({ping:1})
});
} else if (data == 'timeout'){
// show reconnect screen and start reconnect
console.error("It looks like websocket connection lost");
}
});
但!当 this.pingPong$ 主题停止发出事件时 - .next() 不会发生,因为当我手动断开连接时我们无法得到响应 - 我认为在 Observable.race 中这个 observable 将被发出
Observable.of('timeout').delay(5000).repeat()
但是如果this.pingPong$
停止发射,我的订阅永远不会发生。
为什么 ?
谢谢
解决方案
race
选择并继续订阅第一个发出的 Observable。
因此,如果您this.pingPong$
开始发出然后停止它没有任何区别,因为race
一直订阅this.pingPong$
. 其他 Observables 不再重要。您可能希望从中发出一个值this.pingPong$
并重复整个过程。例如像下面这样:
Observable.race(
Observable.of('timeout').delay(5000).repeat(),
this.pingPong$
)
.pipe(
take(1), // complete the chain immediately
repeat() // resubscribe after take(1) completes the chain
)
.subscribe(...);
显然,这主要取决于你想做什么,但我希望你明白这一点。
推荐阅读
- php - 我没有在 PHP 中的 for 循环中获取 $j 值
- unix - 给定一个纪元,找到它之前 n 年日期的纪元(数学上)
- mongoose - 无法读取未定义的属性“连接”-nestjs/mongoose
- javascript - 为什么从 fetch 函数返回的对象是它之外的函数?
- javascript - 按包含的前缀过滤数组
- reactjs - React - 每当更新反应上下文中的状态时如何调用函数?
- javascript - 是否可以在本地使用工作服/codecov?
- javascript - 在 HTML 站点中单击导航栏元素外部时如何关闭打开的折叠导航栏?
- .net-core - serilog 日志文件输出为“log_{Date}.txt”
- multi-tenant - 环回多租户和不同的模型