首页 > 解决方案 > 需要顺序运行等待子任务的并发处理(在控制台应用程序中)

问题描述

我正在尝试找到处理由需要按顺序执行的各个子任务组成的项目的并发处理的最佳方法(C# / .net 4.6)

即同时处理列表中的对象,但按顺序执行一系列(异步等待的)子任务——并且只运行所有操作完成后出现的代码。

伪代码:

public async Task SynchronizeItems() 
{
   List<Items> items = await client.RetrieveItems();
   foreach (var item in collection) // but in parallel
   {
      await item.DoThingA()
      await item.DoThingB()
      await item.DoThingC()
   }   
   // **only run code here when all sub tasks for all items are complete**
}

更大的上下文(简化形式):我有一个可执行文件,需要每 N 分钟作为计划任务运行。Program 的入口点/Main 方法初始化一个 ItemSyncService 并调用 SynchronizeItems()。因为 SynchronizeItems() 是异步的,所以当遇到第一个 await 时,整个过程会立即退出,因为控制权会返回到 Main 方法。

(简单地添加调用 SynchronizeItems().Wait() 是行不通的,因为这是一个简化的场景。实际上,调用层次结构非常复杂,程序集是动态加载的,方法是调用的等等。)

为了防止这种情况发生(基于我在 Stack Overflow 上看到的一篇文章),我添加了一个 ManualResetEvent,以便我可以手动控制“所有任务都完成”的时间。

static void Main(string[] args)
{
   ManualResetEvent completionEvent = new ManualResetEvent(false);
   _executor = new ItemService();
   _executor.SynchronizeItems(completionEvent)

   // wait for completion events to be set before exiting the method
   completionEvent.WaitOne()
}

同步方法如下所示:

public async Task SynchronizeItems(ManualResetEvent completionEvent) 
{
   List<Items> items = await client.RetrieveItems();
   foreach (var item in collection) // but in parallel
   {
      await item.DoThingA()
      await item.DoThingB()
      await item.DoThingC()
   }   
   // ** only run code here when all sub tasks for all items are complete**
   // signal completion
   completionEvent.Set()
}

将其更改为使用 Parallel ForEach 进行项目级并发如下所示:

public async Task SynchronizeItems(ManualResetEvent completionEvent) 
{
   List<Items> items = await client.RetrieveItems();
   Parallel.ForEach(items, async (item) => // in parallel now
   {
      await item.DoThingA()
      await item.DoThingB()
      await item.DoThingC()
   }   
   **// only run code here when all sub tasks for all items are complete **
   // signal completion
   completionEvent.Set() // ** this now runs immediately without waiting **
}

但是,完成此操作后,将在启动每个项目的任务后立即调用 completionEvent。

我找到了一个实现 ParallelForEachAsync ( https://github.com/Dasync/AsyncEnumerable )的第三方库。看起来这将阻止完成事件被设置,直到所有项目的所有等待任务都完成之后。

但我想知道我是否做错了?最初,当我写这篇文章时,我正在作为一个控制台应用程序进行测试,并且在交互模式下有一个 ReadKey(),所以我没有遇到与异步相关的进程退出问题。

标签: c#asynchronousasync-awaittask-parallel-library

解决方案


如果这些DoThings方法发出 I/O 请求而不是 CPU 绑定操作,那么您可以将项目处理移动到它自己的方法:

private async Task ProcessItem(Items item) {
    await item.DoThingA();
    await item.DoThingB();
    await item.DoThingC();
}

然后为每个对象构建一个对象列表Task,然后等待它们全部完成:

var taskList = new List<Task>();
foreach (var item in collection)
{
    taskList.Add(ProcessItem(item));
}
await Task.WhenAll(taskList);
// you will get here only when all the items are processed

当作await用于一个不完整的Task,它返回自己的不完整Task并且执行返回到调用方法。因此DoThingA(),例如,如果发出网络请求,则一旦发送该网络请求,执行就会返回到该SynchronizeItems方法并开始列表中的下一个。

所以这样做会立即开始一切,然后当回复回来时,事情就结束了。“完成”可能会或可能不会发生在单独的线程上。这取决于应用程序的类型。

  • 在没有同步上下文的应用程序(ASP.NET Core、控制台应用程序或 Windows 服务)中,每个都将在后台线程上完成。

  • 如果此应用程序确实有一个同步上下文(ASP.NET 或桌面应用程序),那么每个应用程序都会等到主线程空闲后再完成。如果您知道不需要上下文(例如,在 ASP.NET 中,您没有HttpContext在其中任何一个中使用,或者在 UI 应用程序中,您没有更改 UI),那么您可以使用ConfigureAwait(false)来告诉它你不需要返回到它开始的相同上下文,它会更快地完成:

private async Task ProcessItem(Items item) {
    await item.DoThingA().ConfigureAwait(false);
    await item.DoThingB().ConfigureAwait(false);
    await item.DoThingC().ConfigureAwait(false);
}

Microsoft 有一系列关于使用 async 和 await 进行异步编程的非常好的文章。查看左侧的目录以了解该部分中的其余文章。


推荐阅读