首页 > 解决方案 > EventHub ForEach 并行异步

问题描述

总是设法让自己与异步工作混淆,我在这里进行了一些验证/确认,我正在做我认为我在以下情况下正在做的事情..

给出以下简单的例子:

// pretend / assume these are json msgs or something ;)
var strEvents = new List<string> { "event1", "event2", "event3" };

我可以将每个事件发布到 eventthub,如下所示:

foreach (var e in strEvents)
{
    // Do some things
    outEventHub.Add(e); // ICollector
}

foreach 将在单个线程上运行,并按顺序执行内部的每一件事.. 我猜到 eventthub 的发布也将保持在同一个线程上?

将 ICollector 更改为 IAsyncCollector,并实现以下目标:

foreach (var e in strEvents)
{
    // Do some things
    await outEventHub.AddAsync(e);
}

我想我在这里是说 foreach 将在单个线程上运行,实际发送到事件中心将被推迟到其他地方?或者至少不阻止同一个线程..

更改为 Parallel.ForEach 事件,因为这些事件一次将到达 100+ 左右:

 Parallel.ForEach(events, async (e) =>
 {
      // Do some things
      await outEventHub.AddAsync(e);
 });

现在开始变得有点朦胧,因为我不确定现在到底发生了什么...... afaik 每个事件都有它自己的线程(在硬件范围内)并且该线程中的步骤不会阻止它.. 所以抛开这个琐碎的例子。

最后,我可以将它们全部转换为我认为的任务..

 private static async Task DoThingAsync(string e, IAsyncCollector<string> outEventHub)
 {
      await outEventHub.AddAsync(e);
 }

 var t = new List<Task>();

 foreach (var e in strEvents)
 {
      t.Add(DoThingAsync(e, outEventHub));
 }

 await Task.WhenAll(t);

现在我真的很朦胧,我认为这是在一个线程上准备所有内容..然后在任何可用线程上同时运行所有内容?

我很欣赏,为了确定哪个适合手头的工作,需要进行基准测试......但是现在解释框架在每种情况下所做的事情对我来说非常有帮助......

标签: c#multithreadingasync-await

解决方案


并行!=异步

这是这里的主要思想。两者各有各的用途,可以一起使用,但是差别很大。您的假设大多是正确的,但让我澄清一下:

简单的 foreach

是非并行非异步的。没什么好谈的。

在 foreach 中等待

是非并行的异步代码。

foreach (var e in strEvents)
{
    // Do some things
    await outEventHub.AddAsync(e);
}

这一切都将发生在一个线程上。它需要一个事件,开始将它添加到您的事件中心,并且在它完成时(我猜它会执行某种网络 IO),它会将线程交还给线程池(或 UI,如果它在UI 线程),因此它可以在等待AddAsync返回时执行其他工作。但正如你所说,它根本不是平行的。

并行 Foreach(异步)

这是一个陷阱!简而言之,Parallel.Foreach专为同步工作负载而设计。我们将回到这一点,但首先让我们假设您将它与非异步代码一起使用。

并行 foreach(同步)

又名并行但不是异步的。

Parallel.ForEach(events, (e) =>
 {
      // Do some things
      outEventHub.Add(e);
 });

每个项目都会有自己的“任务”,但它们不会产生线程。创建线程的成本很高,在最佳情况下,线程数超过 CPU 内核数是没有意义的。相反,这些任务在ThreadPool上运行,该 ThreadPool的线程数与最佳值一样多。每个线程接受一个任务,处理它,然后再接受另一个,等等。

你可以把它想象成 - 在一台 4 核机器上 - 有 4 个工作人员围绕着一堆任务,因此一次运行其中 4 个。您可以想象这在 IO 绑定工作负载(这很可能是)的情况下并不理想。如果您的网络很慢,您可以阻止所有 4 个线程尝试发送事件,而它们可能正在做有用的工作。这导致我们...

任务

异步且可能并行(取决于使用情况)。

您的描述在这里也是正确的,除了 ThreadPool,它一次(在主线程上)启动所有任务,然后在池的线程上运行。当它们运行时,主线程被释放,然后可以根据需要执行其他工作。到目前为止,情况与此相同Parallel.Foreach。但:

发生的情况是 TaskPool 线程拿起一个任务,进行必要的预处理,然后异步发出网络请求。这意味着此任务在等待网络时不会阻塞,而是释放ThreadPool线程以获取另一个工作项。当网络请求完成时,任务继续(网络请求之后的剩余代码行)被安排回任务列表。

您可以看到理论上这是最有效的过程,速度如此之快,以至于您必须小心不要淹没您的网络。

回到 Parallel.Foreach 和异步

此时,您应该能够发现问题。你的 async lambdaasync (e) => { await outEventHub.AddAsync(e);}所做的只是开始工作,它会在到达await. (请记住,async/await 在等待时释放线程。)Parallel.Foreach在启动所有线程后立即返回。但是没有什么在等待这些任务!这些变成了火然后忘记,这通常是一种不好的做法。就像您await Task.WhenAll从任务示例中删除了调用一样。

我希望这为您清除了大部分问题,如果没有,请告诉我要改进的地方。


推荐阅读