首页 > 解决方案 > 反应式扩展 Observerable.FromAsync:如何等待异步操作完成

问题描述

在我的应用程序中,我从消息总线(rabbit mq,但实际上并不重要)获取事件。此事件的处理需要相当长的时间,并且这些事件是突发产生的。一次只能处理一个事件。为了实现这一点,我使用 Rx 来序列化事件并异步执行处理,以免阻塞生产者。

这是(简化的)示例代码:

static void Main(string[] args)
{
   var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
   var subscription = input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
                           .Concat()
                           .Subscribe();

   Console.ReadLine();
   subscription.Dispose();
   // Wait until finished?
   Console.WriteLine("Completed");
}

private static async Task Process(CancellationToken cts, long i)
{
   Console.WriteLine($"Processing {i} ...");
   await Task.Delay(1000).ConfigureAwait(false);
   Console.WriteLine($"Finished processing {i}");
}

示例应用程序正在处理订阅,然后应用程序被终止。但在此示例中,应用程序正在终止,而订阅前收到的最后一个事件仍在处理中。

问题:在处理订阅后等待处理最后一个事件的最佳方法是什么?我仍在尝试围绕异步 Rx 的东西。我认为有一种相当简单的方法可以做到这一点,但我现在还没有看到。

标签: c#system.reactive

解决方案


如果您对一个快速简单的解决方案感到满意,它适用于像这样的简单案例,那么您可以只是WaitIObservable不是订阅它。如果您想接收类似订阅的通知,请Do在 final 之前使用运算符Wait

static void Main(string[] args)
{
    var input = Observable.Interval(TimeSpan.FromMilliseconds(500));
    input.Select(i => Observable.FromAsync(ct => Process(ct, i)))
        .Concat()
        .Do(onNext: x => { }, onError: ex => { }, onCompleted: () => { })
        .Wait();
    Console.WriteLine("Completed");
}

推荐阅读