c# - 反应式扩展 Observerable.FromAsync:如何等待异步操作完成
问题描述
在我的应用程序中,我从消息总线(rabbit mq,但实际上并不重要)获取事件。此事件的处理需要相当长的时间,并且这些事件是突发产生的。一次只能处理一个事件。为了实现这一点,我使用 Rx 来序列化事件并异步执行处理,以免阻塞生产者。
这是(简化的)示例代码:
static void Main(string[] args)
{
var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
var subscription = input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
.Concat()
.Subscribe();
Console.ReadLine();
subscription.Dispose();
// Wait until finished?
Console.WriteLine("Completed");
}
private static async Task Process(CancellationToken cts, long i)
{
Console.WriteLine($"Processing {i} ...");
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine($"Finished processing {i}");
}
示例应用程序正在处理订阅,然后应用程序被终止。但在此示例中,应用程序正在终止,而订阅前收到的最后一个事件仍在处理中。
问题:在处理订阅后等待处理最后一个事件的最佳方法是什么?我仍在尝试围绕异步 Rx 的东西。我认为有一种相当简单的方法可以做到这一点,但我现在还没有看到。
解决方案
如果您对一个快速简单的解决方案感到满意,它适用于像这样的简单案例,那么您可以只是Wait
而IObservable
不是订阅它。如果您想接收类似订阅的通知,请Do
在 final 之前使用运算符Wait
:
static void Main(string[] args)
{
var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
.Concat()
.Do(onNext: x => { }, onError: ex => { }, onCompleted: () => { })
.Wait();
Console.WriteLine("Completed");
}
推荐阅读
- node.js - 授权空端点 Bot Azure
- reactjs - redux 中的状态解构问题
- r - 为什么我的 for 循环在每次迭代中使用 rbind 后只返回 1 行?
- javascript - 通过js在html中复制一些
- java - 在 IntelliJ 中运行功能文件时缺少步骤
- linux - 更改系统时间时,Tkinter after() 失败
- gitlab - 是否可以在管道之间传输缓存/工件?
- soapui - 5.5 版本在 5.6 不工作的地方工作
- jquery - 使用 keyUp 密码确认时,样式仅在一个字段中起作用
- bootstrap-4 - Boostrap 4 - 徽标图像不展开标题