c# - 如何并行执行任务但每 T 秒不超过 N 个任务?
问题描述
我需要尽可能快地并行运行许多任务。但是如果我的程序每 1 秒运行超过 30 个任务,就会被阻塞。如何确保每 1 秒间隔运行的任务不超过 30 个?
换句话说,如果在最后 1 秒的时间间隔内完成了 30 个任务,我们必须阻止新任务启动。
我丑陋的可能解决方案:
private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
var timeList = new List<DateTime>();
var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
var tasksToRun = taskList.Select(async task =>
{
do
{
sem.WaitOne();
}
while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));
await task;
timeList.Add(DateTime.Now);
sem.Release();
});
await Task.WhenAll(tasksToRun);
}
private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
return timeList.Count <= maxIntervalCount
|| DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}
解决方案
用户代码永远不必直接控制任务的调度方式。一方面,它不能控制任务的运行方式是TaskScheduler的工作。当用户代码调用.Start()
时,它只是将一个任务添加到线程池队列中执行。 await
执行已经执行的任务。
TaskScheduler 示例展示了如何创建有限的并发调度程序,但同样有更好的高级选项。
问题的代码无论如何都不会限制排队的任务,它限制了可以等待的任务数量。他们都已经在运行了。这类似于在管道中对先前的异步操作进行批处理,只允许有限数量的消息传递到下一个级别。
有延迟的动作块
简单的、开箱即用的方法是使用具有有限 MaxDegreeOfParallelism 的 ActionBlock,以确保最多可以同时运行 N 个并发操作。如果我们知道每个操作需要多长时间,我们可以增加一点延迟以确保我们不会超过油门限制。
在这种情况下,7 个并发工作人员每秒执行 4 个请求,总共每秒最多 28 个请求。这BoundedCapacity
意味着在downloader.SendAsync
块之前最多只能将 7 个项目存储在输入缓冲区中。这样,ActionBlock
如果操作花费太长时间,我们就可以避免泛滥。
var downloader = new ActionBlock<string>(
async url => {
await Task.Delay(250);
var response=await httpClient.GetStringAsync(url);
//Do something with it.
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity=7 }
);
//Start posting to the downloader
foreach(var item in urls)
{
await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;
带有 SemaphoreSlim 的 ActionBlock
另一种选择是将其与SemaphoreSlim
由计时器定期重置的 a 结合起来。
var refreshTimer = new Timer(_=>sm.Release(30));
var downloader = new ActionBlock<string>(
async url => {
await semaphore.WaitAsync();
try
{
var response=await httpClient.GetStringAsync(url);
//Do something with it.
}
finally
{
semaphore.Release();
}
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity=5 }
);
//Start the timer right before we start posting
refreshTimer.Change(1000,1000);
foreach(....)
{
}
推荐阅读
- python - 如何堆叠多个图
- sql - SQL 将行数据放入列(长到宽格式)
- python - 使用 chess.uci 更改鳕鱼技能等级
- azure - 将错误从 Azure 数据工厂记录到 Application Insights(ADF 到 ApplicationInsights)
- mongodb - Elasticsearch 7.1:将 mongodb 聚合管道转换为弹性搜索查询
- delphi - 如何在 Delphi 中将“字符集”转换为“字节集”
- scheme - 作为标准,Scheme 是否有任何用于调用 let 的语法糖?
- python - 使用 Multiprocessing Poolmap 执行进程时,该进程未完成
- python - 在python中为极坐标图添加标签
- android - CameraX 多个后置摄像头