首页 > 解决方案 > 在python中向量化一个6 for循环累积和

问题描述

数学问题是:

在此处输入图像描述

总和中的表达式实际上比上面的表达式复杂得多,但这是一个最小的工作示例,不会使事情过于复杂。我使用 6 个嵌套的 for 循环在 Python 中编写了它,并且正如预期的那样,它的性能非常糟糕(真正的形式性能很差,需要评估数百万次),即使在 Numba、Cython 和朋友的帮助下也是如此。这里它是使用嵌套的 for 循环和累积和编写的:

import numpy as np

def func1(a,b,c,d):
    '''
    Minimal working example of multiple summation
    '''
    B = 0
    for ai in range(0,a):
        for bi in range(0,b):
            for ci in range(0,c):
                for di in range(0,d):
                    for ei in range(0,ai+bi):
                        for fi in range(0,ci+di):
                            B += (2)**(ei-fi-ai-ci-di+1)*(ei**2-2*(ei*fi)-7*di)*np.math.factorial(ei)


    return a, b, c, d, B

表达式由 4 个数字作为输入控制,func1(4,6,3,4)输出为B21769947.844726562。

我环顾四周寻求帮助,并找到了几个 Stack 帖子,其中一些示例是:

NumPy 中的外部产品:向量化六个嵌套循环

在具有不同数组形状的 Python/Numpy 中矢量化三重 for 循环

Python矢量化嵌套for循环

我尝试使用从这些有用的帖子中学到的东西,但经过多次尝试,我一直得到错误的答案。即使对一个内部总和进行矢量化,也会为真正的问题带来巨大的性能提升,但总和范围不同的事实似乎让我失望。有人对如何取得进展有任何提示吗?

标签: pythonnumpyfor-loopvectorization

解决方案


编辑 3:

最终(我认为)版本,结合了max9111 答案中的想法,更加简洁和快速。

import numpy as np
from numba import as nb

@nb.njit()
def func1_jit(a, b, c, d):
    # Precompute
    exp_min = 5 - (a + b + c + d)
    exp_max = b
    exp = 2. ** np.arange(exp_min, exp_max + 1)
    fact_e = np.empty((a + b - 2))
    fact_e[0] = 1
    for ei in range(1, len(fact_e)):
        fact_e[ei] = ei * fact_e[ei - 1]
    # Loops
    B = 0
    for ai in range(0, a):
        for bi in range(0, b):
            for ci in range(0, c):
                for di in range(0, d):
                    for ei in range(0, ai + bi):
                        for fi in range(0, ci + di):
                            B += exp[ei - fi - ai - ci - di + 1 - exp_min] * (ei * ei - 2 * (ei * fi) - 7 * di) * fact_e[ei]
    return B

这已经比以前的任何选项都快了,但我们仍然没有利用多个 CPU。一种方法是在函数本身内,例如并行化外循环。这会在每次调用创建线程时增加一些开销,因此对于较小的输入实际上会慢一些,但对于较大的值应该会明显更快:

import numpy as np
from numba import as nb

@nb.njit(parallel=True)
def func1_par(a, b, c, d):
    # Precompute
    exp_min = 5 - (a + b + c + d)
    exp_max = b
    exp = 2. ** np.arange(exp_min, exp_max + 1)
    fact_e = np.empty((a + b - 2))
    fact_e[0] = 1
    for ei in range(1, len(fact_e)):
        fact_e[ei] = ei * fact_e[ei - 1]
    # Loops
    B = np.empty((a,))
    for ai in nb.prange(0, a):
        Bi = 0
        for bi in range(0, b):
            for ci in range(0, c):
                for di in range(0, d):
                    for ei in range(0, ai + bi):
                        for fi in range(0, ci + di):
                            Bi += exp[ei - fi - ai - ci - di + 1 - exp_min] * (ei * ei - 2 * (ei * fi) - 7 * di) * fact_e[ei]
        B[ai] = Bi
    return np.sum(B)

或者,如果您有很多点要评估函数,您也可以在该级别进行并行化。这里a_arrb_arr和是要评估函数的值的向量c_arrd_arr

from numba import as nb

@nb.njit(parallel=True)
def func1_arr(a_arr, b_arr, c_arr, d_arr):
    B_arr = np.empty((len(a_arr),))
    for i in nb.prange(len(B_arr)):
        B_arr[i] = func1_jit(a_arr[i], b_arr[i], c_arr[i], d_arr[i])
    return B_arr

最佳配置取决于您的输入、使用模式、硬件等,因此您可以结合不同的想法来适应您的情况。


编辑2:

