首页 > 解决方案 > 多处理:了解“块大小”背后的逻辑

问题描述

哪些因素决定了chunksize方法的最佳参数,例如multiprocessing.Pool.map()?该.map()方法似乎对其默认块大小使用任意启发式(如下所述);是什么激发了这种选择,是否有基于某些特定情况/设置的更周到的方法?

示例 - 说我是:

我的幼稚想法是给 24 名工人中的每人一个相同大小的块,即15_000_000 / 24625,000。大块应在充分利用所有工人的同时减少营业额/开销。但这似乎遗漏了为每个工人提供大批量的一些潜在缺点。这是一张不完整的照片,我错过了什么?


我的部分问题源于 if chunksize=None: both.map().starmap()call的默认逻辑.map_async(),如下所示:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

背后的逻辑是divmod(len(iterable), len(self._pool) * 4)什么?这意味着块大小将更接近15_000_000 / (24 * 4) == 156_250. 乘以len(self._pool)4 的目的是什么?

这使得生成的块大小比我上面的“朴素逻辑”4 倍,这包括将可迭代的长度除以pool._pool.

最后,还有来自 Python 文档的这个片段.imap()进一步激发了我的好奇心:

chunksize参数与该方法使用的参数相同map() 。对于非常长的迭代,使用较大的值chunksize可以使作业完成比使用默认值 1 快得多


有帮助但有点太高级的相关答案:Python 多处理:为什么大块大小变慢?.

标签: pythonpython-3.xparallel-processingmultiprocessingpython-multiprocessing

解决方案


简答

Pool 的 chunksize-algorithm 是一种启发式算法。它为您尝试填充到 Pool 方法中的所有可以想象的问题场景提供了一个简单的解决方案。因此,它无法针对任何特定场景进行优化。

该算法将可迭代对象任意划分为大约四倍于幼稚方法的块。更多的块意味着更多的开销,但增加了调度的灵活性。这个答案将如何显示,这平均会导致更高的工作人员利用率,但不能保证每个案例的总计算时间更短。

“很高兴知道”,您可能会想,“但是知道这一点如何帮助我解决具体的多处理问题?” 好吧,它没有。更诚实的简短答案是“没有简短答案”、“多处理很复杂”和“取决于情况”。观察到的症状可能有不同的根源,即使对于类似的情况也是如此。

这个答案试图为您提供基本概念,帮助您更清楚地了解 Pool 的调度黑盒。它还尝试为您提供一些手头的基本工具,用于识别和避免与块大小相关的潜在悬崖。


目录

第一部分

  1. 定义
  2. 并行化目标
  3. 并行化场景
  4. Chunksize > 1 的风险
  5. 池的块大小算法
  6. 量化算法效率

    6.1 型号

    6.2 并行调度

    6.3 效率

    6.3.1 绝对分配效率(ADE)

    6.3.2 相对分配效率(RDE)

第二部分

  1. Naive vs. Pool 的 Chunksize-Algorithm
  2. 现实检查
  3. 结论

有必要首先澄清一些重要的术语。


1. 定义


这里的块是iterable池方法调用中指定的-argument 的一部分。如何计算块大小以及这会产生什么影响,是这个答案的主题。


任务

下图可以看到任务在工作进程中的数据物理表示。

图0

该图显示了对 的示例调用pool.map(),沿代码行显示,取自multiprocessing.pool.worker函数,其中从函数中读取的任务inqueue被解包。是池工作进程worker中的底层主要功能。MainThreadpool-method 中指定的func-argument 将仅匹配func-function 内的worker-variable 用于单次调用方法,如 apply_async和 for imapwith chunksize=1。对于带有chunksize-parameter 的池方法的其余部分,处理函数func将是映射器函数(mapstarstarmapstar)。此函数将用户指定的func参数映射到可迭代的传输块的每个元素(--> “map-tasks”)。所花费的时间,定义了一项任务也作为一个工作单位


塔斯克

