首页 > 解决方案 > “无限”异步并行 foreach 循环

问题描述

我有一个List<string>包含 50K 到 100K 字的

我想以并行异步的方式迭代它

例如,我可以使用

while (true)
{
   Parallel.ForEach(words, new ParallelOptions { MaxDegreeOfParallelism = 100 }, ...)
}

但问题是:

  1. Parallel.ForEach不是异步的
  2. 当我们到达列表的末尾时,我们必须等待每个线程结束,然后while (true)语句才能继续
  3. 这意味着并不总是有100 个线程在运行,这就是我想要的

我怎么能做到这一点?

请让我知道这是否令人困惑,或者我是否不擅长解释事情。

标签: c#asynchronousparallel-processingasync-await

解决方案


这是一个完全人为设计的async友好TPL DataFlow示例,说明如何实现您的要求。

  1. 它适用于异步 IO 工作负载
  2. 是可以取消的
  3. 它限制了最大并行度
  4. 它有有限的容量,所以总是有 100 个工作可用
  5. 是无限的

给定

private static CancellationTokenSource _cs;
private static CancellationToken _token;
private static ActionBlock<string> _block;

private static async Task MethodAsync(string something)
{
   // Your async workload
}

public static async Task EndlessRunner(string[] someArray)
{
   try
   {
      var index = 0;
      while (!_token.IsCancellationRequested)
      {
         await _block.SendAsync(someArray[index],_token);
         if (++index >= someArray.Length) index = 0;
      }
   }
   catch (OperationCanceledException)
   {
      Console.WriteLine("Cancelled");
   }
}

例子

private static async Task Main()
{
   _cs = new CancellationTokenSource();
   _token = _cs.Token;

   _block = new ActionBlock<string>(
      MethodAsync, 
      new ExecutionDataflowBlockOptions()
      {
         EnsureOrdered = false,
         MaxDegreeOfParallelism = 100,
         BoundedCapacity = 100,
         CancellationToken = _cs.Token,
         SingleProducerConstrained = true
      });

   var someList = Enumerable
      .Range(0,5000)
      .Select(I => $"something {I}")
      .ToArray();

   Task.Run(() => EndlessRunner(someList));

   Console.ReadKey();

   _cs.Cancel();

   _block.Complete();
   await _block.Completion;

}

推荐阅读