c# - 如何根据对象大小(或某些系统约束)动态控制并行度?
问题描述
让我们简化一下这个场景。有一台具有 16 GB RAM 和 4 个 CPU 内核的机器。给定一个具有不同大小的对象列表,例如[3,1,7,9,4,5,2]
,每个元素肯定需要基于它们的大小相应数量的 RAM,例如“1”将需要 1 GB RAM。
在 C# 中,使用Parallelism库(内置或第 3 方)并行处理此元素列表的最佳方法是什么?OutOfMemory
一种天真的策略可能是:
- 第一轮:选择[3,1,7]。还剩下一个核心,但如果使用“9”,程序将需要 20 GB RAM。所以现在让我们使用 3 个核心。
- 第二轮:如果先完成“3”,考虑“9”,但仍超过 16 GB RAM 容量(1+7+9 = 17)。所以,停下来等待。
- 第三轮:如果“7”完成,程序将继续执行“1”、“9”和“4”。
我不是算法和并行性方面的专家。所以我不能更具体地描述这个问题......任何帮助,链接,建议都非常感谢。我相信这个问题可能已经在其他地方解决了,我不需要重新发明轮子。
解决方案
您可以考虑使用Semaphore
可以CurrentCount
自动减少和增加超过 1 的专用工具,就像在这个问题中找到的一样。initialCount
您可以使用等于以 GB 为单位的可用内存(16)初始化此机制,并且Wait
/Release
它使用以 GB 为单位的每个对象的大小(1 到 16 之间)。这样一个对象只有在等待CurrentCount
它变得等于或大于它的大小后才能获取信号量。
要将这种机制合并到一个Parallel.ForEach
循环中,您可以创建一个延迟枚举,将Wait
信号量作为枚举的一部分,然后将此受限制的枚举作为并行循环的源提供。您应该注意的一个重要细节是Parallel.ForEach
通过使用配置禁用默认情况下使用的块分区EnumerablePartitionerOptions.NoBuffering
,否则Parallel.ForEach
可能会一次贪婪地枚举多个项目,从而干扰该算法的节流意图。
信号量应该在并行循环的主体内以finally
块的形式释放,与releaseCount
被处理对象的大小相同。
把所有东西放在一起:
var items = new[] { 3, 1, 7, 9, 4, 5, 2 };
const int availableMemory = 16; // GB
using var throttler = new SemaphoreManyFifo(availableMemory, availableMemory);
var throttledItems = items
.Select(item => { throttler.Wait(item); return item; });
var partitioner = Partitioner.Create(throttledItems,
EnumerablePartitionerOptions.NoBuffering);
var parallelOptions = new ParallelOptions()
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(partitioner, parallelOptions, item =>
{
try
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Processing #{item}");
Thread.Sleep(item * 1000); // Simulate a CPU-bound operation
}
finally
{
throttler.Release(item);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Item #{item} completed");
}
});
注意:每个对象的大小不能超过initialCount
信号量的大小,否则这个算法会出错。请注意,上述SemaphoreManyFifo
实现不包括正确的参数验证。
推荐阅读
- c - 如何使用 UEFI 运行时服务?
- python - django-background-tasks 管理命令未在 AWS elasticbeanstalk 上运行
- flutter - 每次按下按钮时,如何将 ListTile 添加到列表中?
- ansible - 如何访问另一台服务器的 ansible 事实?
- bash - 用于从串行设备读取 N 字节二进制数据并将其保存到文件的 Bash 脚本
- bash - If/Elif 结构有什么问题吗?
- numpy - 我需要解决导入 NumPy 库的错误
- xml - 如何使用 Xquery 向元素节点插入多个属性?
- kubernetes - 如何在 kubernetes 中使用 statefulset 设置 pvc?
- python - python if + else if 语句