rxjs - 即使没有活跃的订阅者,我使用 shareReplay() 的 Observable 会永远运行吗?
问题描述
考虑这个Observable:
myObs$ = interval(1000).pipe(shareReplay(2));
及其用法:
myObs$.subscribe(res => console.log('[Subscriber 1]:' + res));
如果我订阅,然后在几秒钟后取消订阅,最后又在几秒钟后重新订阅,似乎间隔一直在运行和计数:
[Subscriber 1]: 0
[Subscriber 1]: 1
Unsubscribe, resubscribe here
[Subscriber 1]: 3
[Subscriber 1]: 4
[Subscriber 1]: 5
[Subscriber 1]: 6
Unsubscribe, resubscribe here
[Subscriber 1]: 10
[Subscriber 1]: 11
[Subscriber 1]: 12
我知道使用 refCount = true 时不会发生这种情况。但是当它为假时,这是否算作潜在的内存泄漏?如果没有,我该如何阻止它?
另外,为什么我需要在取消订阅后重新创建订阅?
sub = new Subscription()
sub.add(myObs.subscribe())
sub.unsubscribe()
sub.add(myObs.subscribe()) // <-- this does not work unless I recreate a new Subscription()
解决方案
使用shareReply
withoutrefCount
可能会导致内存泄漏,因为在每个人都取消订阅后操作员不会自动关闭流。您可以使用接受另一个流的takeUntil运算符来完成一个长期存在的流,并且在与Subject
.
const sub = new Subject();
const stream$ = interval(1000).pipe(
takeUntil(sub),
shareReplay(1)
);
setTimeout(() => stream$.subscribe(console.log), 4000);
setTimeout(() => sub.next(), 6000);
setTimeout(() => stream$.subscribe(undefined, undefined, () => console.log('completed')), 8000); // this will log completed because the stream$ has successfully finished
另一种完成流的方法是使用takeWhile,它也非常方便,它使用谓词函数。
如果您处置了一个订阅,它被标记为已关闭并且您需要创建一个新订阅,则不能再次重复使用它。
看看这一行,在 add 方法内部有一个检查,如果关闭为真,则对提供的订阅执行拆卸。
推荐阅读
- google-compute-engine - 在 VM 实例上使用 TPU 运行 python 脚本时出现问题
- postgresql - Postgres jsonb 嵌套数组追加
- javascript - 自定义验证器 - 前端不显示错误消息
- xslt - xsl:如果未检索到第一个和最后一个节点
- ruby-on-rails - Rails 引导模式客户端验证
- spring - JmsListener 没有收到来自 Activemq 的消息
- git - Git - 将大师合并回开发?
- javascript - Laravel 在数据库中的地图中显示标记
- elasticsearch - 创建高吞吐量 Elasticsearch 集群
- javascript - 通过 ajax 从选择框向 PHP 页面发送 Post 请求