虽然一个块的整个处理过程中“任务”一词的使用与 中的代码相匹配multiprocessing.pool,但没有迹象表明应该如何以块的一个元素作为参数对用户指定的单个调用func进行调用所指。为了避免命名冲突引起的混淆(想想maxtasksperchildPool 方法的 -parameter __init__),这个答案将把任务中的单个工作单元称为taskel

taskel (来自task +元素)是 task 中最小的工作单元。它是使用 - 方法的 -参数指定的函数的单次执行,使用从传输的单个元素获得的参数调用。一个任务taskels组成。funcPoolchunksize


并行化开销 (PO)

PO由 Python 内部开销和进程间通信 (IPC) 开销组成。Python 中的每个任务开销伴随着打包和解包任务及其结果所需的代码。IPC 开销伴随着必要的线程同步和不同地址空间之间的数据复制(需要两个复制步骤:父 -> 队列 -> 子)。IPC 开销的数量取决于操作系统、硬件和数据大小,这使得对影响的概括变得困难。


2. 并行化目标

使用多处理时,我们的总体目标(显然)是最小化所有任务的总处理时间。为了达到这个总体目标,我们的技术目标需要优化硬件资源的利用率

实现技术目标的一些重要子目标是:

  • 最小化并行化开销(最著名但并不孤单:IPC
  • 所有 CPU 核心的高利用率
  • 限制内存使用以防止操作系统过度分页(垃圾

首先,这些任务需要足够多的计算量(密集型),以赢回我们必须为并行化支付的 PO。PO 的相关性随着每个任务的绝对计算时间的增加而降低。或者,换句话说,您的问题的每个任务的绝对计算时间越大,减少 PO 的需求就越不相关。如果您的计算每个任务需要几个小时,那么 IPC 开销相比之下可以忽略不计。这里的主要关注点是防止在所有任务分发后空闲的工作进程。保持所有内核加载意味着,我们尽可能地并行化。


3. 并行化场景

哪些因素决定了 multiprocessing.Pool.map() 等方法的最佳块大小参数

有问题的主要因素是我们的单个任务可能会改变多少计算时间。顾名思义,最佳块大小的选择由每个任务的计算时间的变异系数( CV ) 决定。

根据这种变化的程度,规模上的两种极端情况是:

  1. 所有任务都需要完全相同的计算时间。
  2. 一个任务可能需要几秒钟或几天才能完成。

为了更好地记忆,我将这些场景称为:

  1. 密集场景
  2. 广泛的场景


密集场景

密集场景中,希望一次分发所有任务,以将必要的 IPC 和上下文切换保持在最低限度。这意味着我们只想创建尽可能多的块,尽可能多的工作进程。如上所述,PO 的权重随着每个任务的计算时间的缩短而增加。

为了获得最大吞吐量,我们还希望所有工作进程都处于忙碌状态,直到所有任务都处理完毕(没有空闲的工作进程)。为此目标,分布式块的大小应该相等或接近。


广泛的场景

Wide Scenario的主要示例是优化问题,其中结果要么快速收敛,要么计算可能需要数小时甚至数天。通常,在这种情况下,一个任务将包含哪些“轻任务”和“重任务”的混合是不可预测的,因此不建议一次在一个任务批次中分配太多任务。一次分配更少的任务意味着增加调度的灵活性。这是实现我们所有内核高利用率的子目标所必需的。

如果Pool默认情况下方法会针对密集场景进行完全优化,那么它们将越来越多地为靠近宽场景的每个问题创建次优时序。


4. Chunksize > 1 的风险

考虑一下Wide Scenario -iterable的简化伪代码示例,我们希望将其传递给池方法:

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]

我们假装以秒为单位查看所需的计算时间,而不是实际值,为简单起见,仅为 1 分钟或 1 天。我们假设池有四个工作进程(在四个核心上)并chunksize设置为2. 因为订单将被保留,所以发送给工作人员的块将是这些:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]

由于我们有足够的工人并且计算时间足够长,我们可以说,每个工人进程首先都会得到一个块来工作。(这不一定是快速完成任务的情况)。进一步我们可以说,整个处理大约需要 86400+60 秒,因为这是这个人工场景中一个块的最高总计算时间,我们只分配一次块。

