rxjs - RXJS flatMap 到重复的 observable
问题描述
我正在尝试实现服务,如果应用程序连接到我的服务器,它会提供可观察的,所以当浏览器在线时,我们会使用计时器 ping 服务器。这是代码:
public get $connected(): Observable<boolean> {
return this.hasInternetConnection
.asObservable()
.pipe(
distinctUntilChanged(),
flatMap((connected: boolean) => {
if (!connected) {
return of(connected);
} else {
return timer(5000)
.pipe(
map(() => {
var success = Math.random() > 0.5;
console.log('PING: ' + success);
return success;
})
);
}
})
);
}
hasInternetConnection
只是一个绑定到窗口的 BehaviorSubjectonline
和offline
事件的 BehaviorSubject,计时器模拟对我的 API 服务器的 ping。
问题是我的订阅$connected
仅从可观察的计时器中捕获第一个值,然后不起作用。在hasInternetConnection
主题更改为false
并返回 后true
,我的订阅再次获得第一个值,然后什么也没有。这是我在控制台中看到的:
PING: true
subscription tap
PING: true
PING: false
PING: true
...
我该如何解决?谢谢!
解决方案
完整解决方案:
private hasInternetConnection: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(navigator.onLine);
private connectedSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true);
private recheckConnectionSubject: Subject<void> = new Subject<void>();
constructor(
private readonly http: HttpClient,
) {
fromEvent(window, 'online')
.pipe(takeUntil(this.destroyed))
.subscribe(() => {
this.hasInternetConnection.next(true);
});
fromEvent(window, 'offline')
.pipe(takeUntil(this.destroyed))
.subscribe(() => {
this.hasInternetConnection.next(false);
});
merge(
this.hasInternetConnection,
this.recheckConnectionSubject,
)
.pipe(
mapTo(this.hasInternetConnection.value),
switchMap((connected: boolean) => {
if (!connected) {
return of(connected);
} else {
return timer(0, 30000)
.pipe(
mergeMapTo(this.http.get(`${environment.apiRoot}/ping`, { responseType: 'text' })
.pipe(
map((res) => {
return true;
}),
catchError(() => {
return of(false);
})
)
),
);
}
})
)
.subscribe(this.connectedSubject);
}
public get $connected(): Observable<boolean> {
return this.connectedSubject.asObservable()
.pipe(
distinctUntilChanged(),
);
}
public resetTimer(): void {
this.recheckConnectionSubject.next();
}
推荐阅读
- apache - 我正在尝试使用 mod_proxy_ajp 在 Web 服务器后面安装 ofbiz,但我无法让它工作
- node.js - 错误 [ERR_STREAM_DESTROYED]:使用 Winston 销毁流后无法调用 write
- python - PYREBASE - 我有一个用于某人的 firebase 应用程序的 APIKey,我想查看他们存储桶中的文件
- azure-cosmosdb - 如何降低最小吞吐量
- mysql - Sequelize 中的外键
- c# - 带有后端.net核心的Nginx反向代理使用signalR
- string - 拆分字符串并将子字符串保存在 dart 的列表中
- c++ - 如何使用 C++ 在 matlab 中实现 unique 的行为?
- python - 是否可以在另一个脚本中调用函数?(Python)
- python - 如何通过 Python 读取和匹配在线 XML 文件中的某些值?