首页 > 解决方案 > 如何解决 Publish().RefCount() 行为不一致的问题?

问题描述

最近,我偶然发现了Enigmativity 关于and运算符的有趣声明PublishRefCount

您正在使用危险的 .Publish().RefCount() 运算符对,它创建了一个在完成后无法订阅的序列。

这一说法似乎反对 Lee Campbell 对这些运营商的评价。引用他的书Intro to Rx

Publish/RefCount 对对于获取冷 observable 并将其作为热 observable 序列共享给后续观察者非常有用。

起初我不相信 Enigmativity 的说法是正确的,所以我试图反驳它。我的实验表明,这Publish().RefCount()确实是不一致的。第二次订阅已发布的序列可能会导致对源序列的新订阅,这取决于源序列是否在连接时完成。如果已完成,则不会重新订阅。如果未完成,则将重新订阅。这是此行为的演示:

var observable = Observable
    .Create<int>(o =>
    {
        o.OnNext(13);
        o.OnCompleted(); // Commenting this line alters the observed behavior
        return Disposable.Empty;
    })
    .Do(x => Console.WriteLine($"Producer generated: {x}"))
    .Finally(() => Console.WriteLine($"Producer finished"))
    .Publish()
    .RefCount()
    .Do(x => Console.WriteLine($"Consumer received #{x}"))
    .Finally(() => Console.WriteLine($"Consumer finished"));

observable.Subscribe().Dispose();
observable.Subscribe().Dispose();

在这个例子中,observable它由三个部分组成。首先是生成单个值然后完成的生产部分。然后遵循发布机制(Publish+ RefCount)。最后是观察生产者发出的值的消费部分。订阅了observable两次。预期的行为是每个订阅都会收到一个值。但这不是发生的事情!这是输出:

Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Consumer finished

在小提琴上试试

如果我们注释该o.OnCompleted();行,这是输出。这种微妙的变化导致了预期和期望的行为:

Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished

在第一种情况下,生产者( 之前的部分Publish().RefCount())只订阅了一次。第一个消费者收到了发出的值,但第二个消费者什么也没收到(OnCompleted通知除外)。在第二种情况下,生产者被订阅了两次。每次它产生一个值,每个消费者得到一个值。

我的问题是:我们如何解决这个问题?我们如何修改Publish运算符,或RefCount,或两者,以使它们的行为始终一致且合乎需要?以下是理想行为的规范:

  1. 发布的序列应该将所有直接来自源序列的通知传播给它的订阅者,而不是其他任何东西。
  2. 当当前订阅者数量从零增加到一时,发布的序列应该订阅源序列。
  3. 只要至少有一个订阅者,发布的序列就应该与源保持连接。
  4. 当当前订阅者数量变为零时,发布的序列应该从源取消订阅。

我要求PublishRefCount提供上述功能的自定义运算符,或者使用内置运算符实现所需功能的方法。

顺便说一句,存在一个类似的问题,询问为什么会发生这种情况。我的问题是关于如何解决它。


更新:回想起来,上述规范导致了一种不稳定的行为,使得竞态条件不可避免。不能保证对已发布序列的两次订阅将导致对源序列的一次订阅。源序列可能在两个订阅之间完成,导致第一个订阅者取消订阅,导致RefCount运营商取消订阅,导致下一个订阅者重新订阅源。内置的行为.Publish().RefCount()可以防止这种情况发生。

道德教训是.Publish().RefCount()序列没有被破坏,但它不可重用。它不能可靠地用于多个连接/断开会话。如果您想要第二个会话,您应该创建一个新.Publish().RefCount()序列。

标签: c#system.reactiverx.net

解决方案


李在解释方面做得很好IConnectableObservablePublish解释得不是很好。这是一种非常简单的动物,很难解释。我假设你明白IConnectableObservable

如果我们简单而懒惰地重新实现零参数Publish函数,它看起来像这样:

//  For illustrative purposes only: don't use this code
public class PublishObservable<T> : IConnectableObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Subject<T> _proxy = new Subject<T>();
    private IDisposable _connection;
    
    public PublishObservable(IObservable<T> source)
    {
        _source = source;
    }
    
    public IDisposable Connect()
    {
        if(_connection == null)
            _connection = _source.Subscribe(_proxy);
        var disposable = Disposable.Create(() =>
        {
            _connection.Dispose();
            _connection = null;
        });
        return _connection;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var _subscription = _proxy.Subscribe(observer);
        return _subscription;
    }
}

public static class X
{
    public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
    {
        return new PublishObservable<T>(source);
    }
}

Publish创建一个Subject订阅源 observable 的代理。代理可以根据连接订阅/取消订阅源:调用Connect,代理订阅源。调用Dispose一次性连接,代理从源取消订阅。从中得到的重要想法是,有一个单一Subject的代理与源的任何连接。不能保证您只订阅一个源,但可以保证您有一个代理和一个并发连接。您可以通过连接/断开连接进行多个订阅。

RefCount处理事情的调用Connect部分:这是一个简单的重新实现:

//  For illustrative purposes only: don't use this code
public class RefCountObservable<T> : IObservable<T>
{
    private readonly IConnectableObservable<T> _source;
    private IDisposable _connection;
    private int _refCount = 0;

    public RefCountObservable(IConnectableObservable<T> source)
    {
        _source = source;
    }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = _source.Subscribe(observer);
        var disposable = Disposable.Create(() =>
        {
            subscription.Dispose();
            DecrementCount();
        });
        if(++_refCount == 1)
            _connection = _source.Connect();
            
