首页 > 解决方案 > 具有多个参数的 lambda 的 Numpy 矢量化

问题描述

昨晚我尝试了一段时间,将各种不同的 numpy 矢量化函数用于一项任务(要明确的是,没有它们是完全可行的),以在点之间创建线性插值。

假设我有一个浮点向量(我们称它们为“点”),

v = np.array([9. , 1. , 4.2, 5.6, 3. , 4.6])

我想在相邻点之间进行插值,所以我需要采取这些对:

def adjacent_pairs(v):
    """
    Given a 1D numpy array `v = np.array([1, ..., n])`, return a 2D numpy array of
    adjacent pairs, `np.array([(1,2), ..., (n-1,n)])`.
    """
    s = v.shape
    d = len(s)
    assert d == 1, ValueError(f"Vector must be 1D - got a {d}D vector: shape = {s})")
    return np.vstack([v[:-1],v[1:]]).T

adjacent_pairs(v)给出:

array([[9. , 1. ],
       [1. , 4.2],
       [4.2, 5.6],
       [5.6, 3. ],
       [3. , 4.6]])

[9., 1.]我想按大小为 0.2 的间隔对这些对(矩阵的行,例如-1 表示降序)并将其乘以要arange作为step参数传递的步长。

这有效:

def interpolate_1d(v, step=0.2):
    v_adj = adjacent_pairs(v)
    d = np.diff(v_adj) / np.abs(np.diff(v_adj))
    interpolated = [np.arange(*r, diff * step) for r, diff in zip(v_adj, d)]
    return interpolated 

但是我意识到这zip()部分不是“在”numpy,也许我应该以这样的方式来做。

我开始研究 numpy 中的各种“矢量化”函数(据我所知,它有时可以加快您的代码速度),但我无法将此代码重新格式化为、 或的抽象np.fromiter,昨晚几个小时后我'我希望更熟悉这些的人可以启发我如何在我的代码中使用其中的一个或多个。np.vectorizenp.frompyfunc

我更喜欢分别传入行和差异符号(as lambda row, diff: ...),但我无法让这些工作,所以我hstack编辑了v_adjandd数组,以便每行都包含它们(我只会需要一个 lambda 参数)。

这是该函数的两个版本:

def interpolate_1d_vectorised(v, step=0.2):
    """
    Couldn't get this to work: how to expand out the two parts at a time to pass to
    the lambda function?
    """
    v_adj = adjacent_pairs(v)
    d = np.diff(v_adj) / np.abs(np.diff(v_adj))
    # lambda_func = lambda row, diff: np.arange(*row, diff * step)
    lambda_func = lambda row, diff: np.arange(row[0], row[1], diff * step)
    row_arange = np.vectorize(lambda_func, signature="(),()->()")
    interpolated = row_arange(v_adj, d)
    return interpolated


def interpolate_1d_vectorised_triples(v, step=0.2):
    v_adj = adjacent_pairs(v)
    d = np.diff(v_adj) / np.abs(np.diff(v_adj))
    triples = np.hstack([v_adj, d])
    triple_lambda = lambda t: np.arange(t[0], t[1], t[2] * step)
    row_arange_t = np.vectorize(triple_lambda, signature="()->()")
    interpolated = row_arange_t(triples)
    return interpolated

我得到的一些示例错误:

我尝试使用 lambda 函数进行调试,该函数只打印出它正在处理的值,并且似乎矢量化发生在数组中的每个值上,而不是每一行(这是我想要的)。这似乎可以解释错误消息,但我仍然不清楚如何一次将三个值(或一次一行)作为向量化函数的输入并为每个输入产生一个输出。

我以前使用过np.apply_along_axisnp.apply_over_axes但我也遇到了各种错误。

我希望这会起作用:

triple_lambda = lambda t: np.arange(t[0], t[1], t[2] * 0.2)
np.apply_along_axis(triple_lambda, 1, triples)

但它给出了: ValueError: could not broadcast input array from shape (16) into shape (40),我认为这意味着插值会使向量更大。

np.apply_over_axes(triple_lambda, triples, axes=[0,2])TypeError: <lambda>() takes 1 positional argument but 2 were given(相同时axes=[0,1])。

