首页 > 解决方案 > 即使没有活跃的订阅者,我使用 shareReplay() 的 Observable 会永远运行吗?

问题描述

考虑这个Observable

myObs$ = interval(1000).pipe(shareReplay(2));

及其用法:

myObs$.subscribe(res => console.log('[Subscriber 1]:' + res));

如果我订阅,然后在几秒钟后取消订阅,最后又在几秒钟后重新订阅,似乎间隔一直在运行和计数

[Subscriber 1]: 0
[Subscriber 1]: 1 
Unsubscribe, resubscribe here
[Subscriber 1]: 3
[Subscriber 1]: 4
[Subscriber 1]: 5
[Subscriber 1]: 6 
Unsubscribe, resubscribe here
[Subscriber 1]: 10
[Subscriber 1]: 11
[Subscriber 1]: 12

我知道使用 refCount = true 时不会发生这种情况。但是当它为假时,这是否算作潜在的内存泄漏?如果没有,我该如何阻止它?

另外,为什么我需要在取消订阅后重新创建订阅?

sub = new Subscription()
sub.add(myObs.subscribe())
sub.unsubscribe()
sub.add(myObs.subscribe()) // <-- this does not work unless I recreate a new Subscription()

标签: rxjsobservable

解决方案


使用shareReplywithoutrefCount可能会导致内存泄漏,因为在每个人都取消订阅后操作员不会自动关闭流。您可以使用接受另一个流的takeUntil运算符来完成一个长期存在的流,并且在与Subject.

const sub = new Subject();

const stream$ = interval(1000).pipe(
  takeUntil(sub),
  shareReplay(1)
);

setTimeout(() => stream$.subscribe(console.log), 4000);
setTimeout(() => sub.next(), 6000);
setTimeout(() => stream$.subscribe(undefined, undefined, () => console.log('completed')), 8000); // this will log completed because the stream$ has successfully finished

另一种完成流的方法是使用takeWhile,它也非常方便,它使用谓词函数。

如果您处置了一个订阅,它被标记为已关闭并且您需要创建一个新订阅,则不能再次重复使用它。

看看这一行,在 add 方法内部有一个检查,如果关闭为真,则对提供的订阅执行拆卸。


推荐阅读