现在考虑这个可迭代对象,与前一个可迭代对象相比,它只有一个元素切换其位置:

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

...以及相应的块:

[(60, 60), (86400, 86400), (60, 60), (60, 84600)]

不幸的是,我们的迭代排序几乎使我们的总处理时间翻了一番(86400+86400)!获得恶意 (86400, 86400) 块的工人正在阻止其任务中的第二个重型任务分配给已经完成 (60, 60) 块的空闲工人之一。如果我们设置,我们显然不会冒这样不愉快的结果的风险chunksize=1

这是更大块的风险。随着更大的块大小,我们用调度灵活性换取更少的开销,在上述情况下,这是一个糟糕的交易。

我们将在第6 章看到如何量化算法效率,更大的块大小也可能导致密集场景的次优结果。


5. Pool 的 Chunksize-Algorithm

下面您将在源代码中找到该算法的略微修改版本。如您所见,我将下部切掉并将其包装成一个用于从chunksize外部计算参数的函数。我还替换4了一个factor参数并将len()调用外包。

# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

为确保我们都在同一个页面上,下面是这样divmod做的:

divmod(x, y)是一个返回的内置函数(x//y, x%y)x // y是地板除法,从 中返回向下舍入的商x / yx % y而是从 中返回余数的模运算x / y。因此,例如divmod(10, 3)返回(3, 1)

现在,当您查看 时chunksize, extra = divmod(len_iterable, n_workers * 4),您会注意到n_workers这里是除数yx / y乘以4,稍后无需进一步调整if extra: chunksize +=1,导致初始块大小(对于 )至少len_iterable >= n_workers * 4比其他情况小四倍。

要查看乘法4对中间块大小结果的影响,请考虑以下函数:

def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors by
    which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

上面的函数计算了cs_naivePool 的 chunksize-algorithm ( cs_pool1) 的原始块大小 ( ) 和第一步块大小,以及完整的 Pool-algorithm ( cs_pool2) 的块大小。此外,它还计算实际因子 rf_pool1 = cs_naive / cs_pool1rf_pool2 = cs_naive / cs_pool2,它告诉我们天真计算的块大小比 Pool 的内部版本大多少倍。

下面您会看到使用此函数的输出创建的两个图形。左图只显示了块大小n_workers=4,直到可迭代长度为500. 右图显示 的值rf_pool1。对于 iterable length 16,实际因子变为>=4(for len_iterable >= n_workers * 4),它的最大值是7iterable lengths 28-314这与算法收敛到更长迭代的原始因子有很大的偏差。这里的“更长”是相对的,取决于指定工人的数量。

图1

请记住,chunksizecs_pool1仍然缺少完整算法中包含extra的余数的调整。divmodcs_pool2

算法继续:

if extra:
    chunksize += 1

现在,如果有余extra来自 divmod 操作),将块大小增加 1 显然无法解决每个任务。毕竟,如果可以的话,就没有剩余的开始了。

从下图中可以看出,“额外处理”的效果是,现在的真实因子下方rf_pool2收敛,偏差更平滑一些。标准差和从for下降到for 。4 4n_workers=4len_iterable=5000.5233rf_pool10.4115rf_pool2

图2

最终,增加chunksize1 的效果是,最后传输的任务的大小仅为len_iterable % chunksize or chunksize.

