首页 > 解决方案 > 如何并行执行任务但每 T 秒不超过 N 个任务?

问题描述

我需要尽可能快地并行运行许多任务。但是如果我的程序每 1 秒运行超过 30 个任务,就会被阻塞。如何确保每 1 秒间隔运行的任务不超过 30 个?

换句话说,如果在最后 1 秒的时间间隔内完成了 30 个任务,我们必须阻止新任务启动。

我丑陋的可能解决方案:

private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
    var timeList = new List<DateTime>();

    var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
    var tasksToRun = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));

        await task;

        timeList.Add(DateTime.Now);

        sem.Release();
    });

    await Task.WhenAll(tasksToRun);
}

private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
    return timeList.Count <= maxIntervalCount 
    || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}

标签: c#.netasynchronous.net-coretask-parallel-library

解决方案


用户代码永远不必直接控制任务的调度方式。一方面,它不能控制任务的运行方式是TaskScheduler的工作。当用户代码调用.Start()时,它只是将一个任务添加到线程池队列中执行。 await执行已经执行的任务。

TaskScheduler 示例展示了如何创建有限的并发调度程序,但同样有更好的高级选项。

问题的代码无论如何都不会限制排队的任务,它限制了可以等待的任务数量。他们都已经在运行了。这类似于在管道中对先前的异步操作进行批处理,只允许有限数量的消息传递到下一个级别。

有延迟的动作块

简单的、开箱即用的方法是使用具有有限 MaxDegreeOfParallelism 的 ActionBlock,以确保最多可以同时运行 N 个并发操作。如果我们知道每个操作需要多长时间,我们可以增加一点延迟以确保我们不会超过油门限制。

在这种情况下,7 个并发工作人员每秒执行 4 个请求,总共每秒最多 28 个请求。这BoundedCapacity意味着在downloader.SendAsync块之前最多只能将 7 个项目存储在输入缓冲区中。这样,ActionBlock如果操作花费太长时间,我们就可以避免泛滥。

var downloader = new ActionBlock<string>(
        async url => {
            await Task.Delay(250);
            var response=await httpClient.GetStringAsync(url);
            //Do something with it.
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity=7 }
);

//Start posting to the downloader
foreach(var item in urls)
{
    await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;

带有 SemaphoreSlim 的 ActionBlock

另一种选择是将其与SemaphoreSlim由计时器定期重置的 a 结合起来。

var refreshTimer = new Timer(_=>sm.Release(30));

var downloader = new ActionBlock<string>(
        async url => {
            await semaphore.WaitAsync();
            try 
            {
                var response=await httpClient.GetStringAsync(url);
                //Do something with it.
            }
            finally
            {
                semaphore.Release();
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity=5 }
);

//Start the timer right before we start posting 
refreshTimer.Change(1000,1000);
foreach(....)
{

}

推荐阅读