首页 > 解决方案 > 如何根据对象大小(或某些系统约束)动态控制并行度?

问题描述

让我们简化一下这个场景。有一台具有 16 GB RAM 和 4 个 CPU 内核的机器。给定一个具有不同大小的对象列表,例如[3,1,7,9,4,5,2],每个元素肯定需要基于它们的大小相应数量的 RAM,例如“1”将需要 1 GB RAM。

在 C# 中,使用Parallelism库(内置或第 3 方)并行处理此元素列表的最佳方法是什么?OutOfMemory

一种天真的策略可能是:

  1. 第一轮:选择[3,1,7]。还剩下一个核心,但如果使用“9”,程序将需要 20 GB RAM。所以现在让我们使用 3 个核心。
  2. 第二轮:如果先完成“3”,考虑“9”,但仍超过 16 GB RAM 容量(1+7+9 = 17)。所以,停下来等待。
  3. 第三轮:如果“7”完成,程序将继续执行“1”、“9”和“4”。

我不是算法和并行性方面的专家。所以我不能更具体地描述这个问题......任何帮助,链接,建议都非常感谢。我相信这个问题可能已经在其他地方解决了,我不需要重新发明轮子。

标签: c#algorithmparallel-processingconstraintstask-parallel-library

解决方案


您可以考虑使用Semaphore可以CurrentCount自动减少和增加超过 1 的专用工具,就像在这个问题中找到的一样。initialCount您可以使用等于以 GB 为单位的可用内存(16)初始化此机制,并且Wait/Release它使用以 GB 为单位的每个对象的大小(1 到 16 之间)。这样一个对象只有在等待CurrentCount它变得等于或大于它的大小后才能获取信号量。

要将这种机制合并到一个Parallel.ForEach循环中,您可以创建一个延迟枚举,将Wait信号量作为枚举的一部分,然后将此受限制的枚举作为并行循环的源提供。您应该注意的一个重要细节是Parallel.ForEach通过使用配置禁用默认情况下使用的块分区EnumerablePartitionerOptions.NoBuffering,否则Parallel.ForEach可能会一次贪婪地枚举多个项目,从而干扰该算法的节流意图。

信号量应该在并行循环的主体内以finally块的形式释放,与releaseCount被处理对象的大小相同。

把所有东西放在一起:

var items = new[] { 3, 1, 7, 9, 4, 5, 2 };
const int availableMemory = 16; // GB
using var throttler = new SemaphoreManyFifo(availableMemory, availableMemory);
var throttledItems = items
    .Select(item => { throttler.Wait(item); return item; });
var partitioner = Partitioner.Create(throttledItems,
    EnumerablePartitionerOptions.NoBuffering);
var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(partitioner, parallelOptions, item =>
{
    try
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Processing #{item}");
        Thread.Sleep(item * 1000); // Simulate a CPU-bound operation
    }
    finally
    {
        throttler.Release(item);
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Item #{item} completed");
    }
});

注意:每个对象的大小不能超过initialCount信号量的大小,否则这个算法会出错。请注意,上述SemaphoreManyFifo实现不包括正确的参数验证。


推荐阅读