c# - 以最大并发运行多个操作 - 最后的 2 个任务未执行
问题描述
我创建了一个类,它允许我同时运行多个操作,并带有一个设置最大并发限制的选项。即,如果我有 100 个操作要做,并且我设置maxCurrency
为 10,那么在任何给定时间,最多 10 个操作应该同时运行。最终,所有的操作都应该被执行。
这是代码:
public async Task<IReadOnlyCollection<T>> Run<T>(IEnumerable<Func<CancellationToken, Task<T>>> operations, int maxConcurrency, CancellationToken ct)
{
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
var results = new ConcurrentBag<T>();
var tasks = new List<Task>();
foreach (var operation in operations)
{
await semaphore.WaitAsync(ct).ConfigureAwait(false);
var task = Task.Factory.StartNew(async () =>
{
try
{
Debug.WriteLine($"Adding new result");
var singleResult = await operation(ct).ConfigureAwait(false);
results.Add(singleResult);
Debug.WriteLine($"Added {singleResult}");
}
finally
{
semaphore.Release();
}
}, ct);
tasks.Add(task);
}
await Task.WhenAll(tasks).ConfigureAwait(false);
Debug.WriteLine($"Completed tasks: {tasks.Count(t => t.IsCompleted)}");
Debug.WriteLine($"Calculated results: {results.Count}");
return results.ToList().AsReadOnly();
}
这是我如何使用它的示例:
var operations = Enumerable.Range(1, 10)
.Select<int, Func<CancellationToken, Task<int>>>(n => async ct =>
{
await Task.Delay(100, ct);
return n;
});
var data = await _sut.Run(operations, 2, CancellationToken.None);
每次执行此操作时,该data
集合只有 8 个结果。我希望有 10 个结果。
这是调试日志:
Adding new
Adding new
Added 1
Added 2
Adding new
Adding new
Added 3
Added 4
Adding new
Adding new
Added 5
Adding new
Added 6
Adding new
Added 7
Adding new
Added 8
Adding new
Completed tasks: 10
Calculated results: 8
如你看到的:
- 10个任务完成
- “添加新”记录了 10 次
- “添加 x”记录了 8 次
我不明白为什么最后 2 次操作没有完成。所有任务都IsComplete
设置为true
,据我了解,这应该意味着所有任务都已执行完毕。
解决方案
这里的问题是Task.Factory.StartNew
返回一个在等待时返回内部任务的任务。
它不会给你一个等待这个内部任务的任务,因此你的问题。
解决此问题的最简单方法是调用Unwrap
您创建的任务,这将打开内部任务并允许您等待。
这应该有效:
var task = ....
....
}, ct).Unwrap();
通过这个小改动,您将获得以下输出:
...
Added 9
Added 10
Completed tasks: 10
Calculated results: 10
请注意,我对您的问题的评论仍然有效:
- 您仍然在幻想 WhenAll 将等待所有任务,而实际上除了最后一个 N 之外的所有任务都已经完成,因为循环本身直到前面的任务完成后才会继续。因此,您应该将同步对象获取移动到您的内部任务中,以便您可以在开始等待它们之前将它们全部排队。
我也相信(尽管我不是 100%知道)使用 SemaphoreSlim 不是一个好方法,因为我相信任何与线程相关的同步对象在与任务相关的工作中使用都可能不安全。线程池中的线程被重用,而实时任务正在等待子任务完成,这意味着这样的线程可能已经拥有来自尚未完成的先前任务的同步对象,因此允许超过您想要运行的 2 个在“同时”。SemaphoreSlim 可以使用,其他同步原语可能不行。
推荐阅读
- dataframe - 使用缺失值初始化列并将数据框的另一列复制+转换为初始化列
- html - 联系表格和页脚样式问题
- jquery - 如何在wordpress中编写jquery
- c# - .net Core 中的 .ConfigureKestrel() 方法存在问题
- php - 登录时如何将导航菜单栏更改为“注销”
- node.js - 异步/等待,然后在我的情况下不起作用
- r - find n% of records in a variable in the data frame
- python - 在 repl.it 上保持 Discord Bot 在线
- python - 我怎样才能创建一个动作将与我的主要动作同时发生一段时间?
- c# - Identity signIn / PasswordSignIn 返回成功,但 @User.Identity.IsAuthenticated 返回 false