c# - C#:使用 Parallel.ForEach 和异步操作限制最大并发操作
问题描述
我正在尝试使用 asp.net core 2.1 实现自托管 Web 服务,但遇到了实现后台长时间执行任务的问题。
由于每种ProcessSingle
方法(在下面的代码片段中)的高 CPU 负载和时间消耗,我想限制同时执行的任务的数量。但是我可以看到所有任务Parallel.ForEach
几乎立即开始,尽管我设置了MaxDegreeOfParallelism = 3
我的代码是(这是一个简化版本):
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Factory.StartNew(async () => {
Parallel.ForEach(
listOfData,
new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
async x => await ProcessSingle(x));
});
// return created id immediately
return id;
}
public static async Task ProcessSingle(MyInputData inputData)
{
var dbData = await GetDataFromDb(); // get data from DB async using Dapper
// some lasting processing (sync)
await SaveDataToDb(); // async save processed data to DB using Dapper
}
如果我理解正确,问题出async x => await ProcessSingle(x)
在 Parallel.ForEach 内部,不是吗?
有人可以描述一下,它应该如何以正确的方式实施?
更新
由于我的问题存在某种歧义,因此有必要关注主要方面:
方法分为三部分
ProcessSingle
:从数据库异步获取数据
进行长时间高 CPU 负载的数学计算
将结果保存到数据库异步
问题包括两个独立的:
如何降低 CPU 使用率(例如同时运行不超过三个数学计算)?
如何保持
ProcessSingle
方法的结构 - 由于异步 DB 调用而使它们保持异步。
希望现在会更清楚。
PS 已经给出了合适的答案,它可以工作(特别感谢@MatrixTai)。编写此更新是为了进行一般说明。
解决方案
更新
正如我刚刚注意到您在评论中提到的那样,问题是由数学计算引起的。
将计算和更新数据库的部分分开会更好。
对于计算部分,使用Parallel.ForEach()
以便优化您的工作,您可以控制线程数。
只有在所有这些任务完成之后。用于async-await
将您的数据更新到数据库,无需SemaphoreSlim
我提及。
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Run(async () => {
//Calculation Part
ConcurrentBag<int> data = new ConcurrentBag<int>();
Parallel.ForEach(
listOfData,
new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
x => {ConcurrentBag.Add(calculationPart(x))});
//Update DB part
int[] data_arr = data.ToArray();
List<Task> worker = new List<Task>();
foreach (var i in data_arr)
{
worker.Add(DBPart(x));
}
await Task.WhenAll(worker);
});
// return created id immediately
return id;
}
当你async-await
在Parallel.forEach
.
首先,阅读第一个和第二个答案的这个问题。将这两者结合起来毫无意义。
实际上async-await
会最大限度地利用可用线程,所以简单地使用它。
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Run(async () => {
List<Task> worker = new List<Task>();
foreach (var i in listOfData)
{
worker.Add(ProcessSingle(x));
}
await Task.WhenAll(worker);
});
// return created id immediately
return id;
}
但是还有另一个问题,在这种情况下,这些任务仍然一起开始,消耗你的 CPU 使用率。
因此,为避免这种情况,请使用SemaphoreSlim
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Run(async () => {
List<Task> worker = new List<Task>();
//To limit the number of Task started.
var throttler = new SemaphoreSlim(initialCount: 20);
foreach (var i in listOfData)
{
await throttler.WaitAsync();
worker.Add(Task.Run(async () =>
{
await ProcessSingle(x);
throttler.Release();
}
));
}
await Task.WhenAll(worker);
});
// return created id immediately
return id;
}
阅读更多如何限制并发异步 I/O 操作的数量?.
此外,不要Task.Factory.StartNew()
在简单Task.Run()
就足以完成您想要的工作时使用,请阅读 Stephen Cleary 撰写的这篇出色的文章。
推荐阅读
- github - 在 GitHub Actions 中将步骤设置为超时成功
- java - JUnit 测试用例不适用于 JDBCTemplate
- sql-server - 升级到 WSL2 后映射卷时 MSSQL 容器无法启动
- c++ - 如何在 C++ 中编写多行并从文件中读取这些行?
- google-chrome-extension - Chrome 扩展程序:清单不是有效的 JSON。在第 22 行尾随逗号不允许
- c# - 添加链接的 appsettings.json
- c++ - 模板类中类的构造函数
- amazon-web-services - AWS CLI dynamodb 更新项
- nhibernate - Nhibnerate .net Core 3.1 初始化
- python - 使用 Python Schedule 时出现 ModuleNotFoundError