然而,对于生成n_chunks的数量(对于足够长的迭代,Pool 完成的 chunksize-algorithm(n_pool2在下图中)会将块的数量稳定在n_chunks == n_workers * 4. 相比之下,朴素算法(在初始打嗝之后)随着可迭代长度的增长n_chunks == n_workers而不断交替。n_chunks == n_workers + 1

图3

下面你会发现两个增强的信息功能池和天真的块大小算法。下一章将需要这些函数的输出。

# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

不要被可能出乎意料的外观所迷惑calc_naive_chunksize_infoextrafromdivmod不用于计算块大小。

def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

6.量化算法效率

现在,在我们看到Pool's chunksize-algorithm 的输出与朴素算法的输出相比看起来如何不同之后......

  • 如何判断 Pool 的方法是否确实有所改进
  • 这究竟是什么

如前一章所示,对于更长的可迭代对象(更大数量的任务),Pool 的 chunksize-algorithm大约将可迭代对象划分为比 naive 方法四倍的块。更小的块意味着更多的任务,更多的任务意味着更多的并行化开销(PO),必须权衡增加调度灵活性的好处(回想“块大小> 1 的风险”)。

由于相当明显的原因,Pool 的基本块大小算法无法为我们权衡调度灵活性与PO。IPC 开销取决于操作系统、硬件和数据大小。该算法不知道我们在什么硬件上运行我们的代码,也不知道任务需要多长时间才能完成。它是一种启发式方法,为所有可能的场景提供基本功能。这意味着它无法针对任何特定场景进行优化。如前所述,随着每个任务的计算时间增加(负相关),PO也变得越来越不受关注。

当您回想起第 2 章中的并行化目标时,其中一个要点是:

  • 所有 CPU 核心的高利用率

前面提到的,Pool 的 chunksize-algorithm可以尝试改进的是 idling worker-processes 的最小,分别是 cpu-cores 的利用率

multiprocessing.Pool在您希望所有工作进程都忙的情况下,人们想知道未使用的核心/空闲工作进程的问题,关于 SO 的重复问题。虽然这可能有很多原因,但在计算结束时空闲的工作进程是我们经常可以观察到的,即使在密集场景(每个任务的计算时间相等)的情况下,工作人员的数量不是数量的除数块(n_chunks % n_workers > 0)。

现在的问题是:

我们如何才能将我们对块大小的理解转化为能够解释观察到的工人利用率的东西,或者甚至在这方面比较不同算法的效率?


6.1 型号

为了在这里获得更深入的见解,我们需要一种并行计算的抽象形式,它将过于复杂的现实简化到可管理的复杂程度,同时在定义的边界内保持重要性。这种抽象称为模型。如果要收集数据,这种“并行化模型”(PM)的实现会生成工作映射元数据(时间戳),就像实际计算一样。模型生成的元数据允许在某些约束下预测并行计算的指标。

图4

此处定义的PM中的两个子模型之一是分布模型 (DM)DM解释了原子工作单元(taskels)如何分布在并行工作人员和 time上,除了相应的块大小算法、工作人员数量、输入可迭代(taskels 数量)及其计算持续时间之外没有其他因素被考虑. 这意味着不包括任何形式的开销。

为了获得完整的PMDM扩展了开销模型 (OM),表示各种形式的并行化开销 (PO)。这样的模型需要为每个节点单独校准(硬件、操作系统依赖)。一个OM中有多少种形式的开销是开放的,因此可以存在具有不同复杂程度的多个OM 。实现的OM需要哪种准确度级别取决于特定计算的PO的总体权重。更短的任务导致更高的PO权重,这反过来又需要更精确的OM如果我们试图预测 并行化效率(PE)


6.2 并行调度(PS)

Parallel Schedule是并行计算的二维表示,其中 x 轴代表时间,y 轴代表并行工作池。工作人员的数量和总计算时间标志着一个矩形的延伸,其中绘制了较小的矩形。这些较小的矩形代表原子工作单元(taskels)。

在下面,您可以找到使用来自Pool 的块大小算法的DM的数据绘制的PS的可视化,用于Dense Scenario

图5

  • x 轴被划分为相等的时间单位,其中每个单位代表 taskel 所需的计算时间。
  • y 轴分为池使用的工作进程数。
  • 此处的任务显示为最小的青色矩形,放入匿名工作进程的时间线(时间表)中。
  • 任务是工作时间线中的一个或多个任务,以相同的色调连续突出显示。
  • 空闲时间单位通过红色瓷砖表示。
  • 并行计划分为多个部分。最后一部分是尾部。

组成部分的名称如下图所示。

图6

在包含OM的完整PM中,Idling Share不仅限于尾部,还包括任务之间甚至任务之间的空间。


6.3 效率

上面介绍的模型允许量化工人利用率。我们可以区分:

  • 分配效率 (DE) - 在DM (或密集场景的简化方法)的帮助下计算。
  • 并行化效率 (PE) - 借助校准的PM(预测)计算或根据实际计算的元数据计算。

需要注意的是,对于给定的并行化问题,计算出的效率不会自动与更快的整体计算相关联。在这种情况下,工人利用率仅区分具有已开始但未完成的任务的工人和没有这种“开放”任务的工人。这意味着,在 taskel 的时间跨度内可能的空闲没有记录

上述所有效率基本上都是通过计算除法Busy Share / Parallel Schedule的商获得的。DEPE之间的区别在于Busy Share 占开销扩展PM的整体并行计划的较小部分。

该答案将进一步仅讨论一种计算密集场景的DE的简单方法。这足以比较不同的块大小算法,因为......

  1. ... DMPM的一部分,它随着所采用的不同块大小算法而变化。
  2. ......每个任务的计算持续时间相等的密集场景描述了一个“稳定状态”,这些时间跨度不属于等式。任何其他情况只会导致随机结果,因为任务的顺序很重要。

6.3.1 绝对分配效率(ADE)

这个基本效率通常可以通过将繁忙份额除以并行计划的整个潜力来计算:

绝对分配效率 (ADE) = Busy Share / Parallel Schedule

对于Dense Scenario,简化的计算代码如下所示:

# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rde`.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade

如果没有Idling ShareBusy Share等于Parallel Schedule,因此我们得到 100% 的ADE。在我们的简化模型中,这是一个所有可用进程在处理所有任务所需的全部时间内都处于忙碌状态的场景。换句话说,整个作业被有效地并行化到 100%。

但是为什么我在这里一直将PE称为绝对 PE

为了理解这一点,我们必须考虑块大小(cs)的可能情况,以确保最大的调度灵活性(还有可能存在的 Highlander 的数量。巧合?):

_________________________________ ~ 一 ~ __________________________________

例如,如果我们有 4 个工作进程和 37 个任务,即使有 也会有空闲的工作人员chunksize=1,因为n_workers=4不是 37 的除数。除以 37 / 4 的余数是 1。这个剩余的任务必须是由一个工人处理,而其余三个处于闲置状态。

同样,仍然会有一名闲置的工人有 39 个任务,如下图所示。

图7

当你比较上面的Parallel Schedulechunksize=1下面的版本时chunksize=3,你会注意到上面的Parallel Schedule更小,x 轴上的时间线更短。现在应该很明显了,即使对于Dense Scenarios ,意外地更大的块大小也会导致整体计算时间增加。

但是为什么不直接使用 x 轴的长度来计算效率呢?

因为此模型中不包含开销。两种块大小都会有所不同,因此 x 轴并不是真正可直接比较的。开销仍然会导致更长的总计算时间,如下图的案例 2所示。

图8


6.3.2 相对分配效率(RDE)

如果将chunksize 设置为 1 可以更好地分配任务,则ADE值不包含信息。这里更好仍然意味着更小的Idling Share

为了获得针对最大可能DE调整的DE值,我们必须将考虑的ADE除以我们得到的ADEchunksize=1

相对分配效率 (RDE) = ADE_cs_x / ADE_cs_1

这是它在代码中的样子:

# mp_utils.py

def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Relative Distribution Efficiency (RDE)."""
    ade_cs1 = calc_ade(
        n_workers, len_iterable, n_chunks=len_iterable,
        chunksize=1, last_chunk=1
    )
    ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
    rde = ade / ade_cs1

    return rde

RDE在这里如何定义,本质上是一个关于Parallel Schedule尾部的故事。RDE受尾部中包含的最大有效块大小的影响。(这条尾巴的长度可以是 x 轴长度chunksizelast_chunk。)这会导致RDE自然收敛到 100%(偶数),如下图所示。

图9

RDE ...

  • 是优化潜力的有力提示。
  • 对于较长的可迭代对象来说,自然变得不太可能,因为整个并行计划的相对尾部会缩小。

请在此处找到此答案的第二部分。


推荐阅读