c# - EventHub ForEach 并行异步
问题描述
总是设法让自己与异步工作混淆,我在这里进行了一些验证/确认,我正在做我认为我在以下情况下正在做的事情..
给出以下简单的例子:
// pretend / assume these are json msgs or something ;)
var strEvents = new List<string> { "event1", "event2", "event3" };
我可以将每个事件发布到 eventthub,如下所示:
foreach (var e in strEvents)
{
// Do some things
outEventHub.Add(e); // ICollector
}
foreach 将在单个线程上运行,并按顺序执行内部的每一件事.. 我猜到 eventthub 的发布也将保持在同一个线程上?
将 ICollector 更改为 IAsyncCollector,并实现以下目标:
foreach (var e in strEvents)
{
// Do some things
await outEventHub.AddAsync(e);
}
我想我在这里是说 foreach 将在单个线程上运行,实际发送到事件中心将被推迟到其他地方?或者至少不阻止同一个线程..
更改为 Parallel.ForEach 事件,因为这些事件一次将到达 100+ 左右:
Parallel.ForEach(events, async (e) =>
{
// Do some things
await outEventHub.AddAsync(e);
});
现在开始变得有点朦胧,因为我不确定现在到底发生了什么...... afaik 每个事件都有它自己的线程(在硬件范围内)并且该线程中的步骤不会阻止它.. 所以抛开这个琐碎的例子。
最后,我可以将它们全部转换为我认为的任务..
private static async Task DoThingAsync(string e, IAsyncCollector<string> outEventHub)
{
await outEventHub.AddAsync(e);
}
var t = new List<Task>();
foreach (var e in strEvents)
{
t.Add(DoThingAsync(e, outEventHub));
}
await Task.WhenAll(t);
现在我真的很朦胧,我认为这是在一个线程上准备所有内容..然后在任何可用线程上同时运行所有内容?
我很欣赏,为了确定哪个适合手头的工作,需要进行基准测试......但是现在解释框架在每种情况下所做的事情对我来说非常有帮助......
解决方案
并行!=异步
这是这里的主要思想。两者各有各的用途,可以一起使用,但是差别很大。您的假设大多是正确的,但让我澄清一下:
简单的 foreach
这是非并行和非异步的。没什么好谈的。
在 foreach 中等待
这是非并行的异步代码。
foreach (var e in strEvents)
{
// Do some things
await outEventHub.AddAsync(e);
}
这一切都将发生在一个线程上。它需要一个事件,开始将它添加到您的事件中心,并且在它完成时(我猜它会执行某种网络 IO),它会将线程交还给线程池(或 UI,如果它在UI 线程),因此它可以在等待AddAsync
返回时执行其他工作。但正如你所说,它根本不是平行的。
并行 Foreach(异步)
这是一个陷阱!简而言之,Parallel.Foreach
专为同步工作负载而设计。我们将回到这一点,但首先让我们假设您将它与非异步代码一起使用。
并行 foreach(同步)
又名并行但不是异步的。
Parallel.ForEach(events, (e) =>
{
// Do some things
outEventHub.Add(e);
});
每个项目都会有自己的“任务”,但它们不会产生线程。创建线程的成本很高,在最佳情况下,线程数超过 CPU 内核数是没有意义的。相反,这些任务在ThreadPool上运行,该 ThreadPool的线程数与最佳值一样多。每个线程接受一个任务,处理它,然后再接受另一个,等等。
你可以把它想象成 - 在一台 4 核机器上 - 有 4 个工作人员围绕着一堆任务,因此一次运行其中 4 个。您可以想象这在 IO 绑定工作负载(这很可能是)的情况下并不理想。如果您的网络很慢,您可以阻止所有 4 个线程尝试发送事件,而它们可能正在做有用的工作。这导致我们...
任务
异步且可能并行(取决于使用情况)。
您的描述在这里也是正确的,除了 ThreadPool,它一次(在主线程上)启动所有任务,然后在池的线程上运行。当它们运行时,主线程被释放,然后可以根据需要执行其他工作。到目前为止,情况与此相同Parallel.Foreach
。但:
发生的情况是 TaskPool 线程拿起一个任务,进行必要的预处理,然后异步发出网络请求。这意味着此任务在等待网络时不会阻塞,而是释放ThreadPool线程以获取另一个工作项。当网络请求完成时,任务继续(网络请求之后的剩余代码行)被安排回任务列表。
您可以看到理论上这是最有效的过程,速度如此之快,以至于您必须小心不要淹没您的网络。
回到 Parallel.Foreach 和异步
此时,您应该能够发现问题。你的 async lambdaasync (e) => { await outEventHub.AddAsync(e);}
所做的只是开始工作,它会在到达await
. (请记住,async/await 在等待时释放线程。)Parallel.Foreach
在启动所有线程后立即返回。但是没有什么在等待这些任务!这些变成了火然后忘记,这通常是一种不好的做法。就像您await Task.WhenAll
从任务示例中删除了调用一样。
我希望这为您清除了大部分问题,如果没有,请告诉我要改进的地方。
推荐阅读
- mysql - 如何设计数据库以满足以下要求?
- ios - ios swift 5 MaterialTextFieldView 输入测试不起作用
- php - 如何将不同的值分组到单级数组 Laravel
- python - 如何在 MacBook 上同时运行两个 python 程序?
- javascript - 外部文件中的 JS 脚本:ReferenceError:未定义函数
- javascript - 过多的变量声明是否会对性能产生重大影响?
- kotlin - 如何在kotlin编程中将毫秒转换为时间戳
- excel - 导入数据格式已更改
- laravel - OrderBy 子句到 laravel 中雄辩的关系字段
- c# - 你能告诉我返回“System.Data.SqlClient.SqlException:'必须声明标量变量“@idorder”的 SqlDataReader 有什么问题吗