其实,忘记我之前说的话。最好的办法是对算法进行 JIT 编译,但以更有效的方式。首先计算昂贵的部分(我采用指数和阶乘),然后将其传递给编译后的循环函数:

import numpy as np
from numba import njit

def func1(a, b, c, d):
    exp_min = 5 - (a + b + c + d)
    exp_max = b
    exp = 2. ** np.arange(exp_min, exp_max + 1)
    ee = np.arange(a + b - 2)
    fact_e = scipy.special.factorial(ee)
    return func1_inner(a, b, c, d, exp_min, exp, fact_e)

@njit()
def func1_inner(a, b, c, d, exp_min, exp, fact_e):
    B = 0
    for ai in range(0, a):
        for bi in range(0, b):
            for ci in range(0, c):
                for di in range(0, d):
                    for ei in range(0, ai + bi):
                        for fi in range(0, ci + di):
                            B += exp[ei - fi - ai - ci - di + 1 - exp_min] * (ei * ei - 2 * (ei * fi) - 7 * di) * fact_e[ei]
    return B

在我的实验中,这是迄今为止最快的选择,并且几乎不需要额外的内存(仅预先计算的值,输入的大小是线性的)。

a, b, c, d = 4, 6, 3, 4
# The original function
%timeit func1_orig(a, b, c, d)
# 2.07 ms ± 33.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
# The grid-evaluated function
%timeit func1_grid(a, b, c, d)
# 256 µs ± 25 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
# The precompuation + JIT-compiled function
%timeit func1_jit(a, b, c, d)
# 19.6 µs ± 3.25 µs per loop (mean ± std. dev. of 7 runs, 100000 loops each)

好吧,总是可以选择对整个事物进行网格评估:

import numpy as np
import scipy.special

def func1(a, b, c, d):
    ai, bi, ci, di, ei, fi = np.ogrid[:a, :b, :c, :d, :a + b - 2, :c + d - 2]
    # Compute
    B = (2.) ** (ei - fi - ai - ci - di + 1) * (ei ** 2 - 2 * (ei * fi) - 7 * di) * scipy.special.factorial(ei)
    # Mask out of range elements for last two inner loops
    m = (ei < ai + bi) & (fi < ci + di)
    return np.sum(B * m)

print(func1(4, 6, 3, 4))
# 21769947.844726562

我之所以使用,是因为出于某种原因scipy.special.factorial显然np.factorial不适用于数组。

显然,随着参数的增加,内存成本会增长得非常快。代码实际上执行了比必要更多的计算,因为两个内部循环具有不同数量的迭代,因此(在此方法中)您必须使用最大的,然后删除不需要的。希望矢量化能够弥补这一点。一个小的 IPython 基准测试:

a, b, c, d = 4, 6, 3, 4
# func1_orig is the original loop-based version
%timeit func1_orig(a, b, c, d)
# 2.9 ms ± 110 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
# func1 here is the vectorized version
%timeit func1(a, b, c, d)
# 210 µs ± 6.34 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

编辑:

请注意,以前的方法也不是全有或全无的事情。您可以选择仅对某些循环进行网格评估。例如,两个最里面的循环可以像这样被矢量化:

def func1(a, b, c, d):
    B = 0
    e = np.arange(a + b - 2).reshape((-1, 1))
    f = np.arange(c + d - 2)
    for ai in range(0, a):
        for bi in range(0, b):
            ei = e[:ai + bi]
            for ci in range(0, c):
                for di in range(0, d):
                    fi = f[:ci + di]
                    B += np.sum((2.) ** (ei - fi - ai - ci - di + 1) * (ei ** 2 - 2 * (ei * fi) - 7 * di) * scipy.special.factorial(ei))
    return B

这仍然有循环,但它确实避免了额外的计算,并且内存要求要低得多。哪个最好取决于我猜的输入的大小。在我的测试中,使用原始值 (4, 6, 3, 4) 这比原始函数还要慢;ei此外,在这种情况下,为每个循环创建新数组似乎fi比在预先创建的切片上操作更快。但是,如果将输入乘以 4 (14, 24, 12, 16),那么这比原来的速度快得多(大约 x5),尽管仍然比完全矢量化的速度(大约 x3)慢。另一方面,我可以用这个(大约 5 分钟)计算输入缩放十(40、60、30、40)的值,但由于内存而不是前一个(我没有测试如何使用原始功能需要很长时间)。使用@numba.jitnopython有点帮助,虽然不是很大(由于阶乘函数而不能使用)。您可以根据输入的大小尝试对更多或更少的循环进行矢量化。


推荐阅读