首页 > 解决方案 > 如何通过矢量化在 python 中加速这个 DP 函数

问题描述

所以我在这里有这个定义,

DP[i,j] = f[i,j] + min(DP[i−1, j −1], DP[i−1, j], DP[i−1, j +1])

它定义了从 NxM 矩阵顶部到矩阵底部的最小应计成本。f 中的每个单元格代表从另一个单元格前往该单元格的价值/成本(1.2、0、10 等)。

矩阵可能很大(1500x1500,它是图像的梯度图),我编写的 DP 算法每次运行我的矩阵大约需要一秒钟。该矩阵每次执行需要运行数百次,因此总程序运行时间长达几分钟。这个循环大约是我瓶颈的 99%,所以我正在尝试使用 Python/numpys 矢量化方法优化这个循环。我只能访问 Numpy 和 Scipy。

注意:我几乎不会在 python 中编程,所以解决方案可能只是显而易见的 idk。

第一次尝试,只是简单的循环,这里的时间大约是每次运行 2-2.5 秒

DP = f.copy()
for r in range(2, len(DP) - 1): # Start at row 2 since row one doesn't change
    for c in range(1, len(DP[0]) - 1):
        DP[r][c] += min(DP[r - 1, c-1:c+2])

第二次尝试,我尝试利用一些 numpy 矢量化函数“fromiter”一次计算整行而不是逐列计算,这里的时间约为每次运行 1-1.5 秒。我的目标是让这个速度至少快一个数量级,但我对如何优化它感到困惑。

DP = f.copy()
for r in range(2, len(DP) - 1):
    def foo(arr):
        idx, val = arr
        if idx == 0 or idx == len(DP[[0]) - 1:
            return np.inf
        return val + min(DP[r - 1, idx - 1], DP[r - 1, idx], DP[r - 1, idx + 1])


    DP[r, :] = np.fromiter(map(foo, enumerate(DP[r, :])))

标签: pythonnumpyscipyvectorization

解决方案


正如 hpaulj 所说,由于您的问题本质上是顺序的,因此很难完全矢量化,尽管这似乎是可能的(每个单元格都根据 row 的值进行更新r=2,差异是考虑到的每行的第 2 行的三元组数) 所以也许你可以找到一个聪明的方法来做到这一点!

话虽如此,一个快速且半向量化的解决方案是使用user42541 提出的带有精美索引的滑动窗口的简洁方式,因此我们用向量化调用替换内部循环:

indexer = np.arange(3)[:,None] + np.arange(DP.shape[1] - 2)[None,:]
for r in range(2, DP.shape[0] - 1):
    DP[r,1:-1] += np.min(DP[r-1,indexer], axis = 0)

对于 1500x1500 的整数数组,这导致相对于您的双循环方法(您的矢量化解决方案在我的电脑中不起作用)大约两个数量级的加速。


推荐阅读