c# - 从 Observable 的订阅方法返回 Disposable
问题描述
我有一个关于 Observables 的问题(我在这本书的出版商子论坛上发布了这个问题,但我仍在等待任何回复)。
我按照标准做法使用提供的辅助方法,而不是手工制作可观察对象。然而,只是出于学术兴趣,我确实了解了手工制作可观察物体需要什么。
我在一本书中看到了一个实现,其中在订阅方法 Disposable.Empty 结束时返回。代码有点像下面。
public class MyObservable : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
for (int i = 0; i < 5; i++)
{
Thread.Sleep(1000);
observer.OnNext(i);
}
observer.OnCompleted();
return Disposable.Empty;
}
}
如果我想返回一个适当的 Disposable ,这实际上会导致在调用 Dispose 时取消订阅应该是什么方式?
我不得不介绍一个订阅处理程序
public class SubscriptionHandler : IDisposable
{
private readonly List<IObserver<int>> _listOfObservers;
private readonly IObserver<int> _currentObserver;
public SubscriptionHandler(List<IObserver<int>> currentListOfObservers, IObserver<int> currentObserver)
{
_listOfObservers = currentListOfObservers;
_currentObserver = currentObserver;
}
public void Dispose()
{
if (_currentObserver != null && _listOfObservers.Contains(_currentObserver))
{
_listOfObservers.Remove(_currentObserver);
}
}
}
这是 Observable 的代码
public class MyObservable : IObservable<int>
{
private List<IObserver<int>> _listOfSubscribedObservers = new List<IObserver<int>>();
public IDisposable Subscribe(IObserver<int> observer)
{
if (!_listOfSubscribedObservers.Contains(observer))
{
_listOfSubscribedObservers.Add(observer);
}
Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
Thread.Sleep(1000);
observer.OnNext(i);
}
observer.OnCompleted();
});
return new SubscriptionHandler(_listOfSubscribedObservers, observer);
}
}
我有一种感觉,我错过了一些东西。必须有一种内置的方法来为手工制作的 Observable 返回一个有意义的 Disposable 或者这是只有 Observable 创建辅助方法才有的东西?
解决方案
我应该明确指出,所有这些都是 Rx 设计内部的演示。您可以查看 classes AnonymousObservable<T>
、
AnonymousObserver<T>
和AnonymousDisposable
,这是框架的工作方式。很直接。但是,您几乎不应该使用任何此类代码,而应使用Disposable.Create
和之类的东西Observable.Create
。如果您正在实施IObservable
,那么您几乎肯定做错了。
这是基本思想:observable 需要产生一个IDisposable
从 observable 的内部观察者列表中删除相关观察者的方法。您的代码(错误地)从内部列表中删除了所有观察者。
这是一个基本的一次性用品,可以很容易地在功能上创建。使用此代码,GenericDisposable.Create
与Disposable.Create(Action a)
.
public class GenericDisposable : IDisposable
{
public static IDisposable Create(Action disposeAction)
{
return new GenericDisposable(disposeAction);
}
private readonly Action _disposeAction;
public GenericDisposable(Action disposeAction)
{
_disposeAction = disposeAction;
}
public void Dispose()
{
_disposeAction();
}
}
...这是一个可观察的实现示例:
public class SendIntMessages : IObservable<int>
{
private readonly HashSet<IObserver<int>> _observers = new HashSet<IObserver<int>>();
protected void OnNext(int i)
{
foreach (var o in _observers)
o.OnNext(i);
}
protected void OnError(Exception e)
{
foreach (var o in _observers)
o.OnError(e);
}
protected void OnCompleted()
{
foreach (var o in _observers)
o.OnCompleted();
}
public void SendIntMessage(int i)
{
OnNext(i);
}
public void EndStream()
{
OnCompleted();
}
public void SendError(Exception e)
{
OnError(e);
}
public IDisposable Subscribe(IObserver<int> observer)
{
_observers.Add(observer);
return GenericDisposable.Create(() => _observers.Remove(observer));
}
}
这是一个长期运行的、热的可观察的。它跟踪它的观察者,并且一次性取消订阅它们。
对比一下这个可观察的:
public class CountTo5 : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnNext(4);
observer.OnNext(5);
return GenericDisposable.Create(() => {});
}
}
这是一个立即运行的“冷”可观察对象。中间没有办法取消订阅:当你拿到一次性用品时,observable 已经结束了。
Disposable.Empty
是 . 的简单简写DisposableCreate(() => {})
。
推荐阅读
- shell - 如何测试是否存在与 shell 中环境 yml 文件的确切包描述匹配的 conda 环境?
- mysql - 如何按聚合列过滤非聚合查询结果?
- javascript - 需要一些想法来在 React Native 应用程序中实现异步存储以保持用户登录应用程序
- java - 更新和/或删除时违反 Hibernate H2 参照完整性约束
- scala - 如何在 Play 框架中进行 Flyway 迁移
- ios - 如何处理 FittedBox 中的动态图像
- python - 使用元组从 VBA 列表中删除重复坐标
- php - 在 Symfony 中使用 Doctrine 创建实体时出错
- c++ - 找出有多少个数是完美平方并且 sqrt() 是 L、R 范围内的质数
- python - 如何在 python 函数中使用可选的命令行参数?