c# - 如何解决 Publish().RefCount() 行为不一致的问题?
问题描述
最近,我偶然发现了Enigmativity 关于and运算符的有趣声明:Publish
RefCount
您正在使用危险的 .Publish().RefCount() 运算符对,它创建了一个在完成后无法订阅的序列。
这一说法似乎反对 Lee Campbell 对这些运营商的评价。引用他的书Intro to Rx:
Publish/RefCount 对对于获取冷 observable 并将其作为热 observable 序列共享给后续观察者非常有用。
起初我不相信 Enigmativity 的说法是正确的,所以我试图反驳它。我的实验表明,这Publish().RefCount()
确实是不一致的。第二次订阅已发布的序列可能会导致对源序列的新订阅,这取决于源序列是否在连接时完成。如果已完成,则不会重新订阅。如果未完成,则将重新订阅。这是此行为的演示:
var observable = Observable
.Create<int>(o =>
{
o.OnNext(13);
o.OnCompleted(); // Commenting this line alters the observed behavior
return Disposable.Empty;
})
.Do(x => Console.WriteLine($"Producer generated: {x}"))
.Finally(() => Console.WriteLine($"Producer finished"))
.Publish()
.RefCount()
.Do(x => Console.WriteLine($"Consumer received #{x}"))
.Finally(() => Console.WriteLine($"Consumer finished"));
observable.Subscribe().Dispose();
observable.Subscribe().Dispose();
在这个例子中,observable
它由三个部分组成。首先是生成单个值然后完成的生产部分。然后遵循发布机制(Publish
+ RefCount
)。最后是观察生产者发出的值的消费部分。订阅了observable
两次。预期的行为是每个订阅都会收到一个值。但这不是发生的事情!这是输出:
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Consumer finished
(在小提琴上试试)
如果我们注释该o.OnCompleted();
行,这是输出。这种微妙的变化导致了预期和期望的行为:
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
在第一种情况下,冷生产者( 之前的部分Publish().RefCount()
)只订阅了一次。第一个消费者收到了发出的值,但第二个消费者什么也没收到(OnCompleted
通知除外)。在第二种情况下,生产者被订阅了两次。每次它产生一个值,每个消费者得到一个值。
我的问题是:我们如何解决这个问题?我们如何修改Publish
运算符,或RefCount
,或两者,以使它们的行为始终一致且合乎需要?以下是理想行为的规范:
- 发布的序列应该将所有直接来自源序列的通知传播给它的订阅者,而不是其他任何东西。
- 当当前订阅者数量从零增加到一时,发布的序列应该订阅源序列。
- 只要至少有一个订阅者,发布的序列就应该与源保持连接。
- 当当前订阅者数量变为零时,发布的序列应该从源取消订阅。
我要求PublishRefCount
提供上述功能的自定义运算符,或者使用内置运算符实现所需功能的方法。
顺便说一句,存在一个类似的问题,询问为什么会发生这种情况。我的问题是关于如何解决它。
更新:回想起来,上述规范导致了一种不稳定的行为,使得竞态条件不可避免。不能保证对已发布序列的两次订阅将导致对源序列的一次订阅。源序列可能在两个订阅之间完成,导致第一个订阅者取消订阅,导致RefCount
运营商取消订阅,导致下一个订阅者重新订阅源。内置的行为.Publish().RefCount()
可以防止这种情况发生。
道德教训是.Publish().RefCount()
序列没有被破坏,但它不可重用。它不能可靠地用于多个连接/断开会话。如果您想要第二个会话,您应该创建一个新.Publish().RefCount()
序列。
解决方案
李在解释方面做得很好,IConnectableObservable
但Publish
解释得不是很好。这是一种非常简单的动物,很难解释。我假设你明白IConnectableObservable
:
如果我们简单而懒惰地重新实现零参数Publish
函数,它看起来像这样:
// For illustrative purposes only: don't use this code
public class PublishObservable<T> : IConnectableObservable<T>
{
private readonly IObservable<T> _source;
private readonly Subject<T> _proxy = new Subject<T>();
private IDisposable _connection;
public PublishObservable(IObservable<T> source)
{
_source = source;
}
public IDisposable Connect()
{
if(_connection == null)
_connection = _source.Subscribe(_proxy);
var disposable = Disposable.Create(() =>
{
_connection.Dispose();
_connection = null;
});
return _connection;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var _subscription = _proxy.Subscribe(observer);
return _subscription;
}
}
public static class X
{
public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
{
return new PublishObservable<T>(source);
}
}
Publish
创建一个Subject
订阅源 observable 的代理。代理可以根据连接订阅/取消订阅源:调用Connect
,代理订阅源。调用Dispose
一次性连接,代理从源取消订阅。从中得到的重要想法是,有一个单一Subject
的代理与源的任何连接。不能保证您只订阅一个源,但可以保证您有一个代理和一个并发连接。您可以通过连接/断开连接进行多个订阅。
RefCount
处理事情的调用Connect
部分:这是一个简单的重新实现:
// For illustrative purposes only: don't use this code
public class RefCountObservable<T> : IObservable<T>
{
private readonly IConnectableObservable<T> _source;
private IDisposable _connection;
private int _refCount = 0;
public RefCountObservable(IConnectableObservable<T> source)
{
_source = source;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var subscription = _source.Subscribe(observer);
var disposable = Disposable.Create(() =>
{
subscription.Dispose();
DecrementCount();
});
if(++_refCount == 1)
_connection = _source.Connect();
return disposable;
}
private void DecrementCount()
{
if(--_refCount == 0)
_connection.Dispose();
}
}
public static class X
{
public static IObservable<T> RefCount<T>(this IConnectableObservable<T> source)
{
return new RefCountObservable<T>(source);
}
}
更多代码,但仍然非常简单:如果 refcount 上升到 1,则调用Connect
,ConnectableObservable
如果下降到 0,则断开连接。
将两者放在一起,您将得到一对保证只有一个源 observable 的并发订阅,通过一个 persistent 代理Subject
。将Subject
仅在有 >0 个下游订阅时订阅源。
鉴于该介绍,您的问题中有很多误解,因此我将一一进行介绍:
... Publish().RefCount() 确实可能不一致。第二次订阅已发布的序列可能会导致对源序列的新订阅,这取决于源序列是否在连接时完成。如果已完成,则不会重新订阅。如果未完成,则将重新订阅。
.Publish().RefCount()
将仅在一种情况下重新订阅源:当订阅者从零变为 1 时。如果订阅者数量出于任何原因从 0 变为 1 再变为 0 到 1,那么您最终将重新订阅。源 observable 完成将导致RefCount
发出一个OnCompleted
,并且它的所有观察者都取消订阅。因此后续订阅RefCount
将触发重新订阅源的尝试。自然,如果来源正确地遵守了可观察合同,它会OnCompleted
立即发出一个,就是这样。
[使用 OnCompleted 参见示例 observable...] observable 被订阅了两次。预期的行为是每个订阅都会收到一个值。
不会。预期的行为是代理Subject
在发出 anOnCompleted
后将重新OnCompleted
向任何后续订阅尝试发出 an。由于您的源 observable 在您的第一个订阅结束时同步完成,因此第二个订阅将尝试订阅一个Subject
已经发出OnCompleted
. 这应该导致一个OnCompleted
,否则 Observable 合约将被破坏。
[参见示例 observable 没有 OnCompleted 作为第二种情况...] 在第一种情况下,冷生产者(Publish().RefCount() 之前的部分)只订阅了一次。第一个消费者收到了发出的值,但第二个消费者什么也没收到(除了来自 OnCompleted 通知)。在第二种情况下,生产者被订阅了两次。每次它产生一个值,每个消费者得到一个值。
这是对的。由于代理Subject
从未完成,后续对源的重新订阅将导致冷的 observable 重新运行。
我的问题是:我们如何解决这个问题?[..]
- 发布的序列应该将所有直接来自源序列的通知传播给它的订阅者,而不是其他任何东西。
- 当当前订阅者数量从零增加到一时,发布的序列应该订阅源序列。
- 只要至少有一个订阅者,发布的序列就应该与源保持连接。
- 当当前订阅者数量变为零时,发布的序列应该从源取消订阅。
只要您不完成/错误,以上所有当前都会发生.Publish
并且.RefCount
当前。我不建议实施一个改变它的运营商,打破 Observable 合同。
编辑:
我认为与 Rx 混淆的 #1 来源是热/冷可观察对象。由于Publish
可以“预热”冷的 observables,因此它会导致令人困惑的边缘情况也就不足为奇了。
首先,关于可观察合同。Observable 合约更简洁地说,anOnNext
永远不能跟在OnCompleted
/OnError
之后,应该只有一个OnCompleted
or OnError
通知。这确实留下了尝试订阅终止的 observables 的边缘情况:尝试订阅终止的 observables 会导致立即收到终止消息。这会破坏合同吗?也许吧,但据我所知,这是图书馆里唯一的合同作弊。另一种选择是订阅死空气。这对任何人都没有帮助。
这如何与热/冷可观察对象联系起来?不幸的是,令人困惑。订阅冰冷的 observable 会触发整个 observable 管道的重建。这意味着 subscribe-to-already-terminated 规则仅适用于 hot observables。冷的 observables 总是重新开始。
考虑这段代码,o
冷可观察的在哪里:
var o = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Take(5);
var s1 = o.Subscribe(i => Console.WriteLine(i.ToString()));
await Task.Delay(TimeSpan.FromMilliseconds(600));
var s2 = o.Subscribe(i => Console.WriteLine(i.ToString()));
就合约而言,observable behinds1
和 observable behinds2
是完全不同的。因此,即使它们之间存在延迟,并且您最终会看到OnNext
after OnCompleted
,这也不是问题,因为它们是完全不同的 observables。
它变得粘稠的地方是热身Publish
版本。如果您要添加.Publish().RefCount()
到上面代码的末尾o
...
- 在不更改任何其他内容的情况下,
s2
将立即终止打印任何内容。 - 将延迟更改为 400 左右,并
s2
打印最后两个数字。 - 更改
s1
为 only.Take(2)
,然后s2
重新开始打印 0 到 4。
更糟糕的是,薛定谔的猫效应:如果你设置一个观察者o
来观察整个时间会发生什么,这会改变引用计数,影响功能!看着它,改变行为。调试噩梦。
这是试图“热身”冷可观测的危险。它只是不能很好地工作,尤其是在Publish/RefCount
.
我的建议是:
- 不要试图加热冷的 observables。
- 如果你需要共享订阅,无论是冷的还是热的 observables,坚持@Enigmativity 严格使用选择器
Publish
版本的一般规则 - 如果必须,请在
Publish/RefCount
observable 上进行虚拟订阅。这至少提供了一致的 Refcount >= 1,从而降低了量子活动效应。
推荐阅读
- docker - 从 macbook m1 启动 docker 时测试容器错误
- java - Java Intellj:org.json.simple 不存在
- flutter - 如何解决 Flutter 中的 3rd 方插件冲突?
- spring-boot - 它是使用现有数据插入的。为什么要创建一个新行?
- flutter - 我不想在列表视图中显示表的所有记录,但我想在颤动中仅显示特定客户的记录
- android - 如何从通知片段中打开正确的指定名称选项卡?
- jsxgraph - 不透明度不覆盖 Jsxgraph 下方的对象
- php - 错误:MAMP MAC 上没有名称为“php56-intl”的可用公式或木桶
- javascript - 从响应数组(Javascript)动态生成innerHTML元素
- javascript - 用 flex 填充页面并用 body 将页脚粘贴到底部(Tailwindcss,Nextjs)