        return disposable;
    }

    private void DecrementCount()
    {
        if(--_refCount == 0)
            _connection.Dispose();
    }
}
public static class X
{
    public static IObservable<T> RefCount<T>(this IConnectableObservable<T> source)
    {
        return new RefCountObservable<T>(source);
    }

}

更多代码,但仍然非常简单:如果 refcount 上升到 1,则调用ConnectConnectableObservable如果下降到 0,则断开连接。

将两者放在一起,您将得到一对保证只有一个源 observable 的并发订阅,通过一个 persistent 代理Subject。将Subject仅在有 >0 个下游订阅时订阅源。


鉴于该介绍,您的问题中有很多误解,因此我将一一进行介绍:

... Publish().RefCount() 确实可能不一致。第二次订阅已发布的序列可能会导致对源序列的新订阅,这取决于源序列是否在连接时完成。如果已完成,则不会重新订阅。如果未完成,则将重新订阅。

.Publish().RefCount()将仅在一种情况下重新订阅源:当订阅者从零变为 1 时。如果订阅者数量出于任何原因从 0 变为 1 再变为 0 到 1,那么您最终将重新订阅。源 observable 完成将导致RefCount发出一个OnCompleted,并且它的所有观察者都取消订阅。因此后续订阅RefCount将触发重新订阅源的尝试。自然,如果来源正确地遵守了可观察合同,它会OnCompleted立即发出一个,就是这样。

[使用 OnCompleted 参见示例 observable...] observable 被订阅了两次。预期的行为是每个订阅都会收到一个值。

不会。预期的行为是代理Subject在发出 anOnCompleted后将重新OnCompleted向任何后续订阅尝试发出 an。由于您的源 observable 在您的第一个订阅结束时同步完成,因此第二个订阅将尝试订阅一个Subject已经发出OnCompleted. 这应该导致一个OnCompleted,否则 Observable 合约将被破坏。

[参见示例 observable 没有 OnCompleted 作为第二种情况...] 在第一种情况下,冷生产者(Publish().RefCount() 之前的部分)只订阅了一次。第一个消费者收到了发出的值,但第二个消费者什么也没收到(除了来自 OnCompleted 通知)。在第二种情况下,生产者被订阅了两次。每次它产生一个值,每个消费者得到一个值。

这是对的。由于代理Subject从未完成,后续对源的重新订阅将导致冷的 observable 重新运行。

我的问题是:我们如何解决这个问题?[..]

  1. 发布的序列应该将所有直接来自源序列的通知传播给它的订阅者,而不是其他任何东西。
  2. 当当前订阅者数量从零增加到一时,发布的序列应该订阅源序列。
  3. 只要至少有一个订阅者,发布的序列就应该与源保持连接。
  4. 当当前订阅者数量变为零时,发布的序列应该从源取消订阅。

只要您不完成/错误,以上所有当前都会发生.Publish并且.RefCount当前。我不建议实施一个改变它的运营商,打破 Observable 合同。


编辑

我认为与 Rx 混淆的 #1 来源是热/冷可观察对象。由于Publish可以“预热”冷的 observables,因此它会导致令人困惑的边缘情况也就不足为奇了。

首先,关于可观察合同。Observable 合约更简洁地说,anOnNext永远不能跟在OnCompleted/OnError之后,应该只有一个OnCompleted or OnError通知。这确实留下了尝试订阅终止的 observables 的边缘情况:尝试订阅终止的 observables 会导致立即收到终止消息。这会破坏合同吗?也许吧,但据我所知,这是图书馆里唯一的合同作弊。另一种选择是订阅死空气。这对任何人都没有帮助。

这如何与热/冷可观察对象联系起来?不幸的是,令人困惑。订阅冰冷的 observable 会触发整个 observable 管道的重建。这意味着 subscribe-to-already-terminated 规则仅适用于 hot observables。冷的 observables 总是重新开始。

考虑这段代码,o冷可观察的在哪里:

var o = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Take(5);
var s1 = o.Subscribe(i => Console.WriteLine(i.ToString()));
await Task.Delay(TimeSpan.FromMilliseconds(600));
var s2 = o.Subscribe(i => Console.WriteLine(i.ToString()));

就合约而言,observable behinds1和 observable behinds2是完全不同的。因此,即使它们之间存在延迟,并且您最终会看到OnNextafter OnCompleted,这也不是问题,因为它们是完全不同的 observables。

它变得粘稠的地方是热身Publish版本。如果您要添加.Publish().RefCount()到上面代码的末尾o...

  • 在不更改任何其他内容的情况下,s2将立即终止打印任何内容。
  • 将延迟更改为 400 左右,并s2打印最后两个数字。
  • 更改s1为 only .Take(2),然后s2重新开始打印 0 到 4。

更糟糕的是,薛定谔的猫效应:如果你设置一个观察者o来观察整个时间会发生什么,这会改变引用计数,影响功能!看着它,改变行为。调试噩梦。

这是试图“热身”冷可观测的危险。它只是不能很好地工作,尤其是在Publish/RefCount.

我的建议是:

  1. 不要试图加热冷的 observables。
  2. 如果你需要共享订阅,无论是冷的还是热的 observables,坚持@Enigmativity 严格使用选择器Publish版本的一般规则
  3. 如果必须,请在Publish/RefCountobservable 上进行虚拟订阅。这至少提供了一致的 Refcount >= 1,从而降低了量子活动效应。

推荐阅读