(这就是我放弃的重点)

抱歉,如果这不是使用这些功能的正确应用程序,请让我知道是否还有其他更好的方法(以及如果这些功能将用于替代的话,该怎么办)。我本来打算删除这些尝试并继续前进,但我想我应该在这里问一下,这样我将来可能会学习如何使用这些功能。非常感谢任何建议!

标签: pythonnumpynumpy-ufunc

解决方案


因此,首先,lambda等同于def,但更具限制性。你真的不需要使用lambda,因为你可以像传递任何其他对象一样按名称传递任何函数。

其次,np.vectorize基本上是一个美化的for循环。它一次处理一个元素。您没有返回不同大小的值的选项,这是您需要的。这解释了您当前的错误。即使没有错误,它也不会比你最初的zip. 从文档:

提供该vectorize功能主要是为了方便,而不是为了性能。该实现本质上是一个 for 循环。

让我们从计算每个范围内的元素数量开始:

ranges = np.diff(v)
sign = np.sign(ranges)
steps = np.ceil(np.abs(ranges) / step).astype(int)
steps[-1] += 1

您现在可以创建一个与输出大小相同的增量向量:

increments = np.repeat(step * sign, steps)

cumsum如果为每个段设置初始值,则可以按增量运行。每个段的开始是 的相应值v减去先前的残差。

range_start = np.cumsum(steps[:-1])
increments[0] = v[0]
increments[range_start] = v[1:-1] - (v[0:-2] + sign[:-1] * (steps[:-1] - 1) * step)

现在你可以只取累积和(并设置最后一个元素):

result = np.cumsum(increments)
result[-1] = v[-1]

您可能偶尔会遇到一些舍入误差问题,这就是为什么去除任意残差的通用解决方案是一个好主意的原因。它还可以正确处理步骤的非整数倍数:

>>> interpolate_1d(v)
array([9. , 8.8, 8.6, 8.4, 8.2, 8. , 7.8, 7.6, 7.4, 7.2, 7. , 6.8, 6.6,
       6.4, 6.2, 6. , 5.8, 5.6, 5.4, 5.2, 5. , 4.8, 4.6, 4.4, 4.2, 4. ,
       3.8, 3.6, 3.4, 3.2, 3. , 2.8, 2.6, 2.4, 2.2, 2. , 1.8, 1.6, 1.4,
       1.2, 1. , 1.2, 1.4, 1.6, 1.8, 2. , 2.2, 2.4, 2.6, 2.8, 3. , 3.2,
       3.4, 3.6, 3.8, 4. , 4.2, 4.4, 4.6, 4.8, 5. , 5.2, 5.4, 5.6, 5.4,
       5.2, 5. , 4.8, 4.6, 4.4, 4.2, 4. , 3.8, 3.6, 3.4, 3.2, 3. , 3.2,
       3.4, 3.6, 3.8, 4. , 4.2, 4.4, 4.6])
>>> interpolate_1d([1., 2.5, 1.])
array([1. , 1.2, 1.4, 1.6, 1.8, 2. , 2.2, 2.4, 2.5, 2.3, 2.1, 1.9, 1.7,
       1.5, 1.3, 1.1, 1. ])

关于这一点,如果您 100% 确定您的所有范围都是步长的倍数,并且您不关心一点舍入误差,您可以对原始定义求和,increments无需任何进一步修改:

increments = np.repeat(step * sign, steps)
increments[0] = v[0]
result = np.cumsum(increments)

TL;博士

def interpolate_1d(v, step=0.2):
    ranges = np.diff(v)
    sign = np.sign(ranges)
    steps = np.ceil(np.abs(ranges) / step).astype(int)
    steps[-1] += 1
    range_start = np.cumsum(steps[:-1])
    increments = np.repeat(step * sign, steps)
    increments[0] = v[0]
    increments[range_start] = v[1:-1] - (v[0:-2] + sign[:-1] * (steps[:-1] - 1) * step)
    result = np.cumsum(increments)
    result[-1] = v[-1]
    return result

推荐阅读