首页 > 解决方案 > 如何通过多维数组优化循环?

问题描述

在 Python 3 上收集了一些 3D netCDF 数据后,我正在遍历每个 x,y 数据点以计算另一个变量。此变量的计算取决于给定 x,y 点的 z。代码似乎运行正确,但速度非常慢;我想知道是否有人对如何优化代码以使其运行得更快有建议。

我已经从定义许多中间变量的更长的代码变成了相当简单的代码,如下所示。即使在修剪代码之后,它也运行缓慢(即,外部 for 循环中的每个 i 都运行几分钟)。

for i in range(0,217):
    print(i)
    for j in range(0,301):
        for k in range(10,30):
            if (data.variables[longvars[v][2]][0][k][i][j]-data.variables[longvars[v][3]][0][i][j]) <= 3000.0:
                break

        if (abs(data.variables[longvars[v][2]][0][k][i][j]-data.variables[longvars[v][3]][0][i][j])-3000.) \
         < (abs(data.variables[longvars[v][2]][0][k-1][i][j]-data.variables[longvars[v][3]][0][i][j])-3000.):
            lev = k
        else:
            lev = k-1

        newd[i][j] = np.sqrt(((data.variables[longvars[v][0]][0][lev][i][j]-data.variables[longvars[v][4]][0][0][i][j])**2)+((data.variables[longvars[v][1]][0][lev][i][j]-data.variables[longvars[v][5]][0][0][i][j])**2))

我想可能有一种方法可以使用另一个数组来执行此操作,该数组为每个 x,y (i,j) 点存储正确的 z (k) 级别,然后对整个数据数组运行计算。但是,我不知道它会更快。我很感激人们可以提供的任何帮助!

标签: python

解决方案


逻辑看起来很合理,但我们可以使用生成器和推导进一步优化它。

让我们将内部逻辑隔离到一个名为findZValue.

def findZValue(v, i, j, variables, longvars, np):

如果我读错了,请原谅我,但看起来您正试图找到最接近 3000 的值的索引?如果是这样,首先我们将创建一个生成器,它返回一个包含索引和“变量 - 变量 - 3000”的绝对值的元组:

def findZValue(v, i, j, variables, longvars, np):
    lev = ((k, abs(variables[longvars[v][2]][0][k][i][j] - variables[longvars[v][3]][0][i][j] - 3000)) for k in range(10, 30))

为了获得我们想要的值,我们将整个东西包装在一个min函数中(键表示我们希望它按第二个值排序)并指定我们要获取索引(即返回的元组中的第一个值min) :

def findZValue(v, i, j, variables, longvars, np):
    lev = min(((k, abs(variables[longvars[v][2]][0][k][i][j] - variables[longvars[v][3]][0][i][j] - 3000)) for k in range(10, 30)), key = lambda t: t[1])[0]

对于放入“newd”的值,看起来您正在取平方和的根(即它的归一化或大小)。幸运的是,numpy(这是我假设的“np”)有一个内置的方法来查找数组的大小/归一化:np.linalg.norm。我们所要做的就是将其他值放入 np.array 中,然后在它们上调用它:

def findZValue(v, i, j, variables, longvars, np):
    lev = min(((k, abs(variables[longvars[v][2]][0][k][i][j] - variables[longvars[v][3]][0][i][j] - 3000)) for k in range(10, 30)), key = lambda t: t[1])[0]
    return np.linalg.norm(np.array(variables[longvars[v][0]][0][lev][i][j]-variables[longvars[v][4]][0][0][i][j], variables[longvars[v][1]][0][lev][i][j]-variables[longvars[v][5]][0][0][i][j]))

现在我们可以将整个循环放入嵌套推导中:

newd = [[findZValue(v, i, j, data.variables, longvars, np) for j in range(301)] for i in range(217)]

def findZValue(v, i, j, variables, longvars, np):
    lev = min(((k, abs(variables[longvars[v][2]][0][k][i][j] - variables[longvars[v][3]][0][i][j] - 3000)) for k in range(10, 30)), key = lambda t: t[1])[0]

    return np.linalg.norm(np.array(variables[longvars[v][0]][0][lev][i][j]-variables[longvars[v][4]][0][0][i][j], variables[longvars[v][1]][0][lev][i][j]-variables[longvars[v][5]][0][0][i][j]))

使用生成器和理解应该比使用 for 循环更快。但是,如果您真的想加快速度,我们可以使用“多处理”。具体来说,一个多处理池。为此,我们需要创建第二个函数来处理每个向量(这是由于对多处理池工作方式的限制):

from multiprocessing import Pool

def findZValue(v, i, j, variables, longvars, np):
    lev = min(((k, abs(variables[longvars[v][2]][0][k][i][j] - variables[longvars[v][3]][0][i][j] - 3000)) for k in range(10, 30)), key = lambda t: t[1])[0]

    return np.linalg.norm(np.array(variables[longvars[v][0]][0][lev][i][j]-variables[longvars[v][4]][0][0][i][j], variables[longvars[v][1]][0][lev][i][j]-variables[longvars[v][5]][0][0][i][j]))

def findZValuesForVector(vector):
    return [findZValue(*values) for values in vector]

with Pool(processes=4) as pool:
    newd = pool.map(findZValuesForVector, [[[v, i, j, data.variables, longvars, np] for j in range(301)] for i in range(217)])

您可以更改为池创建的“进程”数量,以查看什么可以为您提供最佳结果。


推荐阅读