首页 > 解决方案 > SemaphoreSlim 不会限制任务

问题描述

我创建了以下方法 TestThrottled 来尝试限制我的任务,但是当我调用 WhenAll 并且此方法都具有相同的经过时间时,它根本没有限制。我做错什么了吗?

    private static async Task<T[]> TestThrottled<T>(List<Task<T>> tasks, int maxDegreeOfParallelism)
    {
        var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
        var tasksParallelized = new List<Task<T>>();

        foreach (var task in tasks)
        {
            var taskParallelized = Task.Run(async () =>
            {
                try
                {
                    await semaphore.WaitAsync();

                    return await task;
                }
                finally
                {
                    semaphore.Release();
                }
            });
            tasksParallelized.Add(taskParallelized);
        }

        return await Task.WhenAll(tasksParallelized);
    }

    private static async Task<int> TestAsync()
    {
        await Task.Delay(1000);

        return 1;
    }

    static async Task Main(string[] args)
    {
        var sw = Stopwatch.StartNew();

        var tasks = new List<Task<int>>();
        var ints = new List<int>();

        for (int i = 0; i < 30; i++)
        {
            tasks.Add(TestAsync());
        }
        ints.AddRange(await TestThrottled(tasks, 1));

        Console.WriteLine($"{sw.ElapsedMilliseconds}, count: {ints.Count}");
        Console.ReadLine();
    }

标签: c#.netmultithreadingsemaphore

解决方案


另一种方法是使用TPL DataFlow,它已经拥有您需要的一切,并且可以在需要时满足更复杂的流水线操作,并且具有更多的可配置性。它还可以节省您卸载到另一个任务的时间,就像您的示例解决方案一样

private static async Task<IList<T>> TestThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
{
   var options = new ExecutionDataflowBlockOptions() { EnsureOrdered = false, MaxDegreeOfParallelism = maxDegreeOfParallelism };

   var transform = new TransformBlock<Func<Task<T>>, T>(func => func.Invoke(), options);
   var outputBufferBlock = new BufferBlock<T>();

   transform.LinkTo(outputBufferBlock, new DataflowLinkOptions(){PropagateCompletion = true});

   foreach (var task in tasks)
      transform.Post(task);

   transform.Complete();
   await outputBufferBlock. Completion;

   outputBufferBlock.TryReceiveAll(out var result);

   return result;
}

推荐阅读