c# - 假脱机由 Observable.FromEvent 生成的正在进行的项目
问题描述
我的目标是IObservable<T>
为未来的订阅者后台处理所有项目/通知。
例如,如果有人订阅消息流,首先他会收到订阅之前的所有消息。然后他开始接收新消息,只要有任何消息。这应该无缝地发生,不会在新旧消息之间的“边界”上重复和丢失。
我想出了以下扩展方法:
public static IObservable<T> WithHistory<T>(this IObservable<T> source)
{
var accumulator = new BlockingCollection<T>();
source.Subscribe(accumulator.Add);
return accumulator
.GetConsumingEnumerable()
.ToObservable()
.SubscribeOn(ThreadPoolScheduler.Instance);
}
据我测试,它有效:
class Generator<T>
{
event Action<T> onPush;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);
public void Push(T item) => onPush?.Invoke(item);
}
...
private static void Main()
{
var g = new Generator<int>();
var ongoingItems = g.Items;
var allItems = g.Items.WithHistory();
g.Push(1);
g.Push(2);
ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));
g.Push(3);
g.Push(4);
g.Push(5);
Console.ReadLine();
}
结果:
Ongoing: got 3 Ongoing: got 4 Ongoing: got 5 WithHistory: got 1 WithHistory: got 2 WithHistory: got 3 WithHistory: got 4 WithHistory: got 5
然而,使用BlockingCollection<T>
似乎是一种矫枉过正。此外,上面的方法不支持完成、错误处理,如果没有.SubscribeOn(ThreadPoolScheduler.Instance)
.
有没有更好的方法来实现它,而没有描述的缺陷?
解决方案
最好的方法是使用.Replay()
void Main()
{
var g = new Generator<int>();
var ongoingItems = g.Items;
var allItems = g.Items.Replay().RefCount();
using(var tempSubscriber = allItems.Subscribe())
{
g.Push(1);
g.Push(2);
ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));
g.Push(3);
g.Push(4);
g.Push(5);
Console.ReadLine();
}
}
.Replay().RefCount()
只要有订阅者,就会产生一个可观察的对象,它将保留一个内部队列以进行重播。如果你有一个持久的订阅者(就像你的解决方案在WithHistory
方法中所做的那样),你就会有内存泄漏。解决此问题的最佳方法是拥有一个临时订阅者,该订阅者会在您不再对历史记录感兴趣后自动断开连接。
推荐阅读
- python - 如何使用 Excel 文件将数据加载到 Service Now 以使用 Rest API Explorer 创建 RITM(请求项)?
- ubuntu - 在 mariadb 中运行 sql 语句时出现权限错误
- mongoose - 使用 Azure Function 从 Mongo Atlas 获取数据
- c++ - 是否可以在不引用“this”指针的情况下使用成员运算符?
- javascript - 屏幕尺寸更改时显示块忽略媒体查询更改?
- variables - 为什么我的重载函数不接受变量?该程序的目的是得到狗年在人类年中的结果
- firebase - 在 SwiftUI 中仅更新来自 Firebase 的新数据
- python - 尝试为“Not Now”按钮查找 xpath 时出现 Selenium 错误
- python - 如何使用 pandas 将数据框的多列更新为现有的 oracle 表
- algorithm - 缩放一个多边形以接触另一个多边形