首页 > 解决方案 > 从 Observable 的订阅方法返回 Disposable

问题描述

我有一个关于 Observables 的问题(在这本书的出版商子论坛上发布了这个问题,但我仍在等待任何回复)。

我按照标准做法使用提供的辅助方法,而不是手工制作可观察对象。然而,只是出于学术兴趣,我确实了解了手工制作可观察物体需要什么。

我在一本书中看到了一个实现,其中在订阅方法 Disposable.Empty 结束时返回。代码有点像下面。

public class MyObservable : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < 5; i++)
        {
            Thread.Sleep(1000);
            observer.OnNext(i);
        }
        observer.OnCompleted();
        return Disposable.Empty;
    }
}

如果我想返回一个适当的 Disposable ,这实际上会导致在调用 Dispose 时取消订阅应该是什么方式?

我在ObservableObserver中使用了这个

我不得不介绍一个订阅处理程序

public class SubscriptionHandler : IDisposable
{
    private readonly List<IObserver<int>> _listOfObservers;
    private readonly IObserver<int> _currentObserver;

    public SubscriptionHandler(List<IObserver<int>> currentListOfObservers, IObserver<int> currentObserver)
    {
        _listOfObservers = currentListOfObservers;
        _currentObserver = currentObserver;
    }

    public void Dispose()
    {
        if (_currentObserver != null && _listOfObservers.Contains(_currentObserver))
        {
            _listOfObservers.Remove(_currentObserver);
        }
    }
}
    

这是 Observable 的代码

public class MyObservable : IObservable<int>
{
    private List<IObserver<int>> _listOfSubscribedObservers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (!_listOfSubscribedObservers.Contains(observer))
        {
            _listOfSubscribedObservers.Add(observer);
        }

        Task.Run(() =>
        {
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(1000);
                observer.OnNext(i);
            }

            observer.OnCompleted();
        });

        return new SubscriptionHandler(_listOfSubscribedObservers, observer);
    }
}

我有一种感觉,我错过了一些东西。必须有一种内置的方法来为手工制作的 Observable 返回一个有意义的 Disposable 或者这是只有 Observable 创建辅助方法才有的东西?

标签: c#reactive-programmingsystem.reactive

解决方案


我应该明确指出,所有这些都是 Rx 设计内部的演示。您可以查看 classes AnonymousObservable<T>AnonymousObserver<T>AnonymousDisposable,这是框架的工作方式。很直接。但是,您几乎不应该使用任何此类代码,而应使用Disposable.Create和之类的东西Observable.Create。如果您正在实施IObservable,那么您几乎肯定做错了。

这是基本思想:observable 需要产生一个IDisposable从 observable 的内部观察者列表中删除相关观察者的方法。您的代码(错误地)从内部列表中删除了所有观察者。

这是一个基本的一次性用品,可以很容易地在功能上创建。使用此代码,GenericDisposable.CreateDisposable.Create(Action a).

public class GenericDisposable : IDisposable
{
    public static IDisposable Create(Action disposeAction)
    {
        return new GenericDisposable(disposeAction);
    }

    private readonly Action _disposeAction;
    public GenericDisposable(Action disposeAction)
    {
        _disposeAction = disposeAction;
    }
    public void Dispose()
    {
        _disposeAction();
    }
}

...这是一个可观察的实现示例:

public class SendIntMessages : IObservable<int>
{
    private readonly HashSet<IObserver<int>> _observers = new HashSet<IObserver<int>>();

    protected void OnNext(int i)
    {
        foreach (var o in _observers)
            o.OnNext(i);
    }

    protected void OnError(Exception e)
    {
        foreach (var o in _observers)
            o.OnError(e);
    }

    protected void OnCompleted()
    {
        foreach (var o in _observers)
            o.OnCompleted();
    }

    public void SendIntMessage(int i)
    {
        OnNext(i);
    }

    public void EndStream()
    {
        OnCompleted();
    }

    public void SendError(Exception e)
    {
        OnError(e);
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return GenericDisposable.Create(() => _observers.Remove(observer));
    }
}

这是一个长期运行的、热的可观察的。它跟踪它的观察者,并且一次性取消订阅它们。

对比一下这个可观察的:

public class CountTo5 : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnNext(3);
        observer.OnNext(4);
        observer.OnNext(5);

        return GenericDisposable.Create(() => {});
    }
}

这是一个立即运行的“冷”可观察对象。中间没有办法取消订阅:当你拿到一次性用品时,observable 已经结束了。

Disposable.Empty是 . 的简单简写DisposableCreate(() => {})


推荐阅读