首页 > 解决方案 > 在方法上创建一个 observable

问题描述

在我的应用程序中,我有一个方法,每次服务器上的某个队列发生更新时都会调用该方法。该应用程序已初始化为以这种方式运行。

现在,每次使用最新数据调用该方法时,我都希望将其视为事件流的一部分,因此使其成为永远不会以订阅者结尾的 Observable 的一部分。

我面临的挑战是:如何在被调用的方法上创建一个 observable?下面是我的示例代码。

//This method is invoked every time an update happens on the server
public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    Observable.Create<MyObject3>(observer =>
        {
            var object3 = new MyObject3(object1, object2);
            observer.OnNext(object3 );

            return Disposable.Empty;
        })
        .Subscribe(x => WriteLine($"Message acknowledged"));
}

但这会在每次调用该方法时创建一个 observable,而不是我想要的,而且它看起来也不是正确的方法。我还读到使用“Subject”或“AsyncSubject”不是解决问题的正确方法。

标签: c#.net-coresystem.reactive

解决方案


不使用的规则Subject更多的是一个没有很好表达的指导方针。

一般来说,如果您在观察的管道中使用主题,那么您可能做错了什么——应该避免这种情况。

如果你使用 aSubject作为 observable 的源并且你正确地封装了Subject 并且你混淆了它,那么你很好。所以这通常意味着使用一个private只有你的代码可以访问的字段(所以没有人可以调用.OnCompleted()它并调用.AsObservable(),这样就没有人可以将你的 observable 转换回底层的Subject.

在您的情况下,您是直接订阅,因此.AsObservable()不需要,但我怀疑这只是演示代码。在现实世界中,确保你混淆了。

这是您的代码应如下所示:

private Subject<MyObject3> _subject = new Subject<MyObject3>();

private void SetUpObservable()
{
    _subject = new Subject<MyObject3>();
    _subject.Subscribe(x => Console.WriteLine($"Message acknowledged"));
}

public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _subject.OnNext(new MyObject3(object1, object2));
}

现在,如果您仍然想避免,Subject那么您可以这样做:

private Action<MyObject3> _delegate;

private void SetUpObservable()
{
    Observable
        .FromEvent<MyObject3>(h => _delegate += h, h => _delegate -= h)
        .Subscribe(x => Console.WriteLine($"Message acknowledged"));
}

public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _delegate?.Invoke(new MyObject3(object1, object2));
}

在我看来,这Subject可以让您更好地控制并且更容易设置。

无论如何,您可能应该保留订阅IDisposable,以便您可以正确清理。


推荐阅读